From 534261b2af1381186c750b8aa8d7655d917eb427 Mon Sep 17 00:00:00 2001 From: Arvydas Jonusonis Date: Tue, 9 Apr 2024 00:06:42 -0700 Subject: [PATCH] Fixes issue-257: Sets write consistency on Batch statement --- src/main/java/com/datastax/cdm/job/CopyJobSession.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index aa24f16d..c12c233b 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -135,6 +135,7 @@ protected void processPartitionRange(PartitionRange range) { private void flushAndClearWrites(BatchStatement batch, Collection> writeResults) { if (batch.size() > 0) { + batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel()); writeResults.add(targetUpsertStatement.executeAsync(batch)); } writeResults.stream().forEach(writeResult -> writeResult.toCompletableFuture().join().one()); @@ -157,11 +158,13 @@ private BatchStatement writeAsync(BatchStatement batch, Collection 1) { batch = batch.add(boundUpsert); if (batch.size() >= batchSize) { + batch = batch.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel()); writeResults.add(targetUpsertStatement.executeAsync(batch)); return BatchStatement.newInstance(BatchType.UNLOGGED); } return batch; } else { + boundUpsert = boundUpsert.setConsistencyLevel(targetSession.getCqlTable().getWriteConsistencyLevel()); writeResults.add(targetUpsertStatement.executeAsync(boundUpsert)); return batch; }