diff --git a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java index 83699098..a1b91deb 100644 --- a/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java +++ b/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java @@ -18,6 +18,8 @@ import io.confluent.avro.random.generator.Generator; import io.confluent.connect.avro.AvroData; + +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -26,6 +28,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.*; import java.util.stream.Collectors; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; @@ -51,6 +54,9 @@ public class DatagenTask extends SourceTask { public static final String CURRENT_ITERATION = "current.iteration"; public static final String RANDOM_SEED = "random.seed"; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private static final Duration timeout = Duration.ofMillis(100); + private DatagenConnectorConfig config; private String topic; @@ -167,7 +173,14 @@ public List poll() throws InterruptedException { } } - final Object generatedObject = generator.generate(); + final Object generatedObject; + Future generatedObjectFuture = executor.submit(() -> generator.generate()); + try { + generatedObject = generatedObjectFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (TimeoutException | ExecutionException | InterruptedException ex) { + throw new ConnectException("Task failed with error", ex); + } + if (!(generatedObject instanceof GenericRecord)) { throw new RuntimeException(String.format( "Expected Avro Random Generator to return instance of GenericRecord, found %s instead",