From b2b050cb64b37842f6e05c88a85394393a55c83e Mon Sep 17 00:00:00 2001
From: Parag Badani
Date: Sat, 22 May 2021 17:14:34 +0530
Subject: [PATCH] MINOR: fail fast if max record threshold reached.

---
 .../confluent/kafka/connect/datagen/DatagenTask.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
index 76df05d4..0f38c3bc 100644
--- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
+++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
@@ -143,6 +143,11 @@ public void start(Map<String, String> props) {
 
   @Override
   public List<SourceRecord> poll() throws InterruptedException {
+    if (maxRecords > 0 && count >= maxRecords) {
+      throw new ConnectException(
+          String.format("Stopping connector: generated the configured %d number of messages", count)
+      );
+    }
 
     if (maxInterval > 0) {
       try {
@@ -189,12 +194,6 @@ public List<SourceRecord> poll() throws InterruptedException {
     final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema);
     final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value();
 
-    if (maxRecords > 0 && count >= maxRecords) {
-      throw new ConnectException(
-          String.format("Stopping connector: generated the configured %d number of messages", count)
-      );
-    }
-
     final List<SourceRecord> records = new ArrayList<>();
     SourceRecord record = new SourceRecord(
         SOURCE_PARTITION,