From 2d29375296b3ce14cb112d87b1e116f65f2e654c Mon Sep 17 00:00:00 2001 From: Michal Redlarski Date: Mon, 4 Mar 2024 13:12:22 +0100 Subject: [PATCH] feat: static interval between messages --- pom.xml | 2 +- .../datagen/DatagenConnectorConfig.java | 9 +++++++- .../kafka/connect/datagen/DatagenTask.java | 21 ++++++++++++------- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index fcfba59d..ca2d840a 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ io.confluent.kafka.connect kafka-connect-datagen - 0.7.0-SNAPSHOT + 0.7.1-SNAPSHOT jar diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 59974bbf..3231611f 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -33,6 +33,9 @@ public class DatagenConnectorConfig extends AbstractConfig { private static final String KAFKA_TOPIC_DOC = "Topic to write to"; public static final String MAXINTERVAL_CONF = "max.interval"; private static final String MAXINTERVAL_DOC = "Max interval between messages (ms)"; + private static final String STATICINTERVAL_CONF = "static.interval"; + private static final String STATICINTERVAL_DOC = "Static interval between messages (ms), " + + "when set ignores max.interval setting"; public static final String ITERATIONS_CONF = "iterations"; private static final String ITERATIONS_DOC = "Number of messages to send from each task, " + "or less than 1 for unlimited"; @@ -61,6 +64,7 @@ public static ConfigDef conf() { return new ConfigDef() .define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC) .define(MAXINTERVAL_CONF, Type.LONG, 500L, Importance.HIGH, MAXINTERVAL_DOC) + .define(STATICINTERVAL_CONF, Type.LONG, 0L, Importance.HIGH, STATICINTERVAL_DOC) .define(ITERATIONS_CONF, Type.INT, -1, Importance.HIGH, ITERATIONS_DOC) .define(SCHEMA_STRING_CONF, Type.STRING, @@ -100,6 +104,10 @@ public Long getMaxInterval() { return this.getLong(MAXINTERVAL_CONF); } + public Long getStaticInterval() { + return this.getLong(STATICINTERVAL_CONF); + } + public Integer getIterations() { return this.getInt(ITERATIONS_CONF); } @@ -194,4 +202,3 @@ public void ensureValid(String name, Object value) { } } } - diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 883a3b59..9a8254b1 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -44,10 +44,11 @@ public class DatagenTask extends SourceTask { public static final String CURRENT_ITERATION = "current.iteration"; public static final String RANDOM_SEED = "random.seed"; - private DatagenConnectorConfig config; private String topic; private long maxInterval; + private long staticInterval; + private long interval; private int maxRecords; private long count = 0L; private String schemaKeyField; @@ -70,6 +71,7 @@ public void start(Map props) { config = new DatagenConnectorConfig(props); topic = config.getKafkaTopic(); maxInterval = config.getMaxInterval(); + staticInterval = config.getStaticInterval(); maxRecords = config.getIterations(); schemaKeyField = config.getSchemaKeyfield(); taskGeneration = 0; @@ -108,13 +110,16 @@ public void start(Map props) { @Override public List poll() throws InterruptedException { - if (maxInterval > 0) { - try { - Thread.sleep((long) (maxInterval * Math.random())); - } catch (InterruptedException e) { - Thread.interrupted(); - return null; - } + if (staticInterval > 0) { + interval = staticInterval; + } else if (maxInterval > 0) { + interval = (long) (maxInterval * Math.random()); + } + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + Thread.interrupted(); + return null; } final Object generatedObject = generator.generate();