Skip to content

Commit ce9e289

Browse files
committed
Minor refactor
1 parent 62cb952 commit ce9e289

File tree

2 files changed

+17
-18
lines changed

2 files changed

+17
-18
lines changed

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,28 +63,28 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
6363
ResultSet resultSet = sourceSession.execute(
6464
sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
6565

66-
Map<Row, CompletionStage<AsyncResultSet>> writeResults = new HashMap<Row, CompletionStage<AsyncResultSet>>();
67-
StreamSupport.stream(resultSet.spliterator(), false).forEach(sRow -> {
66+
Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap = new HashMap<Row, CompletionStage<AsyncResultSet>>();
67+
StreamSupport.stream(resultSet.spliterator(), false).forEach(srcRow -> {
6868
readLimiter.acquire(1);
6969
// do not process rows less than writeTimeStampFilter
70-
if (!(writeTimeStampFilter && (getLargestWriteTimeStamp(sRow) < minWriteTimeStampFilter
71-
|| getLargestWriteTimeStamp(sRow) > maxWriteTimeStampFilter))) {
70+
if (!(writeTimeStampFilter && (getLargestWriteTimeStamp(srcRow) < minWriteTimeStampFilter
71+
|| getLargestWriteTimeStamp(srcRow) > maxWriteTimeStampFilter))) {
7272
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
7373
printCounts("Current");
7474
}
7575

76-
CompletionStage<AsyncResultSet> writeResultSet = astraSession
77-
.executeAsync(selectFromAstra(astraSelectStatement, sRow));
78-
writeResults.put(sRow, writeResultSet);
79-
if (writeResults.size() > 1000) {
80-
iterateAndClearWriteResults(writeResults);
76+
CompletionStage<AsyncResultSet> targetRowFuture = astraSession
77+
.executeAsync(selectFromAstra(astraSelectStatement, srcRow));
78+
srcToTargetRowMap.put(srcRow, targetRowFuture);
79+
if (srcToTargetRowMap.size() > 1000) {
80+
diffAndClear(srcToTargetRowMap);
8181
}
8282
} else {
8383
readCounter.incrementAndGet();
8484
skippedCounter.incrementAndGet();
8585
}
8686
});
87-
iterateAndClearWriteResults(writeResults);
87+
diffAndClear(srcToTargetRowMap);
8888

8989
printCounts("Final");
9090

@@ -98,17 +98,16 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
9898

9999
}
100100

101-
private void iterateAndClearWriteResults(Map<Row, CompletionStage<AsyncResultSet>> writeResults) {
102-
for (Row sr : writeResults.keySet()) {
103-
Row ar = null;
101+
private void diffAndClear(Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap) {
102+
for (Row srcRow : srcToTargetRowMap.keySet()) {
104103
try {
105-
ar = writeResults.get(sr).toCompletableFuture().get().one();
104+
Row targetRow = srcToTargetRowMap.get(srcRow).toCompletableFuture().get().one();
105+
diff(srcRow, targetRow);
106106
} catch (Exception e) {
107-
logger.error("Could not perform diff for Key: " + getKey(sr), e);
107+
logger.error("Could not perform diff for Key: " + getKey(srcRow), e);
108108
}
109-
diff(sr, ar);
110109
}
111-
writeResults.clear();
110+
srcToTargetRowMap.clear();
112111
}
113112

114113
public void printCounts(String finalStr) {

src/resources/sparkConf.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ spark.destination.read.consistency.level LOCAL_QUORUM
1313
spark.destination.keyspaceTable test.a2
1414
spark.destination.autocorrect.missing false
1515
spark.destination.autocorrect.mismatch false
16-
spark.destination.custom.writeTime 0
16+
spark.destination.custom.writeTime 0
1717

1818
spark.maxRetries 10
1919
spark.readRateLimit 20000

0 commit comments

Comments
 (0)