diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 76df05d4..0f38c3bc 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -143,6 +143,11 @@ public void start(Map props) { @Override public List poll() throws InterruptedException { + if (maxRecords > 0 && count >= maxRecords) { + throw new ConnectException( + String.format("Stopping connector: generated the configured %d number of messages", count) + ); + } if (maxInterval > 0) { try { @@ -189,12 +194,6 @@ public List poll() throws InterruptedException { final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema); final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value(); - if (maxRecords > 0 && count >= maxRecords) { - throw new ConnectException( - String.format("Stopping connector: generated the configured %d number of messages", count) - ); - } - final List records = new ArrayList<>(); SourceRecord record = new SourceRecord( SOURCE_PARTITION,