diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index 9f926dc5040aa..9ab24bae28213 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -106,6 +106,7 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<Stri
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ ... @@ public Future<Void> doFlush(final Callback<Void> callback) {
         return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
+                log.debug("Completed writing offsets {}, error?={}", toFlush, error != null);
                 boolean isCurrent = handleFinishWrite(flushId, error, result);
                 if (isCurrent && callback != null) {
                     callback.onCompletion(error, result);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index c7ba96de21aeb..5579b5fc5d0ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -87,7 +87,7 @@ public void start(Map<String, String> props) {
         startingSeqno = seqno;
         throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
 
-        log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
+        log.info("Started VerifiableSourceTask task {} producing to topic {} resuming from seqno {}", id, topic, startingSeqno);
     }
 
     @Override
@@ -140,6 +140,7 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
 
     @Override
     public void stop() {
+        log.info("Stopping VerifiableSourceTask task {}, seqno: {}", id, seqno);
         throttler.wakeup();
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 9d77d21767a89..892e5e1972b1b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -119,6 +119,8 @@ public KafkaBasedLog(String topic,
             public void run() {
             }
         };
+        log.info("Adding acks all to the KafkaBasedLog producer");
+        this.producerConfigs.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
     }
 
     public void start() {
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index a27b54d6f89ee..c81a982c95ed6 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -340,8 +340,7 @@ def test_file_source_and_sink(self, security_protocol):
         wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False])
-    def test_bounce(self, clean):
+    def test_bounce(self, clean=True):
         """
         Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
         run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
@@ -407,7 +406,7 @@ def test_bounce(self, clean):
                 success = False
             if not allow_dups and duplicate_src_seqnos:
                 self.logger.error("Duplicate source sequence numbers for task " + str(task))
-                errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
+                errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos[0:20]))
                 success = False
 
@@ -427,7 +426,7 @@ def test_bounce(self, clean):
                 success = False
             if not allow_dups and duplicate_sink_seqnos:
                 self.logger.error("Duplicate sink sequence numbers for task " + str(task))
-                errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
+                errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos[0:20]))
                 success = False
 
         # Validate source and sink match
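For context on the KafkaBasedLog change above: Map.putIfAbsent only installs the "all" default when the caller has not already configured acks, so a worker-supplied override still wins. A minimal sketch of that behavior, not part of the patch; the class name and bootstrap address below are illustrative only:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksDefaultSketch {
    public static void main(String[] args) {
        // Producer configs as KafkaBasedLog might receive them from a worker.
        Map<String, Object> producerConfigs = new HashMap<>();
        producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // No acks set yet, so the default "all" is installed.
        producerConfigs.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        System.out.println(producerConfigs.get(ProducerConfig.ACKS_CONFIG)); // prints: all

        // An explicit caller setting is left untouched by putIfAbsent.
        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "1");
        producerConfigs.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        System.out.println(producerConfigs.get(ProducerConfig.ACKS_CONFIG)); // prints: 1
    }
}

Using putIfAbsent rather than put keeps the stronger durability guarantee as a default while preserving backward compatibility for deployments that deliberately configure weaker acks.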