
Commit b34aba6

Fixed an incorrect count issue caused when retries happen due to an error. Also added an error count to report partition ranges that possibly could not be moved.
1 parent 7a524a7 commit b34aba6

File tree

3 files changed: +42 -23 lines changed

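At a glance, the fix moves counting from the shared AtomicLong counters to attempt-local tallies that are folded in exactly once, so a retried partition range can no longer be double-counted. A minimal, self-contained sketch of the pattern (the row-processing body is elided and the class name is illustrative; the variable names mirror the diff below):

    import java.util.concurrent.atomic.AtomicLong;

    // A minimal sketch of the retry-safe counting pattern in this commit.
    // The actual row-processing loop is elided; the accounting mirrors the diff.
    public class RetryCountingSketch {
        private final AtomicLong readCounter = new AtomicLong(0);
        private final AtomicLong writeCounter = new AtomicLong(0);
        private final AtomicLong skippedCounter = new AtomicLong(0);
        private final AtomicLong errorCounter = new AtomicLong(0);

        public void migrateRange(int maxAttempts) {
            boolean done = false;
            for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
                // Attempt-local tallies: discarded on retry, so a failed
                // attempt can no longer inflate the shared counters.
                long readCnt = 0, writeCnt = 0, skipCnt = 0;
                try {
                    // ... read rows here, incrementing readCnt/writeCnt/skipCnt ...
                    readCounter.addAndGet(readCnt);    // fold in only on success
                    writeCounter.addAndGet(writeCnt);
                    skippedCounter.addAndGet(skipCnt);
                    done = true;                       // stop retrying
                } catch (Exception e) {
                    if (retryCount == maxAttempts) {
                        // Out of retries: keep the partial tallies and report rows
                        // read but neither written nor skipped as errors.
                        readCounter.addAndGet(readCnt);
                        writeCounter.addAndGet(writeCnt);
                        skippedCounter.addAndGet(skipCnt);
                        errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
                    }
                }
            }
        }
    }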

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.11.0</version>
+    <version>2.11.1</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 40 additions & 21 deletions

@@ -19,6 +19,7 @@ public class CopyJobSession extends AbstractJobSession {
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
+    protected AtomicLong errorCounter = new AtomicLong(0);
 
     protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         super(sourceSession, astraSession, sc);
@@ -44,8 +45,13 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         int maxAttempts = maxRetries;
-        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
+        boolean done = false;
 
+        for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
+            long readCnt = 0;
+            long writeCnt = 0;
+            long skipCnt = 0;
+            long errCnt = 0;
             try {
                 ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
                         min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
@@ -59,67 +65,66 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
                     for (Row sourceRow : resultSet) {
                         readLimiter.acquire(1);
+                        readCnt++;
+                        if (readCnt % printStatsAfter == 0) {
+                            printCounts(false);
+                        }
 
                         if (filterData) {
                             String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                             if (col.trim().equalsIgnoreCase(filterColValue)) {
                                 logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
-
                         if (writeTimeStampFilter) {
                             // only process rows greater than writeTimeStampFilter
                             Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
                             if (sourceWriteTimeStamp < minWriteTimeStampFilter
                                     || sourceWriteTimeStamp > maxWriteTimeStampFilter) {
-                                readCounter.incrementAndGet();
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
-
                         writeLimiter.acquire(1);
-                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
-                            printCounts(false);
-                        }
+
                         Row astraRow = null;
                         if (isCounterTable) {
                             ResultSet astraReadResultSet = astraSession
                                     .execute(selectFromAstra(astraSelectStatement, sourceRow));
                             astraRow = astraReadResultSet.one();
                         }
 
-
                         CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
                                 .executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
                         writeResults.add(astraWriteResultSet);
                         if (writeResults.size() > fetchSizeInRows) {
-                            iterateAndClearWriteResults(writeResults, 1);
+                            writeCnt += iterateAndClearWriteResults(writeResults, 1);
                         }
                     }
 
                     // clear the write resultset
-                    iterateAndClearWriteResults(writeResults, 1);
+                    writeCnt += iterateAndClearWriteResults(writeResults, 1);
                 } else {
                     BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     for (Row sourceRow : resultSet) {
                         readLimiter.acquire(1);
-                        writeLimiter.acquire(1);
-                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
+                        readCnt++;
+                        if (readCnt % printStatsAfter == 0) {
                             printCounts(false);
                         }
 
                         if (filterData) {
                             String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                             if (colValue.trim().equalsIgnoreCase(filterColValue)) {
                                 logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
 
+                        writeLimiter.acquire(1);
                         batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
 
                         // if batch threshold is met, send the writes and clear the batch
@@ -130,27 +135,37 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                         }
 
                         if (writeResults.size() * batchSize > fetchSizeInRows) {
-                            iterateAndClearWriteResults(writeResults, batchSize);
+                            writeCnt += iterateAndClearWriteResults(writeResults, batchSize);
                         }
                     }
 
                     // clear the write resultset
-                    iterateAndClearWriteResults(writeResults, batchSize);
+                    writeCnt += iterateAndClearWriteResults(writeResults, batchSize);
 
                     // if there are any pending writes because the batchSize threshold was not met, then write and clear them
                     if (batchStatement.size() > 0) {
                         CompletionStage<AsyncResultSet> writeResultSet = astraSession.executeAsync(batchStatement);
                         writeResults.add(writeResultSet);
-                        iterateAndClearWriteResults(writeResults, batchStatement.size());
+                        writeCnt += iterateAndClearWriteResults(writeResults, batchStatement.size());
                         batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     }
                 }
 
-                retryCount = maxAttempts;
+                readCounter.addAndGet(readCnt);
+                writeCounter.addAndGet(writeCnt);
+                skippedCounter.addAndGet(skipCnt);
+                done = true;
             } catch (Exception e) {
+                if (retryCount == maxAttempts) {
+                    readCounter.addAndGet(readCnt);
+                    writeCounter.addAndGet(writeCnt);
+                    skippedCounter.addAndGet(skipCnt);
+                    errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
+                }
                 logger.error("Error occurred retry#: {}", retryCount, e);
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
                         Thread.currentThread().getId(), min, max, retryCount);
+                logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, writeCnt, skipCnt, (readCnt - writeCnt - skipCnt));
             }
         }
     }
@@ -164,18 +179,22 @@ public synchronized void printCounts(boolean isFinal) {
         logger.info("{} Read Record Count: {}", msg, readCounter.get());
         logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
         logger.info("{} Write Record Count: {}", msg, writeCounter.get());
+        logger.info("{} Error Record Count: {}", msg, errorCounter.get());
         if (isFinal) {
             logger.info("################################################################################################");
         }
     }
 
-    private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
+    private int iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
+        int cnt = 0;
         for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
            //wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
            writeResult.toCompletableFuture().get().one();
-           writeCounter.addAndGet(incrementBy);
+           cnt += incrementBy;
        }
        writeResults.clear();
+
+        return cnt;
     }
 
 }
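Note that the error count is derived rather than measured: on the final failed attempt, whatever was read but neither written nor skipped is reported as an error. A hypothetical worked example of that arithmetic (the numbers are invented for illustration):

    // Hypothetical final failed attempt for one partition range:
    long readCnt = 1000;   // rows read before the failure
    long writeCnt = 940;   // rows confirmed written
    long skipCnt = 10;     // rows filtered out

    // Rows read but neither written nor skipped are reported as errors:
    long errCnt = readCnt - writeCnt - skipCnt;   // 1000 - 940 - 10 = 50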

src/resources/sparkConf.properties

Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@ spark.target.keyspaceTable test.a2
 spark.target.autocorrect.missing false
 spark.target.autocorrect.mismatch false
 
-spark.maxRetries 5
+spark.maxRetries 3
 spark.readRateLimit 20000
 spark.writeRateLimit 20000
 spark.splitSize 10000
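For context, spark.maxRetries feeds the maxAttempts loop above. A sketch of how such a property would typically be read at session setup; the exact lookup and class name below are assumptions for illustration, not code verified from this repo:

    import org.apache.spark.SparkConf;

    public class RetryConfigSketch {
        // Assumed lookup: returns spark.maxRetries, defaulting to 3 to match
        // the new value in sparkConf.properties above.
        static int maxRetries(SparkConf sc) {
            return sc.getInt("spark.maxRetries", 3);
        }
    }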
