diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index aa24f16d..c12c233b 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -135,6 +135,7 @@ protected void processPartitionRange(PartitionRange range) { private void flushAndClearWrites(BatchStatement batch, Collection> writeResults) { if (batch.size() > 0) { + batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel()); writeResults.add(targetUpsertStatement.executeAsync(batch)); } writeResults.stream().forEach(writeResult -> writeResult.toCompletableFuture().join().one()); @@ -157,11 +158,13 @@ private BatchStatement writeAsync(BatchStatement batch, Collection 1) { batch = batch.add(boundUpsert); if (batch.size() >= batchSize) { + batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel()); writeResults.add(targetUpsertStatement.executeAsync(batch)); return BatchStatement.newInstance(BatchType.UNLOGGED); } return batch; } else { + boundUpsert = boundUpsert.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel()); writeResults.add(targetUpsertStatement.executeAsync(boundUpsert)); return batch; }