@@ -106,6 +106,7 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<Strin
}
}

log.info("Found offsets {} for task keys {}", result, partitions);
return result;
}
}
@@ -169,6 +169,7 @@ public Future<Void> doFlush(final Callback<Void> callback) {
return backingStore.set(offsetsSerialized, new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
log.debug("Completed wrting offsets {}, error?={}", toFlush, error != null);
boolean isCurrent = handleFinishWrite(flushId, error, result);
if (isCurrent && callback != null) {
callback.onCompletion(error, result);
@@ -87,7 +87,7 @@ public void start(Map<String, String> props) {
startingSeqno = seqno;
throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());

log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
log.info("Started VerifiableSourceTask task {} producing to topic {} resuming from seqno {}", id, topic, startingSeqno);
}

@Override
@@ -140,6 +140,7 @@ public void commitRecord(SourceRecord record) throws InterruptedException {

@Override
public void stop() {
log.info("Stopping VerifiableSourceTask task {}, seqno: {}", id, seqno);
throttler.wakeup();
}
}
@@ -119,6 +119,8 @@ public KafkaBasedLog(String topic,
public void run() {
}
};
log.info("Adding acks all to the KafkaBasedLog producer");
this.producerConfigs.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
}

public void start() {
7 changes: 3 additions & 4 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -340,8 +340,7 @@ def test_file_source_and_sink(self, security_protocol):
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")

@cluster(num_nodes=6)
@matrix(clean=[True, False])
def test_bounce(self, clean):
def test_bounce(self, clean=True):
"""
Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
@@ -407,7 +406,7 @@ def test_bounce(self, clean):
success = False
if not allow_dups and duplicate_src_seqnos:
self.logger.error("Duplicate source sequence numbers for task " + str(task))
errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos[0:20]))
success = False


@@ -427,7 +426,7 @@ def test_bounce(self, clean):
success = False
if not allow_dups and duplicate_sink_seqnos:
self.logger.error("Duplicate sink sequence numbers for task " + str(task))
errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos[0:20]))
success = False

# Validate source and sink match