Skip to content

Commit 1297a34

Browse files
arvydasjmsmygit
authored andcommitted
Fixes issue-257: Sets write consistency on Batch statement
1 parent d2d1957 commit 1297a34

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
141141

142142
private void flushAndClearWrites(BatchStatement batch, Collection<CompletionStage<AsyncResultSet>> writeResults) throws Exception {
143143
if (batch.size() > 0) {
144+
batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel());
144145
writeResults.add(targetUpsertStatement.executeAsync(batch));
145146
}
146147
for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
@@ -165,11 +166,13 @@ private BatchStatement writeAsync(BatchStatement batch, Collection<CompletionSta
165166
if (batchSize > 1) {
166167
batch = batch.add(boundUpsert);
167168
if (batch.size() >= batchSize) {
169+
batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel());
168170
writeResults.add(targetUpsertStatement.executeAsync(batch));
169171
return BatchStatement.newInstance(BatchType.UNLOGGED);
170172
}
171173
return batch;
172174
} else {
175+
boundUpsert = boundUpsert.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel());
173176
writeResults.add(targetUpsertStatement.executeAsync(boundUpsert));
174177
return batch;
175178
}

0 commit comments

Comments
 (0)