Skip to content

Commit 2f3c883

Browse files
committed
Fix counter table bug & improve code consistency
1 parent f1f063a commit 2f3c883

File tree

3 files changed

+35
-44
lines changed

3 files changed

+35
-44
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>com.datastax.spark.example</groupId>
55
<artifactId>migrate</artifactId>
6-
<version>0.15</version>
6+
<version>0.16</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
7676
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
7777

7878
try {
79-
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner? min : min.longValueExact(), hasRandomPartitioner? max : max.longValueExact()));
79+
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()));
8080
Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
8181

8282
// cannot do batching if the writeFilter is greater than 0 or
@@ -85,7 +85,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
8585
for (Row sourceRow : resultSet) {
8686
readLimiter.acquire(1);
8787

88-
if(writeTimeStampFilter) {
88+
if (writeTimeStampFilter) {
8989
// only process rows greater than writeTimeStampFilter
9090
Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
9191
if (sourceWriteTimeStamp < minWriteTimeStampFilter
@@ -99,22 +99,16 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
9999
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: "
100100
+ readCounter.get());
101101
}
102-
if (minWriteTimeStampFilter > 0l) {
103-
Row astraRow = null;
104-
if (isCounterTable) {
105-
ResultSet astraReadResultSet = astraSession
106-
.execute(selectFromAstra(astraSelectStatement, sourceRow));
107-
astraRow = astraReadResultSet.one();
108-
}
109-
110-
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
111-
.executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
112-
writeResults.add(astraWriteResultSet);
113-
} else {
114-
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
115-
.executeAsync(bindInsert(astraInsertStatement, sourceRow));
116-
writeResults.add(astraWriteResultSet);
102+
Row astraRow = null;
103+
if (isCounterTable) {
104+
ResultSet astraReadResultSet = astraSession
105+
.execute(selectFromAstra(astraSelectStatement, sourceRow));
106+
astraRow = astraReadResultSet.one();
117107
}
108+
109+
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
110+
.executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
111+
writeResults.add(astraWriteResultSet);
118112
if (writeResults.size() > 1000) {
119113
iterateAndClearWriteResults(writeResults, 1);
120114
}
@@ -124,13 +118,19 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
124118
iterateAndClearWriteResults(writeResults, 1);
125119
} else {
126120
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
127-
for (Row row : resultSet) {
121+
for (Row sourceRow : resultSet) {
128122
readLimiter.acquire(1);
129123
writeLimiter.acquire(1);
130124
if (readCounter.incrementAndGet() % 1000 == 0) {
131125
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
132126
}
133-
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, row));
127+
Row astraRow = null;
128+
if (isCounterTable) {
129+
ResultSet astraReadResultSet = astraSession
130+
.execute(selectFromAstra(astraSelectStatement, sourceRow));
131+
astraRow = astraReadResultSet.one();
132+
}
133+
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, astraRow));
134134

135135
// if batch threshold is met, send the writes and clear the batch
136136
if (batchStatement.size() >= batchSize) {
@@ -166,11 +166,9 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
166166
}
167167
}
168168

169-
170-
171169
}
172170

173-
private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception{
171+
private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
174172
for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
175173
//wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
176174
writeResult.toCompletableFuture().get().one();
@@ -181,41 +179,34 @@ private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultS
181179
writeResults.clear();
182180
}
183181

184-
public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow) {
185-
return bindInsert(insertStatement, sourceRow, null);
186-
}
187-
188182
public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
189-
if (isCounterTable) {
183+
BoundStatement boundInsertStatement = insertStatement.bind();
190184

191-
BoundStatement boundInsertStatement = insertStatement.bind();
185+
if (isCounterTable) {
192186
for (int index = 0; index < insertColTypes.size(); index++) {
193187
MigrateDataType dataType = insertColTypes.get(index);
194188
// compute the counter delta if reading from astra for the difference
195189
if (astraRow != null && isCounterTable && index <= counterDeltaMaxIndex) {
196-
boundInsertStatement = boundInsertStatement.set(index,getCounterDelta(sourceRow.getLong(updateSelectMapping.get(index)), astraRow.getLong(updateSelectMapping.get(index))),Long.class);
190+
boundInsertStatement = boundInsertStatement.set(index, getCounterDelta(sourceRow.getLong(updateSelectMapping.get(index)), astraRow.getLong(updateSelectMapping.get(index))), Long.class);
197191
} else {
198192
boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
199193
}
200194
}
201-
202-
return boundInsertStatement;
203-
204195
} else {
205-
BoundStatement boundInsertStatement = insertStatement.bind();
206196
int index = 0;
207197
for (index = 0; index < insertColTypes.size(); index++) {
208-
MigrateDataType dataType = insertColTypes.get(index);
198+
MigrateDataType dataTypeObj = insertColTypes.get(index);
199+
Class dataType = dataTypeObj.typeClass;
209200

210201
try {
211-
Object colData = getData(dataType, index, sourceRow);
212-
if(index < idColTypes.size() && colData==null && dataType.typeClass==String.class){
213-
colData="";
202+
Object colData = getData(dataTypeObj, index, sourceRow);
203+
if (index < idColTypes.size() && colData == null && dataType == String.class) {
204+
colData = "";
214205
}
215-
boundInsertStatement = boundInsertStatement.set(index, colData, dataType.typeClass);
206+
boundInsertStatement = boundInsertStatement.set(index, colData, dataType);
216207
} catch (NullPointerException e) {
217208
// ignore the exception for map values being null
218-
if (dataType.typeClass != Map.class) {
209+
if (dataType != Map.class) {
219210
throw e;
220211
}
221212
}
@@ -226,9 +217,9 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
226217
index++;
227218
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
228219
}
229-
return boundInsertStatement;
230-
231220
}
221+
222+
return boundInsertStatement;
232223
}
233224

234225
public Long getCounterDelta(Long sourceRow, Long astraRow) {

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private void diff(Row sourceRow, Row astraRow) {
128128
//correct data
129129

130130
if (autoCorrectMissing) {
131-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
131+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
132132
correctedMissingCounter.incrementAndGet();
133133
logger.error("Corrected missing data in Astra: " + getKey(sourceRow));
134134
}
@@ -142,7 +142,7 @@ private void diff(Row sourceRow, Row astraRow) {
142142
logger.error("Data mismatch found - Key: " + getKey(sourceRow) + " Data: " + diffData);
143143

144144
if (autoCorrectMismatch) {
145-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
145+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
146146
correctedMismatchCounter.incrementAndGet();
147147
logger.error("Corrected mismatch data in Astra: " + getKey(sourceRow));
148148
}

0 commit comments

Comments
 (0)