From e6f18789c0345afeab87f9a7dc71c5c193eec146 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 12:12:20 +0530 Subject: [PATCH 01/27] Resolving indentation --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 83699098..6ead2690 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -298,8 +298,8 @@ private Object getOptionalValue( case ARRAY: final List list = (List) value; return list.stream() - .map(listItem -> getOptionalValue(schema.valueSchema(), listItem)) - .collect(Collectors.toList()); + .map(listItem -> getOptionalValue(schema.valueSchema(), listItem)) + .collect(Collectors.toList()); case MAP: final Map map = (Map) value; return map.entrySet().stream() From b5ef9792941ab6aa3982f72383668999c7ac7e31 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 12:25:58 +0530 Subject: [PATCH 02/27] Changing modifier for logger --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 6ead2690..8912ce91 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -43,7 +43,7 @@ public class DatagenTask extends SourceTask { - static final Logger log = LoggerFactory.getLogger(DatagenTask.class); + private static final Logger log = LoggerFactory.getLogger(DatagenTask.class); private static final Schema DEFAULT_KEY_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA; public static final String TASK_ID = "task.id"; From 
ee05caac8182dcadea48961f2757f630b1788f7c Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 12:39:14 +0530 Subject: [PATCH 03/27] Adding timeout config and doc --- .../kafka/connect/datagen/DatagenConnectorConfig.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 97a0f850..611974cf 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -49,6 +49,8 @@ public class DatagenConnectorConfig extends AbstractConfig { private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. " + "Two connectors started with the same seed will deterministically produce the same data. " + "Each task will generate different data than the other tasks in the same connector."; + private static final String GENERATE_TIMEOUT_CONF = "generate.timeout"; + private static final String GENERATE_TIMEOUT_DOC = "Timeout for random message generation"; public DatagenConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); @@ -90,7 +92,8 @@ public static ConfigDef conf() { Importance.HIGH, QUICKSTART_DOC ) - .define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC); + .define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC) + .define(GENERATE_TIMEOUT_CONF, Type.LONG, null, Importance.LOW, GENERATE_TIMEOUT_DOC); } public String getKafkaTopic() { @@ -131,6 +134,10 @@ public String getSchemaString() { return this.getString(SCHEMA_STRING_CONF); } + public Long getGenerateTimeout() { + return this.getLong(GENERATE_TIMEOUT_CONF); + } + public Schema getSchema() { String quickstart = getQuickstart(); if (quickstart != null && !quickstart.isEmpty()) { From b38ac7c5cd9f78bea1db8167a1902ff02d2c7258 Mon Sep 
17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 12:57:21 +0530 Subject: [PATCH 04/27] Added executor for timeout bound call to generator.generate() --- .../datagen/DatagenConnectorConfig.java | 3 +- .../kafka/connect/datagen/DatagenTask.java | 41 +++++++++++++++---- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 611974cf..7530553d 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -50,7 +50,8 @@ public class DatagenConnectorConfig extends AbstractConfig { + "Two connectors started with the same seed will deterministically produce the same data. " + "Each task will generate different data than the other tasks in the same connector."; private static final String GENERATE_TIMEOUT_CONF = "generate.timeout"; - private static final String GENERATE_TIMEOUT_DOC = "Timeout for random message generation"; + private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message " + + "generation"; public DatagenConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 8912ce91..97a72998 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -26,6 +26,12 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; @@ -51,6 +57,7 @@ public class DatagenTask extends SourceTask { public static final String CURRENT_ITERATION = "current.iteration"; public static final String RANDOM_SEED = "random.seed"; + private final ExecutorService generateExecutor = Executors.newSingleThreadExecutor(); private DatagenConnectorConfig config; private String topic; @@ -166,15 +173,7 @@ public List poll() throws InterruptedException { return null; } } - - final Object generatedObject = generator.generate(); - if (!(generatedObject instanceof GenericRecord)) { - throw new RuntimeException(String.format( - "Expected Avro Random Generator to return instance of GenericRecord, found %s instead", - generatedObject.getClass().getName() - )); - } - final GenericRecord randomAvroMessage = (GenericRecord) generatedObject; + final GenericRecord randomAvroMessage = generateRecord(); final List genericRowValues = new ArrayList<>(); for (org.apache.avro.Schema.Field field : avroSchema.getFields()) { @@ -245,6 +244,30 @@ public List poll() throws InterruptedException { return records; } + private GenericRecord generateRecord() throws DatagenException { + Future generatedObjectFuture = generateExecutor.submit(generator::generate); + Long timeout = config.getGenerateTimeout(); + Object generatedObject; + try { + if (timeout == null) { + generatedObject = generatedObjectFuture.get(); + } else { + generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException | ExecutionException e) { + throw new DatagenException("Unable to generate random record", e); + } catch (TimeoutException e) { + throw new DatagenException("Record generation timed out", e); + } + if (!(generatedObject instanceof GenericRecord)) { + throw new DatagenException(String.format( + "Expected Avro Random Generator to return 
instance of GenericRecord, found %s instead", + generatedObject.getClass().getName() + )); + } + return (GenericRecord) generatedObject; + } + @Override public void stop() { } From 98256f010003423cac69471080d677289b35e1fb Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 12:58:29 +0530 Subject: [PATCH 05/27] Added DatagenException --- .../kafka/connect/datagen/DatagenException.java | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java new file mode 100644 index 00000000..9e5fcfe6 --- /dev/null +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java @@ -0,0 +1,11 @@ +package io.confluent.kafka.connect.datagen; + +public class DatagenException extends RuntimeException { + public DatagenException(String message) { + super(message); + } + + public DatagenException(String message, Throwable cause) { + super(message, cause); + } +} From 2271315f449fffb9e34ce156373400698900729b Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 12:59:08 +0530 Subject: [PATCH 06/27] Renamed DatagenException to DatagenTaskException --- .../kafka/connect/datagen/DatagenException.java | 11 ----------- .../confluent/kafka/connect/datagen/DatagenTask.java | 8 ++++---- .../kafka/connect/datagen/DatagenTaskException.java | 11 +++++++++++ 3 files changed, 15 insertions(+), 15 deletions(-) delete mode 100644 src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java create mode 100644 src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java deleted file mode 100644 index 9e5fcfe6..00000000 --- 
a/src/main/java/io/confluent/kafka/connect/datagen/DatagenException.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.confluent.kafka.connect.datagen; - -public class DatagenException extends RuntimeException { - public DatagenException(String message) { - super(message); - } - - public DatagenException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 97a72998..f5aa1a29 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -244,7 +244,7 @@ public List poll() throws InterruptedException { return records; } - private GenericRecord generateRecord() throws DatagenException { + private GenericRecord generateRecord() throws DatagenTaskException { Future generatedObjectFuture = generateExecutor.submit(generator::generate); Long timeout = config.getGenerateTimeout(); Object generatedObject; @@ -255,12 +255,12 @@ private GenericRecord generateRecord() throws DatagenException { generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS); } } catch (InterruptedException | ExecutionException e) { - throw new DatagenException("Unable to generate random record", e); + throw new DatagenTaskException("Unable to generate random record", e); } catch (TimeoutException e) { - throw new DatagenException("Record generation timed out", e); + throw new DatagenTaskException("Record generation timed out", e); } if (!(generatedObject instanceof GenericRecord)) { - throw new DatagenException(String.format( + throw new DatagenTaskException(String.format( "Expected Avro Random Generator to return instance of GenericRecord, found %s instead", generatedObject.getClass().getName() )); diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java 
b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java new file mode 100644 index 00000000..c85cc009 --- /dev/null +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java @@ -0,0 +1,11 @@ +package io.confluent.kafka.connect.datagen; + +public class DatagenTaskException extends RuntimeException { + public DatagenTaskException(String message) { + super(message); + } + + public DatagenTaskException(String message, Throwable cause) { + super(message, cause); + } +} From 1e0ee072ea7fd415372da0c04a8528538cc9344a Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:01:29 +0530 Subject: [PATCH 07/27] Changed modifier for DatagenTaskException --- .../confluent/kafka/connect/datagen/DatagenTaskException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java index c85cc009..9d55636b 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java @@ -1,6 +1,6 @@ package io.confluent.kafka.connect.datagen; -public class DatagenTaskException extends RuntimeException { +class DatagenTaskException extends RuntimeException { public DatagenTaskException(String message) { super(message); } From 34d0e69e5b8239862ae0b45530c8b27a368bcb49 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:05:35 +0530 Subject: [PATCH 08/27] Removing unused variable in start() --- .../kafka/connect/datagen/DatagenTask.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index f5aa1a29..8e984fe7 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ 
b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -174,20 +174,7 @@ public List poll() throws InterruptedException { } } final GenericRecord randomAvroMessage = generateRecord(); - - final List genericRowValues = new ArrayList<>(); - for (org.apache.avro.Schema.Field field : avroSchema.getFields()) { - final Object value = randomAvroMessage.get(field.name()); - if (value instanceof Record) { - final Record record = (Record) value; - final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value(); - Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue); - genericRowValues.add(optionValue); - } else { - genericRowValues.add(value); - } - } - + // Key SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null); if (!schemaKeyField.isEmpty()) { From f2847f2273bdf08497fe2f3328f6e54e818fcbe8 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:09:16 +0530 Subject: [PATCH 09/27] Removing unused import --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 8e984fe7..4d9e0e9c 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; From 0c0f43a9f04aa48bded72f2a947a61dedfb2f3d0 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:10:48 +0530 Subject: [PATCH 10/27] Replacing DatagenTaskException with ConnectException --- 
.../confluent/kafka/connect/datagen/DatagenTask.java | 10 +++++----- .../kafka/connect/datagen/DatagenTaskException.java | 11 ----------- 2 files changed, 5 insertions(+), 16 deletions(-) delete mode 100644 src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 4d9e0e9c..b663fae7 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -162,7 +162,7 @@ public void start(Map props) { } @Override - public List poll() throws InterruptedException { + public List poll() throws ConnectException { if (maxInterval > 0) { try { @@ -230,7 +230,7 @@ public List poll() throws InterruptedException { return records; } - private GenericRecord generateRecord() throws DatagenTaskException { + private GenericRecord generateRecord() throws ConnectException { Future generatedObjectFuture = generateExecutor.submit(generator::generate); Long timeout = config.getGenerateTimeout(); Object generatedObject; @@ -241,12 +241,12 @@ private GenericRecord generateRecord() throws DatagenTaskException { generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS); } } catch (InterruptedException | ExecutionException e) { - throw new DatagenTaskException("Unable to generate random record", e); + throw new ConnectException("Unable to generate random record", e); } catch (TimeoutException e) { - throw new DatagenTaskException("Record generation timed out", e); + throw new ConnectException("Record generation timed out", e); } if (!(generatedObject instanceof GenericRecord)) { - throw new DatagenTaskException(String.format( + throw new ConnectException(String.format( "Expected Avro Random Generator to return instance of GenericRecord, found %s instead", generatedObject.getClass().getName() )); diff --git 
a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java deleted file mode 100644 index 9d55636b..00000000 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTaskException.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.confluent.kafka.connect.datagen; - -class DatagenTaskException extends RuntimeException { - public DatagenTaskException(String message) { - super(message); - } - - public DatagenTaskException(String message, Throwable cause) { - super(message, cause); - } -} From eaebfdcafa9a310b4ce54e81358095fb39fa8a29 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:14:13 +0530 Subject: [PATCH 11/27] Replacing fully qualified names with class name --- .../kafka/connect/datagen/DatagenTask.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index b663fae7..94ab4bb2 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -66,7 +66,7 @@ public class DatagenTask extends SourceTask { private String schemaKeyField; private Generator generator; private org.apache.avro.Schema avroSchema; - private org.apache.kafka.connect.data.Schema ksqlSchema; + private Schema ksqlSchema; private AvroData avroData; private int taskId; private Map sourcePartition; @@ -184,7 +184,7 @@ public List poll() throws ConnectException { } // Value - final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema); + final Schema messageSchema = avroData.toConnectSchema(avroSchema); final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value(); if (maxRecords > 0 && count >= maxRecords) { @@ -258,20 +258,20 @@ private GenericRecord 
generateRecord() throws ConnectException { public void stop() { } - private org.apache.kafka.connect.data.Schema getOptionalSchema( - final org.apache.kafka.connect.data.Schema schema + private Schema getOptionalSchema( + final Schema schema ) { switch (schema.type()) { case BOOLEAN: - return org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA; + return Schema.OPTIONAL_BOOLEAN_SCHEMA; case INT32: - return org.apache.kafka.connect.data.Schema.OPTIONAL_INT32_SCHEMA; + return Schema.OPTIONAL_INT32_SCHEMA; case INT64: - return org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA; + return Schema.OPTIONAL_INT64_SCHEMA; case FLOAT64: - return org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA; + return Schema.OPTIONAL_FLOAT64_SCHEMA; case STRING: - return org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA; + return Schema.OPTIONAL_STRING_SCHEMA; case ARRAY: return SchemaBuilder.array(getOptionalSchema(schema.valueSchema())).optional().build(); case MAP: @@ -294,7 +294,7 @@ private org.apache.kafka.connect.data.Schema getOptionalSchema( } private Object getOptionalValue( - final org.apache.kafka.connect.data.Schema schema, + final Schema schema, final Object value ) { switch (schema.type()) { From e26d4e9a9b2386df3dacca7ab0828257f85c2c83 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:17:24 +0530 Subject: [PATCH 12/27] Minor refactoring --- .../confluent/kafka/connect/datagen/DatagenTask.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 94ab4bb2..e1d82060 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -19,6 +19,7 @@ import io.confluent.avro.random.generator.Generator; import io.confluent.connect.avro.AvroData; import java.util.ArrayList; 
+import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -90,12 +91,13 @@ protected enum Quickstart { STORES("stores.avro", "store_id"), CREDIT_CARDS("credit_cards.avro", "card_id"); - static final Set configValues = new HashSet<>(); + static final Set configValues; static { - for (Quickstart q : Quickstart.values()) { - configValues.add(q.name().toLowerCase()); - } + configValues = Arrays.stream(Quickstart.values()) + .map(Quickstart::name) + .map(String::toLowerCase) + .collect(Collectors.toSet()); } private final String schemaFilename; From 1b7592b6800607538775bf0994ec536db31ac15d Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:19:52 +0530 Subject: [PATCH 13/27] Removed unused methods --- .../kafka/connect/datagen/DatagenTask.java | 76 ------------------- 1 file changed, 76 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index e1d82060..1434720d 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -259,80 +259,4 @@ private GenericRecord generateRecord() throws ConnectException { @Override public void stop() { } - - private Schema getOptionalSchema( - final Schema schema - ) { - switch (schema.type()) { - case BOOLEAN: - return Schema.OPTIONAL_BOOLEAN_SCHEMA; - case INT32: - return Schema.OPTIONAL_INT32_SCHEMA; - case INT64: - return Schema.OPTIONAL_INT64_SCHEMA; - case FLOAT64: - return Schema.OPTIONAL_FLOAT64_SCHEMA; - case STRING: - return Schema.OPTIONAL_STRING_SCHEMA; - case ARRAY: - return SchemaBuilder.array(getOptionalSchema(schema.valueSchema())).optional().build(); - case MAP: - return SchemaBuilder.map( - getOptionalSchema(schema.keySchema()), - getOptionalSchema(schema.valueSchema()) - ).optional().build(); - case STRUCT: - final SchemaBuilder schemaBuilder = 
SchemaBuilder.struct(); - for (Field field : schema.fields()) { - schemaBuilder.field( - field.name(), - getOptionalSchema(field.schema()) - ); - } - return schemaBuilder.optional().build(); - default: - throw new ConnectException("Unsupported type: " + schema); - } - } - - private Object getOptionalValue( - final Schema schema, - final Object value - ) { - switch (schema.type()) { - case BOOLEAN: - case INT32: - case INT64: - case FLOAT64: - case STRING: - return value; - case ARRAY: - final List list = (List) value; - return list.stream() - .map(listItem -> getOptionalValue(schema.valueSchema(), listItem)) - .collect(Collectors.toList()); - case MAP: - final Map map = (Map) value; - return map.entrySet().stream() - .collect(Collectors.toMap( - k -> getOptionalValue(schema.keySchema(), k), - v -> getOptionalValue(schema.valueSchema(), v) - )); - case STRUCT: - final Struct struct = (Struct) value; - final Struct optionalStruct = new Struct(getOptionalSchema(schema)); - for (Field field : schema.fields()) { - optionalStruct.put( - field.name(), - getOptionalValue( - field.schema(), - struct.get(field.name()) - ) - ); - } - return optionalStruct; - default: - throw new ConnectException("Invalid value schema: " + schema + ", value = " + value); - } - } } From eb4660d275919540e35f8562f4c8362b85ddc6e9 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:21:33 +0530 Subject: [PATCH 14/27] Removed unused imports --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 1434720d..8d5dd160 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; 
import java.util.List; import java.util.Map; import java.util.Random; @@ -35,11 +34,8 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.source.SourceRecord; From 4faaaf510ef2047b44447e84629974f44cedb832 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:22:53 +0530 Subject: [PATCH 15/27] Resolving checkstyle errors --- .../confluent/kafka/connect/datagen/DatagenConnectorConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 7530553d..df9cc250 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -51,7 +51,7 @@ public class DatagenConnectorConfig extends AbstractConfig { + "Each task will generate different data than the other tasks in the same connector."; private static final String GENERATE_TIMEOUT_CONF = "generate.timeout"; private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message " - + "generation"; + + "generation"; public DatagenConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); From 0be2c796f5c0eb63bc007f552edb72e56115d856 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:26:48 +0530 Subject: [PATCH 16/27] Making configValues immutable set ---
.../io/confluent/kafka/connect/datagen/DatagenTask.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 8d5dd160..ecc7a412 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -16,6 +16,7 @@ package io.confluent.kafka.connect.datagen; +import com.google.common.collect.ImmutableSet; import io.confluent.avro.random.generator.Generator; import io.confluent.connect.avro.AvroData; import java.util.ArrayList; @@ -32,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -90,10 +90,12 @@ protected enum Quickstart { static final Set configValues; static { - configValues = Arrays.stream(Quickstart.values()) + ImmutableSet.Builder immutableSetBuilder = ImmutableSet.builder(); + Arrays.stream(Quickstart.values()) .map(Quickstart::name) .map(String::toLowerCase) - .collect(Collectors.toSet()); + .forEach(immutableSetBuilder::add); + configValues = immutableSetBuilder.build(); } private final String schemaFilename; From 47d95e939957e8c9c2481c91d86aa330b8170ecd Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 13:31:03 +0530 Subject: [PATCH 17/27] Minor refactoring --- .../kafka/connect/datagen/DatagenConnectorConfig.java | 4 ++-- .../io/confluent/kafka/connect/datagen/DatagenTask.java | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index df9cc250..3e040919 100644 --- 
a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -171,11 +171,11 @@ public void ensureValid(String name, Object value) { if (((String) value).isEmpty()) { return; } - if (!Quickstart.configValues.contains(((String) value).toLowerCase())) { + if (!Quickstart.configValues().contains(((String) value).toLowerCase())) { throw new ConfigException(String.format( "%s must be one out of %s", name, - String.join(",", DatagenTask.Quickstart.configValues) + String.join(",", DatagenTask.Quickstart.configValues()) )); } } diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index ecc7a412..de273d80 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -87,7 +87,7 @@ protected enum Quickstart { STORES("stores.avro", "store_id"), CREDIT_CARDS("credit_cards.avro", "card_id"); - static final Set configValues; + private static final Set configValues; static { ImmutableSet.Builder immutableSetBuilder = ImmutableSet.builder(); @@ -106,6 +106,10 @@ protected enum Quickstart { this.keyName = keyName; } + public static Set configValues() { + return configValues; + } + public String getSchemaFilename() { return schemaFilename; } From adb4e35ff3041f840c8b18044e53de5d25c36e02 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 15:17:09 +0530 Subject: [PATCH 18/27] Added Unit Tests for timeout and stackoverflow error --- .../datagen/DatagenConnectorConfig.java | 2 +- .../connect/datagen/DatagenTaskTest.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 3e040919..9f1f24d2 
100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -49,7 +49,7 @@ public class DatagenConnectorConfig extends AbstractConfig { private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. " + "Two connectors started with the same seed will deterministically produce the same data. " + "Each task will generate different data than the other tasks in the same connector."; - private static final String GENERATE_TIMEOUT_CONF = "generate.timeout"; + public static final String GENERATE_TIMEOUT_CONF = "generate.timeout"; private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message " + "generation"; diff --git a/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java b/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java index 39561fc6..e370ce08 100644 --- a/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java +++ b/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java @@ -44,7 +44,9 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -61,6 +63,9 @@ public class DatagenTaskTest { private static final AvroData AVRO_DATA = new AvroData(20); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private Map config; private DatagenTask task; private List records; @@ -223,6 +228,36 @@ public void shouldFailToGenerateMoreRecordsThanSpecified() throws Exception { } } + /** + *

+ * This test is non-deterministic. This test might fail in case avro-random-generator library + * updates/changes its underlying implementation. + *

+ * @throws Exception + */ + @Test + public void shouldFailToGenerateComplexRegexWithTimeout() throws Exception { + createTaskWithSchemaAndTimeout("regex_schema_timeout.avro", "regex_key", "100"); + expectedException.expect(ConnectException.class); + expectedException.expectMessage("Record generation timed out"); + generateRecords(); + } + + /** + *

+ * This test is non-deterministic. This test might fail in case avro-random-generator library + * updates/changes its underlying implementation. + *

+ * @throws Exception + */ + @Test + public void shouldFailToGenerateComplexRegexWithStackOverflow() throws Exception { + createTaskWithSchemaAndTimeout("regex_schema_stackoverflow.avro", "regex_key", "100"); + expectedException.expect(ConnectException.class); + expectedException.expectMessage("Unable to generate random record"); + generateRecords(); + } + private void generateAndValidateRecordsFor(DatagenTask.Quickstart quickstart) throws Exception { createTaskWith(quickstart); generateRecords(); @@ -337,6 +372,17 @@ private void createTaskWithSchema(String schemaResourceFilename, String idFieldN loadKeyAndValueSchemas(schemaResourceFilename, idFieldName); } + private void createTaskWithSchemaAndTimeout(String schemaResourceFile, String idFieldName, + String timeout) { + dropSchemaSourceConfigs(); + config.put(DatagenConnectorConfig.SCHEMA_FILENAME_CONF, schemaResourceFile); + config.put(DatagenConnectorConfig.SCHEMA_KEYFIELD_CONF, idFieldName); + config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, timeout); + createTask(); + loadKeyAndValueSchemas(schemaResourceFile, idFieldName); + + } + private void createTaskWithSchemaText(String schemaText, String keyField) { dropSchemaSourceConfigs(); config.put(DatagenConnectorConfig.SCHEMA_STRING_CONF, schemaText); From d069b140c035e6d55f21b3bddc672c9d26603fe9 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 15:17:45 +0530 Subject: [PATCH 19/27] Added test resource files --- .../kafka/connect/datagen/DatagenTaskTest.java | 2 +- .../resources/regex_schema_stackoverflow.avro | 18 ++++++++++++++++++ src/test/resources/regex_schema_timeout.avro | 17 +++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 src/test/resources/regex_schema_stackoverflow.avro create mode 100644 src/test/resources/regex_schema_timeout.avro diff --git a/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java b/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java index 
e370ce08..ad5e739c 100644 --- a/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java +++ b/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java @@ -257,7 +257,7 @@ public void shouldFailToGenerateComplexRegexWithStackOverflow() throws Exception expectedException.expectMessage("Unable to generate random record"); generateRecords(); } - + private void generateAndValidateRecordsFor(DatagenTask.Quickstart quickstart) throws Exception { createTaskWith(quickstart); generateRecords(); diff --git a/src/test/resources/regex_schema_stackoverflow.avro b/src/test/resources/regex_schema_stackoverflow.avro new file mode 100644 index 00000000..0abd8a63 --- /dev/null +++ b/src/test/resources/regex_schema_stackoverflow.avro @@ -0,0 +1,18 @@ +{ + "type": "record", + "name": "regex_record", + "fields": + [ + { + "name": "regex_key", + "type": + { + "type": "string", + "arg.properties": { + "regex": "\\S+_a+/", + "length": 100 + } + } + } + ] +} \ No newline at end of file diff --git a/src/test/resources/regex_schema_timeout.avro b/src/test/resources/regex_schema_timeout.avro new file mode 100644 index 00000000..650edf66 --- /dev/null +++ b/src/test/resources/regex_schema_timeout.avro @@ -0,0 +1,17 @@ +{ + "type": "record", + "name": "regex_record", + "fields": + [ + { + "name": "regex_key", + "type": + { + "type": "string", + "arg.properties": { + "regex": "(.{1,6})( *, *(.{1,6})){0,12}" + } + } + } + ] +} \ No newline at end of file From 9c3b0f1dca1a9e38a0f081f66f0d61cdbc6b5972 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Tue, 2 Aug 2022 15:20:02 +0530 Subject: [PATCH 20/27] Added newline --- src/test/resources/regex_schema_stackoverflow.avro | 2 +- src/test/resources/regex_schema_timeout.avro | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/resources/regex_schema_stackoverflow.avro b/src/test/resources/regex_schema_stackoverflow.avro index 0abd8a63..975adc96 100644 --- 
a/src/test/resources/regex_schema_stackoverflow.avro +++ b/src/test/resources/regex_schema_stackoverflow.avro @@ -15,4 +15,4 @@ } } ] -} \ No newline at end of file +} diff --git a/src/test/resources/regex_schema_timeout.avro b/src/test/resources/regex_schema_timeout.avro index 650edf66..41cb0496 100644 --- a/src/test/resources/regex_schema_timeout.avro +++ b/src/test/resources/regex_schema_timeout.avro @@ -14,4 +14,4 @@ } } ] -} \ No newline at end of file +} From 96107582254e9ae027a0158e86bd769ed8f9abfe Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Wed, 3 Aug 2022 10:51:56 +0530 Subject: [PATCH 21/27] Added validator to generate.timeout --- .../datagen/DatagenConnectorConfig.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 9f1f24d2..d9647d9d 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -93,8 +93,17 @@ public static ConfigDef conf() { Importance.HIGH, QUICKSTART_DOC ) - .define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC) - .define(GENERATE_TIMEOUT_CONF, Type.LONG, null, Importance.LOW, GENERATE_TIMEOUT_DOC); + .define(RANDOM_SEED_CONF, + Type.LONG, + null, + Importance.LOW, + RANDOM_SEED_DOC) + .define(GENERATE_TIMEOUT_CONF, + Type.LONG, + null, + new GenerateTimeoutValidator(), + Importance.LOW, + GENERATE_TIMEOUT_DOC); } public String getKafkaTopic() { @@ -202,5 +211,17 @@ public void ensureValid(String name, Object value) { ConfigUtils.getSchemaFromSchemaFileName((String) value); } } + + private static class GenerateTimeoutValidator implements Validator { + + @Override + public void ensureValid(String name, Object value) { + Long longValue = (Long) value; + if (longValue > 0 && longValue <= 60000L) { + 
return; + } + throw new ConfigException(name + " must be in the range [1, 60,000] ms"); + } + } } From fe819181cd9b6093015898614f850a4ab09de5c2 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Wed, 3 Aug 2022 10:57:35 +0530 Subject: [PATCH 22/27] Added null check for generate.timeout validator --- .../kafka/connect/datagen/DatagenConnectorConfig.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index d9647d9d..6510105f 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -216,6 +216,9 @@ private static class GenerateTimeoutValidator implements Validator { @Override public void ensureValid(String name, Object value) { + if (value == null) { + return; + } Long longValue = (Long) value; if (longValue > 0 && longValue <= 60000L) { return; From 9728f7e9ac7d6d7187ddde86e09fec08bfe3ba1f Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Wed, 3 Aug 2022 10:58:34 +0530 Subject: [PATCH 23/27] Minor refactoring --- .../kafka/connect/datagen/DatagenConnectorConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 6510105f..552f1601 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -219,11 +219,11 @@ public void ensureValid(String name, Object value) { if (value == null) { return; } - Long longValue = (Long) value; + long longValue = (Long) value; if (longValue > 0 && longValue <= 60000L) { return; } - throw new ConfigException(name + " must be in the range [1, 60,000] ms"); + throw new 
ConfigException(name + " must be in the range [1, 60000] ms"); } } } From b663c97df30882cb6ed4efd92f33c04a0cda77fd Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Wed, 3 Aug 2022 11:10:20 +0530 Subject: [PATCH 24/27] Added Unit Tests for generate.timeout config --- .../datagen/DatagenConnectorConfig.java | 2 +- .../connect/datagen/DatagenConnectorTest.java | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java index 552f1601..bbe75ebc 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java @@ -51,7 +51,7 @@ public class DatagenConnectorConfig extends AbstractConfig { + "Each task will generate different data than the other tasks in the same connector."; public static final String GENERATE_TIMEOUT_CONF = "generate.timeout"; private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message " - + "generation"; + + "generation. 
This timeout can be configured for upto 1 minute, i.e 60000ms"; public DatagenConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); diff --git a/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java b/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java index 196c5f00..07d0b7d7 100644 --- a/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java +++ b/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java @@ -220,6 +220,30 @@ public void shouldNotValidateSchemaKeyFieldWhenMultipleSchemaSourcesAreSet() { ); } + @Test + public void shouldAllowSettingGenerateTimeoutInRange() { + clearSchemaSources(); + config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "100"); + Config validated = connector.validate(config); + assertThat(validated, hasNoValidationErrorsFor(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF)); + } + + @Test + public void shouldNotAllowSettingGenerateTimeoutNegative() { + clearSchemaSources(); + config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "-1"); + Config validated = connector.validate(config); + assertThat(validated, hasValidationError(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, 1)); + } + + @Test + public void shouldNotAllowSettingGenerateTimeoutOutOfRange() { + clearSchemaSources(); + config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "70000"); + Config validated = connector.validate(config); + assertThat(validated, hasValidationError(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, 1)); + } + protected void assertTaskConfigs(int maxTasks) { List> taskConfigs = connector.taskConfigs(maxTasks); assertEquals(maxTasks, taskConfigs.size()); From 148cfa59d6473b80a001b371c868ccf32aab787c Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Thu, 4 Aug 2022 11:13:41 +0530 Subject: [PATCH 25/27] Added shutdown for the executor --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 1 + 1 file changed, 1 insertion(+) diff 
--git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index de273d80..f1d8c3c9 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -260,5 +260,6 @@ private GenericRecord generateRecord() throws ConnectException { @Override public void stop() { + generateExecutor.shutdown(); } } From b99be5a2dc05c96bc8a3f68254898c6638c39469 Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Thu, 4 Aug 2022 11:23:42 +0530 Subject: [PATCH 26/27] Cancelled future before throwing ConnectException --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index f1d8c3c9..e6e5de03 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -245,8 +245,10 @@ private GenericRecord generateRecord() throws ConnectException { generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS); } } catch (InterruptedException | ExecutionException e) { + generatedObjectFuture.cancel(true); throw new ConnectException("Unable to generate random record", e); } catch (TimeoutException e) { + generatedObjectFuture.cancel(true); throw new ConnectException("Record generation timed out", e); } if (!(generatedObject instanceof GenericRecord)) { From 2d40594272766c8b7d40f71b328d861a48bab42b Mon Sep 17 00:00:00 2001 From: Tushar Singh Date: Wed, 10 Aug 2022 15:12:30 +0530 Subject: [PATCH 27/27] Removing Exception from method signature --- .../java/io/confluent/kafka/connect/datagen/DatagenTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java 
b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index e6e5de03..005da4ef 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -166,7 +166,7 @@ public void start(Map props) { } @Override - public List poll() throws ConnectException { + public List poll() { if (maxInterval > 0) { try { @@ -234,7 +234,7 @@ public List poll() throws ConnectException { return records; } - private GenericRecord generateRecord() throws ConnectException { + private GenericRecord generateRecord() { Future generatedObjectFuture = generateExecutor.submit(generator::generate); Long timeout = config.getGenerateTimeout(); Object generatedObject;