diff --git a/pom.xml b/pom.xml
index fcfba59d..ca2d840a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
io.confluent.kafka.connect
kafka-connect-datagen
- 0.7.0-SNAPSHOT
+ 0.7.1-SNAPSHOT
jar
diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
index 59974bbf..3231611f 100644
--- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
+++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
@@ -33,6 +33,9 @@ public class DatagenConnectorConfig extends AbstractConfig {
private static final String KAFKA_TOPIC_DOC = "Topic to write to";
public static final String MAXINTERVAL_CONF = "max.interval";
private static final String MAXINTERVAL_DOC = "Max interval between messages (ms)";
+ private static final String STATICINTERVAL_CONF = "static.interval";
+ private static final String STATICINTERVAL_DOC = "Static interval between messages (ms), "
+ + "when set ignores max.interval setting";
public static final String ITERATIONS_CONF = "iterations";
private static final String ITERATIONS_DOC = "Number of messages to send from each task, "
+ "or less than 1 for unlimited";
@@ -61,6 +64,7 @@ public static ConfigDef conf() {
return new ConfigDef()
.define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
.define(MAXINTERVAL_CONF, Type.LONG, 500L, Importance.HIGH, MAXINTERVAL_DOC)
+ .define(STATICINTERVAL_CONF, Type.LONG, 0L, Importance.HIGH, STATICINTERVAL_DOC)
.define(ITERATIONS_CONF, Type.INT, -1, Importance.HIGH, ITERATIONS_DOC)
.define(SCHEMA_STRING_CONF,
Type.STRING,
@@ -100,6 +104,10 @@ public Long getMaxInterval() {
return this.getLong(MAXINTERVAL_CONF);
}
+ public Long getStaticInterval() {
+ return this.getLong(STATICINTERVAL_CONF);
+ }
+
public Integer getIterations() {
return this.getInt(ITERATIONS_CONF);
}
@@ -194,4 +202,3 @@ public void ensureValid(String name, Object value) {
}
}
}
-
diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
index 883a3b59..9a8254b1 100644
--- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
+++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
@@ -44,10 +44,11 @@ public class DatagenTask extends SourceTask {
public static final String CURRENT_ITERATION = "current.iteration";
public static final String RANDOM_SEED = "random.seed";
-
private DatagenConnectorConfig config;
private String topic;
private long maxInterval;
+ private long staticInterval;
+ private long interval;
private int maxRecords;
private long count = 0L;
private String schemaKeyField;
@@ -70,6 +71,7 @@ public void start(Map props) {
config = new DatagenConnectorConfig(props);
topic = config.getKafkaTopic();
maxInterval = config.getMaxInterval();
+ staticInterval = config.getStaticInterval();
maxRecords = config.getIterations();
schemaKeyField = config.getSchemaKeyfield();
taskGeneration = 0;
@@ -108,13 +110,16 @@ public void start(Map props) {
@Override
public List poll() throws InterruptedException {
- if (maxInterval > 0) {
- try {
- Thread.sleep((long) (maxInterval * Math.random()));
- } catch (InterruptedException e) {
- Thread.interrupted();
- return null;
- }
+ if (staticInterval > 0) {
+ interval = staticInterval;
+ } else if (maxInterval > 0) {
+ interval = (long) (maxInterval * Math.random());
+ }
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ return null;
}
final Object generatedObject = generator.generate();