Skip to content

Commit f1451a6

Browse files
authored
Merge pull request #22 from Ankitp1342/feature/fix-08162022
Fix counter table bug & improve code consistency
2 parents f1f063a + b3ff9d0 commit f1451a6

File tree

3 files changed

+31
-45
lines changed

3 files changed

+31
-45
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33

44
<groupId>com.datastax.spark.example</groupId>
55
<artifactId>migrate</artifactId>
6-
<version>0.15</version>
6+
<version>0.16</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 28 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -76,16 +76,17 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
7676
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
7777

7878
try {
79-
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner? min : min.longValueExact(), hasRandomPartitioner? max : max.longValueExact()));
79+
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()));
8080
Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
8181

8282
// cannot do batching if the writeFilter is greater than 0 or
8383
// maxWriteTimeStampFilter is less than max long
84-
if (batchSize == 1 || writeTimeStampFilter) {
84+
// do not batch for counters as it adds latency & increases chance of discrepancy
85+
if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
8586
for (Row sourceRow : resultSet) {
8687
readLimiter.acquire(1);
8788

88-
if(writeTimeStampFilter) {
89+
if (writeTimeStampFilter) {
8990
// only process rows greater than writeTimeStampFilter
9091
Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
9192
if (sourceWriteTimeStamp < minWriteTimeStampFilter
@@ -99,22 +100,16 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
99100
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: "
100101
+ readCounter.get());
101102
}
102-
if (minWriteTimeStampFilter > 0l) {
103-
Row astraRow = null;
104-
if (isCounterTable) {
105-
ResultSet astraReadResultSet = astraSession
106-
.execute(selectFromAstra(astraSelectStatement, sourceRow));
107-
astraRow = astraReadResultSet.one();
108-
}
109-
110-
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
111-
.executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
112-
writeResults.add(astraWriteResultSet);
113-
} else {
114-
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
115-
.executeAsync(bindInsert(astraInsertStatement, sourceRow));
116-
writeResults.add(astraWriteResultSet);
103+
Row astraRow = null;
104+
if (isCounterTable) {
105+
ResultSet astraReadResultSet = astraSession
106+
.execute(selectFromAstra(astraSelectStatement, sourceRow));
107+
astraRow = astraReadResultSet.one();
117108
}
109+
110+
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
111+
.executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
112+
writeResults.add(astraWriteResultSet);
118113
if (writeResults.size() > 1000) {
119114
iterateAndClearWriteResults(writeResults, 1);
120115
}
@@ -124,13 +119,13 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
124119
iterateAndClearWriteResults(writeResults, 1);
125120
} else {
126121
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
127-
for (Row row : resultSet) {
122+
for (Row sourceRow : resultSet) {
128123
readLimiter.acquire(1);
129124
writeLimiter.acquire(1);
130125
if (readCounter.incrementAndGet() % 1000 == 0) {
131126
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
132127
}
133-
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, row));
128+
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
134129

135130
// if batch threshold is met, send the writes and clear the batch
136131
if (batchStatement.size() >= batchSize) {
@@ -166,11 +161,9 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
166161
}
167162
}
168163

169-
170-
171164
}
172165

173-
private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception{
166+
private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
174167
for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
175168
//wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
176169
writeResult.toCompletableFuture().get().one();
@@ -181,41 +174,34 @@ private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultS
181174
writeResults.clear();
182175
}
183176

184-
public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow) {
185-
return bindInsert(insertStatement, sourceRow, null);
186-
}
187-
188177
public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
189-
if (isCounterTable) {
178+
BoundStatement boundInsertStatement = insertStatement.bind();
190179

191-
BoundStatement boundInsertStatement = insertStatement.bind();
180+
if (isCounterTable) {
192181
for (int index = 0; index < insertColTypes.size(); index++) {
193182
MigrateDataType dataType = insertColTypes.get(index);
194183
// compute the counter delta if reading from astra for the difference
195184
if (astraRow != null && isCounterTable && index <= counterDeltaMaxIndex) {
196-
boundInsertStatement = boundInsertStatement.set(index,getCounterDelta(sourceRow.getLong(updateSelectMapping.get(index)), astraRow.getLong(updateSelectMapping.get(index))),Long.class);
185+
boundInsertStatement = boundInsertStatement.set(index, getCounterDelta(sourceRow.getLong(updateSelectMapping.get(index)), astraRow.getLong(updateSelectMapping.get(index))), Long.class);
197186
} else {
198187
boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
199188
}
200189
}
201-
202-
return boundInsertStatement;
203-
204190
} else {
205-
BoundStatement boundInsertStatement = insertStatement.bind();
206191
int index = 0;
207192
for (index = 0; index < insertColTypes.size(); index++) {
208-
MigrateDataType dataType = insertColTypes.get(index);
193+
MigrateDataType dataTypeObj = insertColTypes.get(index);
194+
Class dataType = dataTypeObj.typeClass;
209195

210196
try {
211-
Object colData = getData(dataType, index, sourceRow);
212-
if(index < idColTypes.size() && colData==null && dataType.typeClass==String.class){
213-
colData="";
197+
Object colData = getData(dataTypeObj, index, sourceRow);
198+
if (index < idColTypes.size() && colData == null && dataType == String.class) {
199+
colData = "";
214200
}
215-
boundInsertStatement = boundInsertStatement.set(index, colData, dataType.typeClass);
201+
boundInsertStatement = boundInsertStatement.set(index, colData, dataType);
216202
} catch (NullPointerException e) {
217203
// ignore the exception for map values being null
218-
if (dataType.typeClass != Map.class) {
204+
if (dataType != Map.class) {
219205
throw e;
220206
}
221207
}
@@ -226,9 +212,9 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
226212
index++;
227213
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
228214
}
229-
return boundInsertStatement;
230-
231215
}
216+
217+
return boundInsertStatement;
232218
}
233219

234220
public Long getCounterDelta(Long sourceRow, Long astraRow) {

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -128,7 +128,7 @@ private void diff(Row sourceRow, Row astraRow) {
128128
//correct data
129129

130130
if (autoCorrectMissing) {
131-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
131+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
132132
correctedMissingCounter.incrementAndGet();
133133
logger.error("Corrected missing data in Astra: " + getKey(sourceRow));
134134
}
@@ -142,7 +142,7 @@ private void diff(Row sourceRow, Row astraRow) {
142142
logger.error("Data mismatch found - Key: " + getKey(sourceRow) + " Data: " + diffData);
143143

144144
if (autoCorrectMismatch) {
145-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
145+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
146146
correctedMismatchCounter.incrementAndGet();
147147
logger.error("Corrected mismatch data in Astra: " + getKey(sourceRow));
148148
}

0 commit comments

Comments
 (0)