diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
index 97a0f850..bbe75ebc 100644
--- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
+++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
@@ -49,6 +49,9 @@ public class DatagenConnectorConfig extends AbstractConfig {
   private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. "
       + "Two connectors started with the same seed will deterministically produce the same data. "
       + "Each task will generate different data than the other tasks in the same connector.";
+  public static final String GENERATE_TIMEOUT_CONF = "generate.timeout";
+  private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message "
+      + "generation. The timeout can be configured up to a maximum of 1 minute (60000 ms).";
 
   public DatagenConnectorConfig(ConfigDef config, Map<String, ?> parsedConfig) {
     super(config, parsedConfig);
@@ -90,7 +93,17 @@ public static ConfigDef conf() {
             Importance.HIGH,
             QUICKSTART_DOC
         )
-        .define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC);
+        .define(RANDOM_SEED_CONF,
+            Type.LONG,
+            null,
+            Importance.LOW,
+            RANDOM_SEED_DOC)
+        .define(GENERATE_TIMEOUT_CONF,
+            Type.LONG,
+            null,
+            new GenerateTimeoutValidator(),
+            Importance.LOW,
+            GENERATE_TIMEOUT_DOC);
   }
 
   public String getKafkaTopic() {
@@ -131,6 +144,10 @@ public String getSchemaString() {
     return this.getString(SCHEMA_STRING_CONF);
   }
 
+  public Long getGenerateTimeout() {
+    return this.getLong(GENERATE_TIMEOUT_CONF);
+  }
+
   public Schema getSchema() {
     String quickstart = getQuickstart();
     if (quickstart != null && !quickstart.isEmpty()) {
@@ -163,11 +180,11 @@ public void ensureValid(String name, Object value) {
       if (((String) value).isEmpty()) {
         return;
       }
-      if (!Quickstart.configValues.contains(((String) value).toLowerCase())) {
+      if (!Quickstart.configValues().contains(((String) value).toLowerCase())) {
         throw new ConfigException(String.format(
             "%s must be one out of %s",
             name,
-            String.join(",", DatagenTask.Quickstart.configValues)
+            String.join(",", DatagenTask.Quickstart.configValues())
         ));
       }
     }
@@ -194,5 +211,20 @@ public void ensureValid(String name, Object value) {
       ConfigUtils.getSchemaFromSchemaFileName((String) value);
     }
   }
+
+  private static class GenerateTimeoutValidator implements Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+      if (value == null) {
+        return;
+      }
+      long longValue = (Long) value;
+      if (longValue > 0 && longValue <= 60000L) {
+        return;
+      }
+      throw new ConfigException(name + " must be in the range [1, 60000] ms");
+    }
+  }
 }
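As background for the validator above, here is a minimal standalone sketch (not part of the patch; the inline lambda mirrors the same rule). For a Type.LONG key, Connect parses the raw string into a Long before ensureValid runs, which is why the (Long) value cast is safe, and why null, meaning the property was left unset, must be tolerated:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigException;

    public class GenerateTimeoutValidationDemo {
      public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define(
            "generate.timeout",
            ConfigDef.Type.LONG,
            null,                                // default: no timeout configured
            (name, value) -> {
              if (value == null) {
                return;                          // unset is valid
              }
              long ms = (Long) value;            // Type.LONG parsing already produced a Long
              if (ms <= 0 || ms > 60000L) {
                throw new ConfigException(name + " must be in the range [1, 60000] ms");
              }
            },
            ConfigDef.Importance.LOW,
            "Timeout in milliseconds for random message generation.");

        // The raw string "5000" is parsed into the Long 5000 before the validator runs.
        Map<String, Object> parsed =
            def.parse(Collections.singletonMap("generate.timeout", "5000"));
        System.out.println(parsed.get("generate.timeout")); // prints 5000
      }
    }

Because the check runs during ConfigDef parsing, a bad value fails fast in connector.validate() rather than at runtime; the DatagenConnectorTest additions below assert exactly that.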
diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
index 83699098..005da4ef 100644
--- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
+++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
@@ -16,24 +16,26 @@
 
 package io.confluent.kafka.connect.datagen;
 
+import com.google.common.collect.ImmutableSet;
 import io.confluent.avro.random.generator.Generator;
 import io.confluent.connect.avro.AvroData;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.avro.generic.GenericData.Record;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -43,7 +45,7 @@
 
 public class DatagenTask extends SourceTask {
 
-  static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
+  private static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
 
   private static final Schema DEFAULT_KEY_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA;
   public static final String TASK_ID = "task.id";
@@ -51,6 +53,7 @@ public class DatagenTask extends SourceTask {
   public static final String CURRENT_ITERATION = "current.iteration";
   public static final String RANDOM_SEED = "random.seed";
 
+  private final ExecutorService generateExecutor = Executors.newSingleThreadExecutor();
   private DatagenConnectorConfig config;
   private String topic;
@@ -60,7 +63,7 @@ public class DatagenTask extends SourceTask {
   private String schemaKeyField;
   private Generator generator;
   private org.apache.avro.Schema avroSchema;
-  private org.apache.kafka.connect.data.Schema ksqlSchema;
+  private Schema ksqlSchema;
   private AvroData avroData;
   private int taskId;
   private Map<String, Object> sourcePartition;
@@ -84,12 +87,15 @@ protected enum Quickstart {
     STORES("stores.avro", "store_id"),
     CREDIT_CARDS("credit_cards.avro", "card_id");
 
-    static final Set<String> configValues = new HashSet<>();
+    private static final Set<String> configValues;
 
     static {
-      for (Quickstart q : Quickstart.values()) {
-        configValues.add(q.name().toLowerCase());
-      }
+      ImmutableSet.Builder<String> immutableSetBuilder = ImmutableSet.builder();
+      Arrays.stream(Quickstart.values())
+          .map(Quickstart::name)
+          .map(String::toLowerCase)
+          .forEach(immutableSetBuilder::add);
+      configValues = immutableSetBuilder.build();
     }
 
     private final String schemaFilename;
@@ -100,6 +106,10 @@ protected enum Quickstart {
       this.keyName = keyName;
     }
 
+    public static Set<String> configValues() {
+      return configValues;
+    }
+
     public String getSchemaFilename() {
       return schemaFilename;
     }
@@ -156,7 +166,7 @@ public void start(Map<String, String> props) {
   }
 
   @Override
-  public List<SourceRecord> poll() throws InterruptedException {
+  public List<SourceRecord> poll() {
 
     if (maxInterval > 0) {
       try {
@@ -166,29 +176,8 @@ public List<SourceRecord> poll() throws InterruptedException {
         return null;
       }
     }
-
-    final Object generatedObject = generator.generate();
-    if (!(generatedObject instanceof GenericRecord)) {
-      throw new RuntimeException(String.format(
-          "Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
-          generatedObject.getClass().getName()
-      ));
-    }
-    final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;
-
-    final List<Object> genericRowValues = new ArrayList<>();
-    for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
-      final Object value = randomAvroMessage.get(field.name());
-      if (value instanceof Record) {
-        final Record record = (Record) value;
-        final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
-        Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
-        genericRowValues.add(optionValue);
-      } else {
-        genericRowValues.add(value);
-      }
-    }
-
+    final GenericRecord randomAvroMessage = generateRecord();
+
     // Key
     SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
     if (!schemaKeyField.isEmpty()) {
@@ -199,7 +188,7 @@ public List<SourceRecord> poll() throws InterruptedException {
     }
 
     // Value
-    final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema);
+    final Schema messageSchema = avroData.toConnectSchema(avroSchema);
     final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value();
 
     if (maxRecords > 0 && count >= maxRecords) {
@@ -245,83 +234,34 @@ public List<SourceRecord> poll() throws InterruptedException {
     return records;
   }
 
-  @Override
-  public void stop() {
-  }
-
-  private org.apache.kafka.connect.data.Schema getOptionalSchema(
-      final org.apache.kafka.connect.data.Schema schema
-  ) {
-    switch (schema.type()) {
-      case BOOLEAN:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA;
-      case INT32:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_INT32_SCHEMA;
-      case INT64:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA;
-      case FLOAT64:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
-      case STRING:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
-      case ARRAY:
-        return SchemaBuilder.array(getOptionalSchema(schema.valueSchema())).optional().build();
-      case MAP:
-        return SchemaBuilder.map(
-            getOptionalSchema(schema.keySchema()),
-            getOptionalSchema(schema.valueSchema())
-        ).optional().build();
-      case STRUCT:
-        final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
-        for (Field field : schema.fields()) {
-          schemaBuilder.field(
-              field.name(),
-              getOptionalSchema(field.schema())
-          );
-        }
-        return schemaBuilder.optional().build();
-      default:
-        throw new ConnectException("Unsupported type: " + schema);
+  private GenericRecord generateRecord() {
+    Future<Object> generatedObjectFuture = generateExecutor.submit(generator::generate);
+    Long timeout = config.getGenerateTimeout();
+    Object generatedObject;
+    try {
+      if (timeout == null) {
+        generatedObject = generatedObjectFuture.get();
+      } else {
+        generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS);
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      generatedObjectFuture.cancel(true);
+      throw new ConnectException("Unable to generate random record", e);
+    } catch (TimeoutException e) {
+      generatedObjectFuture.cancel(true);
+      throw new ConnectException("Record generation timed out", e);
     }
+    if (!(generatedObject instanceof GenericRecord)) {
+      throw new ConnectException(String.format(
+          "Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
+          generatedObject.getClass().getName()
+      ));
+    }
+    return (GenericRecord) generatedObject;
   }
 
-  private Object getOptionalValue(
-      final org.apache.kafka.connect.data.Schema schema,
-      final Object value
-  ) {
-    switch (schema.type()) {
-      case BOOLEAN:
-      case INT32:
-      case INT64:
-      case FLOAT64:
-      case STRING:
-        return value;
-      case ARRAY:
-        final List list = (List) value;
-        return list.stream()
-            .map(listItem -> getOptionalValue(schema.valueSchema(), listItem))
-            .collect(Collectors.toList());
-      case MAP:
-        final Map map = (Map) value;
-        return map.entrySet().stream()
-            .collect(Collectors.toMap(
-                k -> getOptionalValue(schema.keySchema(), k),
-                v -> getOptionalValue(schema.valueSchema(), v)
-            ));
-      case STRUCT:
-        final Struct struct = (Struct) value;
-        final Struct optionalStruct = new Struct(getOptionalSchema(schema));
-        for (Field field : schema.fields()) {
-          optionalStruct.put(
-              field.name(),
-              getOptionalValue(
-                  field.schema(),
-                  struct.get(field.name())
-              )
-          );
-        }
-        return optionalStruct;
-      default:
-        throw new ConnectException("Invalid value schema: " + schema + ", value = " + value);
-    }
+  @Override
+  public void stop() {
+    generateExecutor.shutdown();
   }
 }
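The heart of the change is generateRecord(): the potentially unbounded generator.generate() call is submitted to a dedicated single-thread executor, and the wait is bounded with Future.get(timeout, unit). Here is a self-contained sketch of the same pattern in isolation (class and method names are illustrative, not from the patch):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedGenerationDemo {

      // Stand-in for generator.generate(): runs until interrupted.
      private static String slowGenerate() throws InterruptedException {
        while (true) {
          Thread.sleep(10); // throws InterruptedException once cancel(true) fires
        }
      }

      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(BoundedGenerationDemo::slowGenerate);
        try {
          // Bounds the wait, not the work: the task keeps running until cancelled.
          future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          future.cancel(true); // interrupt the worker so the single thread is reusable
          System.out.println("generation timed out");
        } catch (InterruptedException | ExecutionException e) {
          future.cancel(true);
          System.out.println("generation failed: " + e);
        } finally {
          executor.shutdown();
        }
      }
    }

Note that cancel(true) only interrupts the worker, so only cooperative, interruptible work actually stops. An error thrown inside the task itself, for instance the StackOverflowError that regex_schema_stackoverflow.avro below is presumably designed to provoke, is delivered as an ExecutionException, which is why the second new test expects "Unable to generate random record" rather than the timeout message.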
diff --git a/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java b/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java
index 196c5f00..07d0b7d7 100644
--- a/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java
+++ b/src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java
@@ -220,6 +220,30 @@ public void shouldNotValidateSchemaKeyFieldWhenMultipleSchemaSourcesAreSet() {
     );
   }
 
+  @Test
+  public void shouldAllowSettingGenerateTimeoutInRange() {
+    clearSchemaSources();
+    config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "100");
+    Config validated = connector.validate(config);
+    assertThat(validated, hasNoValidationErrorsFor(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF));
+  }
+
+  @Test
+  public void shouldNotAllowSettingGenerateTimeoutNegative() {
+    clearSchemaSources();
+    config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "-1");
+    Config validated = connector.validate(config);
+    assertThat(validated, hasValidationError(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, 1));
+  }
+
+  @Test
+  public void shouldNotAllowSettingGenerateTimeoutOutOfRange() {
+    clearSchemaSources();
+    config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "70000");
+    Config validated = connector.validate(config);
+    assertThat(validated, hasValidationError(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, 1));
+  }
+
   protected void assertTaskConfigs(int maxTasks) {
     List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
     assertEquals(maxTasks, taskConfigs.size());
diff --git a/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java b/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java
index 39561fc6..ad5e739c 100644
--- a/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java
+++ b/src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java
@@ -44,7 +44,9 @@
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -61,6 +63,9 @@ public class DatagenTaskTest {
 
   private static final AvroData AVRO_DATA = new AvroData(20);
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private Map<String, String> config;
   private DatagenTask task;
   private List<SourceRecord> records;
@@ -223,6 +228,36 @@ public void shouldFailToGenerateMoreRecordsThanSpecified() throws Exception {
     }
   }
 
+  /**
+   * <p>
+   * This test is non-deterministic: it may fail if the avro-random-generator
+   * library updates or changes its underlying implementation.
+   * </p>
+   * @throws Exception if the task cannot be created or record generation fails unexpectedly
+   */
+  @Test
+  public void shouldFailToGenerateComplexRegexWithTimeout() throws Exception {
+    createTaskWithSchemaAndTimeout("regex_schema_timeout.avro", "regex_key", "100");
+    expectedException.expect(ConnectException.class);
+    expectedException.expectMessage("Record generation timed out");
+    generateRecords();
+  }
+
+  /**
+   * <p>
+   * This test is non-deterministic: it may fail if the avro-random-generator
+   * library updates or changes its underlying implementation.
+   * </p>
+   * @throws Exception if the task cannot be created or record generation fails unexpectedly
+   */
+  @Test
+  public void shouldFailToGenerateComplexRegexWithStackOverflow() throws Exception {
+    createTaskWithSchemaAndTimeout("regex_schema_stackoverflow.avro", "regex_key", "100");
+    expectedException.expect(ConnectException.class);
+    expectedException.expectMessage("Unable to generate random record");
+    generateRecords();
+  }
+
   private void generateAndValidateRecordsFor(DatagenTask.Quickstart quickstart) throws Exception {
     createTaskWith(quickstart);
     generateRecords();
@@ -337,6 +372,16 @@ private void createTaskWithSchema(String schemaResourceFilename, String idFieldName) {
     loadKeyAndValueSchemas(schemaResourceFilename, idFieldName);
   }
 
+  private void createTaskWithSchemaAndTimeout(String schemaResourceFile, String idFieldName,
+      String timeout) {
+    dropSchemaSourceConfigs();
+    config.put(DatagenConnectorConfig.SCHEMA_FILENAME_CONF, schemaResourceFile);
+    config.put(DatagenConnectorConfig.SCHEMA_KEYFIELD_CONF, idFieldName);
+    config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, timeout);
+    createTask();
+    loadKeyAndValueSchemas(schemaResourceFile, idFieldName);
+  }
+
   private void createTaskWithSchemaText(String schemaText, String keyField) {
     dropSchemaSourceConfigs();
     config.put(DatagenConnectorConfig.SCHEMA_STRING_CONF, schemaText);
diff --git a/src/test/resources/regex_schema_stackoverflow.avro b/src/test/resources/regex_schema_stackoverflow.avro
new file mode 100644
index 00000000..975adc96
--- /dev/null
+++ b/src/test/resources/regex_schema_stackoverflow.avro
@@ -0,0 +1,18 @@
+{
+  "type": "record",
+  "name": "regex_record",
+  "fields":
+  [
+    {
+      "name": "regex_key",
+      "type":
+      {
+        "type": "string",
+        "arg.properties": {
+          "regex": "\\S+_a+/",
+          "length": 100
+        }
+      }
+    }
+  ]
+}
diff --git a/src/test/resources/regex_schema_timeout.avro b/src/test/resources/regex_schema_timeout.avro
new file mode 100644
index 00000000..41cb0496
--- /dev/null
+++ b/src/test/resources/regex_schema_timeout.avro
@@ -0,0 +1,17 @@
+{
+  "type": "record",
+  "name": "regex_record",
+  "fields":
+  [
+    {
+      "name": "regex_key",
+      "type":
+      {
+        "type": "string",
+        "arg.properties": {
+          "regex": "(.{1,6})( *, *(.{1,6})){0,12}"
+        }
+      }
+    }
+  ]
+}
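End to end, the new knob would be exercised roughly as follows (a hypothetical driver; kafka.topic and quickstart are pre-existing datagen properties, and the values are illustrative). Constructing the config runs GenerateTimeoutValidator, so an out-of-range value throws ConfigException immediately, and a task whose generation overruns the timeout now fails poll() with a ConnectException instead of blocking indefinitely:

    import io.confluent.kafka.connect.datagen.DatagenConnectorConfig;
    import java.util.HashMap;
    import java.util.Map;

    public class DatagenTimeoutUsage {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("kafka.topic", "orders");     // pre-existing property
        props.put("quickstart", "orders");      // pre-existing property
        props.put("generate.timeout", "5000");  // new: cap each generation at 5 s (max 60000)

        // Parsing runs GenerateTimeoutValidator; e.g. "70000" would throw ConfigException here.
        DatagenConnectorConfig config =
            new DatagenConnectorConfig(DatagenConnectorConfig.conf(), props);
        System.out.println(config.getGenerateTimeout()); // 5000
      }
    }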