@@ -19,6 +19,7 @@ public class CopyJobSession extends AbstractJobSession {
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
+    protected AtomicLong errorCounter = new AtomicLong(0);

     protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         super(sourceSession, astraSession, sc);
@@ -44,8 +45,13 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         int maxAttempts = maxRetries;
-        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
+        boolean done = false;

+        for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
+            long readCnt = 0;
+            long writeCnt = 0;
+            long skipCnt = 0;
+            long errCnt = 0;
             try {
                 ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
                         min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
@@ -59,67 +65,66 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
                     for (Row sourceRow : resultSet) {
                         readLimiter.acquire(1);
+                        readCnt++;
+                        if (readCnt % printStatsAfter == 0) {
+                            printCounts(false);
+                        }

                         if (filterData) {
                             String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                             if (col.trim().equalsIgnoreCase(filterColValue)) {
                                 logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
-
                         if (writeTimeStampFilter) {
                             // only process rows greater than writeTimeStampFilter
                             Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
                             if (sourceWriteTimeStamp < minWriteTimeStampFilter
                                     || sourceWriteTimeStamp > maxWriteTimeStampFilter) {
-                                readCounter.incrementAndGet();
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
-
                         writeLimiter.acquire(1);
-                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
-                            printCounts(false);
-                        }
+
                         Row astraRow = null;
                         if (isCounterTable) {
                             ResultSet astraReadResultSet = astraSession
                                     .execute(selectFromAstra(astraSelectStatement, sourceRow));
                             astraRow = astraReadResultSet.one();
                         }

-
                         CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
                                 .executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
                         writeResults.add(astraWriteResultSet);
                         if (writeResults.size() > fetchSizeInRows) {
-                            iterateAndClearWriteResults(writeResults, 1);
+                            writeCnt += iterateAndClearWriteResults(writeResults, 1);
                         }
                     }

                     // clear the write resultset
-                    iterateAndClearWriteResults(writeResults, 1);
+                    writeCnt += iterateAndClearWriteResults(writeResults, 1);
                 } else {
                     BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     for (Row sourceRow : resultSet) {
                         readLimiter.acquire(1);
-                        writeLimiter.acquire(1);
-                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
+                        readCnt++;
+                        if (readCnt % printStatsAfter == 0) {
                             printCounts(false);
                         }

                         if (filterData) {
                             String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                             if (colValue.trim().equalsIgnoreCase(filterColValue)) {
                                 logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }

+                        writeLimiter.acquire(1);
                         batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));

                         // if batch threshold is met, send the writes and clear the batch
@@ -130,27 +135,37 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                         }

                         if (writeResults.size() * batchSize > fetchSizeInRows) {
-                            iterateAndClearWriteResults(writeResults, batchSize);
+                            writeCnt += iterateAndClearWriteResults(writeResults, batchSize);
                         }
                     }

                     // clear the write resultset
-                    iterateAndClearWriteResults(writeResults, batchSize);
+                    writeCnt += iterateAndClearWriteResults(writeResults, batchSize);

                     // if there are any pending writes because the batchSize threshold was not met, then write and clear them
                     if (batchStatement.size() > 0) {
                         CompletionStage<AsyncResultSet> writeResultSet = astraSession.executeAsync(batchStatement);
                         writeResults.add(writeResultSet);
-                        iterateAndClearWriteResults(writeResults, batchStatement.size());
+                        writeCnt += iterateAndClearWriteResults(writeResults, batchStatement.size());
                         batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     }
                 }

-                retryCount = maxAttempts;
+                readCounter.addAndGet(readCnt);
+                writeCounter.addAndGet(writeCnt);
+                skippedCounter.addAndGet(skipCnt);
+                done = true;
             } catch (Exception e) {
+                if (retryCount == maxAttempts) {
+                    readCounter.addAndGet(readCnt);
+                    writeCounter.addAndGet(writeCnt);
+                    skippedCounter.addAndGet(skipCnt);
+                    errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
+                }
                 logger.error("Error occurred retry#: {}", retryCount, e);
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
                         Thread.currentThread().getId(), min, max, retryCount);
+                logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, writeCnt, skipCnt, (readCnt - writeCnt - skipCnt));
             }
         }
     }
@@ -164,18 +179,22 @@ public synchronized void printCounts(boolean isFinal) {
         logger.info("{} Read Record Count: {}", msg, readCounter.get());
         logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
         logger.info("{} Write Record Count: {}", msg, writeCounter.get());
+        logger.info("{} Error Record Count: {}", msg, errorCounter.get());
         if (isFinal) {
             logger.info("################################################################################################");
         }
     }

-    private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
+    private int iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
+        int cnt = 0;
         for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
             //wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
             writeResult.toCompletableFuture().get().one();
-            writeCounter.addAndGet(incrementBy);
+            cnt += incrementBy;
         }
         writeResults.clear();
+
+        return cnt;
     }

 }
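
The net effect of the commit, shown in isolation: counting moves from the shared AtomicLong fields into per-attempt locals (readCnt, writeCnt, skipCnt) that are published exactly once, either after a successful attempt or on the last failed retry, with errors derived as readCnt - writeCnt - skipCnt. Below is a minimal, compilable sketch of that pattern, not code from this repository; RetryCounterSketch, processRange, doWrites, and the stand-in tallies are hypothetical names used only for illustration.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the retry-safe counting pattern in the diff above.
public class RetryCounterSketch {
    private final AtomicLong readCounter = new AtomicLong(0);
    private final AtomicLong writeCounter = new AtomicLong(0);
    private final AtomicLong skippedCounter = new AtomicLong(0);
    private final AtomicLong errorCounter = new AtomicLong(0);

    public void processRange(int maxAttempts) {
        boolean done = false;
        for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
            // Per-attempt tallies; nothing is published until the attempt resolves.
            long readCnt = 0, writeCnt = 0, skipCnt = 0;
            try {
                // Stand-in for the real read/filter/write loop.
                readCnt = 100;
                skipCnt = 5;
                writeCnt = doWrites(readCnt - skipCnt); // may throw mid-range
                // Success: publish this attempt's tallies exactly once.
                readCounter.addAndGet(readCnt);
                writeCounter.addAndGet(writeCnt);
                skippedCounter.addAndGet(skipCnt);
                done = true;
            } catch (Exception e) {
                // Publish only when retries are exhausted, so a range that
                // eventually succeeds is never double-counted.
                if (retryCount == maxAttempts) {
                    readCounter.addAndGet(readCnt);
                    writeCounter.addAndGet(writeCnt);
                    skippedCounter.addAndGet(skipCnt);
                    // Rows read but neither written nor skipped count as errors.
                    errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
                }
            }
        }
    }

    private long doWrites(long rows) throws Exception {
        return rows; // stand-in; the real job writes asynchronously and can fail
    }
}

Because intermediate failed attempts publish nothing, the shared counters reflect each partition range at most once, which the old code (incrementing readCounter and writeCounter inline) could not guarantee across retries.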