Skip to content

Commit 460cb50

Browse files
committed
DiffData performance improvements (& fix JVM heap space errors)
1 parent d97b7e3 commit 460cb50

File tree

2 files changed

+47
-32
lines changed

2 files changed

+47
-32
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>1.9</version>
6+
<version>2.0</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22

33
import com.datastax.oss.driver.api.core.ConsistencyLevel;
44
import com.datastax.oss.driver.api.core.CqlSession;
5+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
56
import com.datastax.oss.driver.api.core.cql.ResultSet;
67
import com.datastax.oss.driver.api.core.cql.Row;
78
import org.apache.spark.SparkConf;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

1112
import java.math.BigInteger;
12-
import java.util.concurrent.ForkJoinPool;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import java.util.concurrent.CompletionStage;
1316
import java.util.concurrent.atomic.AtomicLong;
1417
import java.util.stream.IntStream;
1518
import java.util.stream.StreamSupport;
@@ -51,7 +54,6 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
5154
}
5255

5356
public void getDataAndDiff(BigInteger min, BigInteger max) {
54-
ForkJoinPool customThreadPool = new ForkJoinPool();
5557
logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
5658
int maxAttempts = maxRetries;
5759
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
@@ -61,27 +63,30 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
6163
ResultSet resultSet = sourceSession.execute(
6264
sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
6365

64-
customThreadPool.submit(() -> {
65-
StreamSupport.stream(resultSet.spliterator(), true).forEach(sRow -> {
66-
readLimiter.acquire(1);
67-
// do not process rows less than writeTimeStampFilter
68-
if (!(writeTimeStampFilter && (getLargestWriteTimeStamp(sRow) < minWriteTimeStampFilter
69-
|| getLargestWriteTimeStamp(sRow) > maxWriteTimeStampFilter))) {
70-
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
71-
printCounts("Current");
72-
}
73-
74-
Row astraRow = astraSession
75-
.execute(selectFromAstra(astraSelectStatement, sRow)).one();
76-
diff(sRow, astraRow);
77-
} else {
78-
readCounter.incrementAndGet();
79-
skippedCounter.incrementAndGet();
66+
Map<Row, CompletionStage<AsyncResultSet>> writeResults = new HashMap<Row, CompletionStage<AsyncResultSet>>();
67+
StreamSupport.stream(resultSet.spliterator(), false).forEach(sRow -> {
68+
readLimiter.acquire(1);
69+
// do not process rows less than writeTimeStampFilter
70+
if (!(writeTimeStampFilter && (getLargestWriteTimeStamp(sRow) < minWriteTimeStampFilter
71+
|| getLargestWriteTimeStamp(sRow) > maxWriteTimeStampFilter))) {
72+
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
73+
printCounts("Current");
8074
}
81-
});
8275

83-
printCounts("Final");
84-
}).get();
76+
CompletionStage<AsyncResultSet> writeResultSet = astraSession
77+
.executeAsync(selectFromAstra(astraSelectStatement, sRow));
78+
writeResults.put(sRow, writeResultSet);
79+
if (writeResults.size() > 1000) {
80+
iterateAndClearWriteResults(writeResults);
81+
}
82+
} else {
83+
readCounter.incrementAndGet();
84+
skippedCounter.incrementAndGet();
85+
}
86+
});
87+
iterateAndClearWriteResults(writeResults);
88+
89+
printCounts("Final");
8590

8691
retryCount = maxAttempts;
8792
} catch (Exception e) {
@@ -91,7 +96,19 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
9196
}
9297
}
9398

94-
customThreadPool.shutdownNow();
99+
}
100+
101+
private void iterateAndClearWriteResults(Map<Row, CompletionStage<AsyncResultSet>> writeResults) {
102+
for (Row sr : writeResults.keySet()) {
103+
Row ar = null;
104+
try {
105+
ar = writeResults.get(sr).toCompletableFuture().get().one();
106+
} catch (Exception e) {
107+
logger.error("Could not perform diff for Key: " + getKey(sr), e);
108+
}
109+
diff(sr, ar);
110+
}
111+
writeResults.clear();
95112
}
96113

97114
public void printCounts(String finalStr) {
@@ -150,15 +167,13 @@ private void diff(Row sourceRow, Row astraRow) {
150167
private String isDifferent(Row sourceRow, Row astraRow) {
151168
StringBuffer diffData = new StringBuffer();
152169
IntStream.range(0, selectColTypes.size()).parallel().forEach(index -> {
153-
if (!writeTimeStampCols.contains(index)) {
154-
MigrateDataType dataType = selectColTypes.get(index);
155-
Object source = getData(dataType, index, sourceRow);
156-
Object astra = getData(dataType, index, astraRow);
157-
158-
boolean isDiff = dataType.diff(source, astra);
159-
if (isDiff) {
160-
diffData.append(" (Index: " + index + " Source: " + source + " Astra: " + astra + " ) ");
161-
}
170+
MigrateDataType dataType = selectColTypes.get(index);
171+
Object source = getData(dataType, index, sourceRow);
172+
Object astra = getData(dataType, index, astraRow);
173+
174+
boolean isDiff = dataType.diff(source, astra);
175+
if (isDiff) {
176+
diffData.append(" (Index: " + index + " Source: " + source + " Astra: " + astra + " ) ");
162177
}
163178
});
164179

0 commit comments

Comments
 (0)