Skip to content

Commit b3ff9d0

Browse files
committed
Disabled batch for counters
1 parent 2f3c883 commit b3ff9d0

File tree

1 file changed

+3
-8
lines changed

1 file changed

+3
-8
lines changed

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
8181

8282
// cannot do batching if the writeFilter is greater than 0 or
8383
// maxWriteTimeStampFilter is less than max long
84-
if (batchSize == 1 || writeTimeStampFilter) {
84+
// do not batch for counters as it adds latency & increases chance of discrepancy
85+
if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
8586
for (Row sourceRow : resultSet) {
8687
readLimiter.acquire(1);
8788

@@ -124,13 +125,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
124125
if (readCounter.incrementAndGet() % 1000 == 0) {
125126
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
126127
}
127-
Row astraRow = null;
128-
if (isCounterTable) {
129-
ResultSet astraReadResultSet = astraSession
130-
.execute(selectFromAstra(astraSelectStatement, sourceRow));
131-
astraRow = astraReadResultSet.one();
132-
}
133-
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, astraRow));
128+
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
134129

135130
// if batch threshold is met, send the writes and clear the batch
136131
if (batchStatement.size() >= batchSize) {

0 commit comments

Comments
 (0)