Skip to content

Commit 534261b

Browse files
arvydasjmsmygit
authored andcommitted
Fixes issue-257: Sets write consistency on Batch statement
1 parent ca102c7 commit 534261b

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ protected void processPartitionRange(PartitionRange range) {
135135

136136
private void flushAndClearWrites(BatchStatement batch, Collection<CompletionStage<AsyncResultSet>> writeResults) {
137137
if (batch.size() > 0) {
138+
batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel());
138139
writeResults.add(targetUpsertStatement.executeAsync(batch));
139140
}
140141
writeResults.stream().forEach(writeResult -> writeResult.toCompletableFuture().join().one());
@@ -157,11 +158,13 @@ private BatchStatement writeAsync(BatchStatement batch, Collection<CompletionSta
157158
if (batchSize > 1) {
158159
batch = batch.add(boundUpsert);
159160
if (batch.size() >= batchSize) {
161+
batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel());
160162
writeResults.add(targetUpsertStatement.executeAsync(batch));
161163
return BatchStatement.newInstance(BatchType.UNLOGGED);
162164
}
163165
return batch;
164166
} else {
167+
boundUpsert = boundUpsert.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel());
165168
writeResults.add(targetUpsertStatement.executeAsync(boundUpsert));
166169
return batch;
167170
}

0 commit comments

Comments
 (0)