diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a5af5b60093d9..6deeb072eeec2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1036,7 +1036,7 @@ public List partitionsFor(String topic) { * If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) * will be called instead. We do this because the sender thread would otherwise try to join itself and * block forever. - *

+ *

. * * @throws InterruptException If the thread is interrupted while blocked */ diff --git a/connect/api/src/main/java/org/apache/kafka/connect/handlers/ErrorHandler.java b/connect/api/src/main/java/org/apache/kafka/connect/handlers/ErrorHandler.java new file mode 100644 index 0000000000000..76224b7ca631b --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/handlers/ErrorHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.handlers; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.SchemaAndValue; + +import java.io.Closeable; +import java.util.Map; + +public interface ErrorHandler extends Configurable, Closeable { + + /** + * Initialize the handler with connector, worker and handler config. The connector and worker configs are only + * used for reporting purposes. the handler config is used to configure this instance of the handler. + */ + void init(); + + /** + * @return the ConfigDef for this handler + */ + ConfigDef config(); + + /** + * This method is called for any error which occurs during the processing of a record in a Connect task. + * + * @param context the processing context + * @return a directive on how to handle this error. + */ + ErrorHandlerResponse onError(ProcessingContext context); + + /** + * Flush any outstanding data, and close this handler. + */ + @Override + void close(); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/handlers/ErrorHandlerResponse.java b/connect/api/src/main/java/org/apache/kafka/connect/handlers/ErrorHandlerResponse.java new file mode 100644 index 0000000000000..2de32948a18f9 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/handlers/ErrorHandlerResponse.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.handlers; + +/** + * A directive from the error handler to the connect framework on how to handle a given error. + */ +public enum ErrorHandlerResponse { + + /** + * retry the previous operation + */ + RETRY(1), + + /** + * drop the record and move to the next one + */ + SKIP(2), + + /** + * throw an Exception, and kill the task + */ + FAIL(3); + + private final int id; + + ErrorHandlerResponse(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/handlers/ProcessingContext.java b/connect/api/src/main/java/org/apache/kafka/connect/handlers/ProcessingContext.java new file mode 100644 index 0000000000000..0f1427bb6d4a9 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/handlers/ProcessingContext.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.handlers; + +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Struct; + +import java.util.List; +import java.util.Map; + +/** + * This object will contain all the runtime context for an error which occurs in the Connect framework while + * processing a record. + */ +public interface ProcessingContext { + + /** + * @return the configuration of the Connect worker + */ + Map workerConfig(); + + /** + * @return which task reported this error + */ + String taskId(); + + /** + * @return an ordered list of stages. Connect will start with executing stage 0 and then move up the list. + */ + List stages(); + + /** + * @return at what stage did this operation fail (0 indicates first stage) + */ + int index(); + + /** + * @return which attempt was this (first error will be 0) + */ + int attempt(); + + /** + * @return the (epoch) time of failure + */ + long timeOfError(); + + /** + * The exception accompanying this failure (if any) + */ + Exception exception(); + + /** + * @return the record which when input the current stage caused the failure. + */ + ConnectRecord record(); + + /** + * create a {@link Struct} from the various parameters in this Context object. + */ + Struct toStruct(); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/handlers/Stage.java b/connect/api/src/main/java/org/apache/kafka/connect/handlers/Stage.java new file mode 100644 index 0000000000000..548e92b561df9 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/handlers/Stage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.handlers; + +import java.util.Map; + +public interface Stage { + + /** + * @return at what stage in processing did the error happen + */ + StageType type(); + + /** + * @return name of the class executing this stage. + */ + Class executingClass(); + + /** + * @return properties used to configure this stage + */ + Map config(); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/handlers/StageType.java b/connect/api/src/main/java/org/apache/kafka/connect/handlers/StageType.java new file mode 100644 index 0000000000000..4d94c06e26b8a --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/handlers/StageType.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.handlers; + +/** + * A logical stage in a Connect pipeline + */ +public enum StageType { + + /** + * When the task starts up + */ + TASK_START, + + /** + * when running any transform operation on a record + */ + TRANSFORMATION, + + /** + * when calling the poll() method on a SourceConnector + */ + TASK_POLL, + + /** + * when calling the put() method on a SinkConnector + */ + TASK_PUT, + + /** + * when using the key converter to serialize/deserialize keys in ConnectRecords + */ + KEY_CONVERTER, + + /** + * when using the value converter to serialize/deserialize values in ConnectRecords + */ + VALUE_CONVERTER, + + /** + * when using the header converter to serialize/deserialize headers in ConnectRecords + */ + HEADER_CONVERTER, + + /** + * when the worker is committing offsets for the task + */ + COMMIT_OFFSETS, + + /** + * When the task is shutting down + */ + TASK_CLOSE, + + /** + * Producing to Kafka topic + */ + KAFKA_PRODUCE, + + /** + * Consuming from a Kafka topic + */ + KAFKA_CONSUME +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java index e1d8b1f2e856a..530fa1f32587c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java @@ -17,6 +17,10 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.handlers.ErrorHandler; +import org.apache.kafka.connect.handlers.ProcessingContext; +import org.apache.kafka.connect.runtime.handlers.LogAndFailHandler; import org.apache.kafka.connect.transforms.Transformation; import java.util.Collections; @@ -26,17 +30,33 @@ public class TransformationChain> { private final List> transformations; + private final ErrorHandler errorHandler; - public TransformationChain(List> transformations) { + public TransformationChain(List> transformations, ErrorHandler errorHandler) { this.transformations = transformations; + this.errorHandler = errorHandler; } public R apply(R record) { if (transformations.isEmpty()) return record; for (Transformation transformation : transformations) { - record = transformation.apply(record); - if (record == null) break; + boolean failed = true; + while (failed) { + try { + record = transformation.apply(record); + failed = false; + if (record == null) break; + } catch (Exception e) { + ProcessingContext p = null; + switch (errorHandler.onError(p)) { + case FAIL: throw e; + case SKIP: return null; + case RETRY: break; + default: throw new ConnectException("Unknown error handler response"); + } + } + } } return record; @@ -62,7 +82,8 @@ public int hashCode() { } public static > TransformationChain noOp() { - return new TransformationChain(Collections.>emptyList()); + return new TransformationChain(Collections.>emptyList(), + new LogAndFailHandler()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 1c6465855ff7b..1b0ab72906484 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -28,8 +28,11 @@ import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.handlers.StageType; import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.handlers.StageBuilder; +import org.apache.kafka.connect.runtime.handlers.TaskProcessingContext; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.sink.SinkRecord; @@ -374,6 +377,7 @@ public boolean startTask( final TaskConfig taskConfig = new TaskConfig(taskProps); final Class taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); final Task task = plugins.newTask(taskClass); + final TaskProcessingContext.Builder processingContextBuilder = TaskProcessingContext.newBuilder(config); log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); // By maintaining connector's specific class loader for this thread here, we first @@ -395,24 +399,43 @@ public boolean startTask( WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER ); + + StageBuilder keyConverterStage = new StageBuilder(StageType.KEY_CONVERTER); if (keyConverter == null) { keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + keyConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); } else { + keyConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id); } + keyConverterStage.setClass(keyConverter.getClass()); + + StageBuilder valueConverterStage = new StageBuilder(StageType.VALUE_CONVERTER); if (valueConverter == null) { valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + valueConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id); } else { + valueConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id); } + valueConverterStage.setClass(valueConverter.getClass()); + + StageBuilder headerConverterStage = new StageBuilder(StageType.HEADER_CONVERTER); if (headerConverter == null) { headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + headerConverterStage.setConfig(config.originalsWithPrefix(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id); } else { + headerConverterStage.setConfig(connConfig.originalsWithPrefix(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".")); log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id); } + headerConverterStage.setClass(headerConverter.getClass()); + + processingContextBuilder.appendStage(keyConverterStage); + processingContextBuilder.appendStage(valueConverterStage); + processingContextBuilder.appendStage(headerConverterStage); workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader); workerTask.initialize(taskConfig); @@ -450,7 +473,7 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, ClassLoader loader) { // Decide which type of worker task we need based on the type of task. if (task instanceof SourceTask) { - TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations()); + TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), null); OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), @@ -459,7 +482,7 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time); } else if (task instanceof SinkTask) { - TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations()); + TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), null); return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter, valueConverter, headerConverter, transformationChain, loader, time); } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 2ba785c4668dd..af2bbc45aa7ac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -37,9 +37,12 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.handlers.ErrorHandler; +import org.apache.kafka.connect.handlers.ErrorHandlerResponse; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.handlers.TaskProcessingContext; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; @@ -89,6 +92,8 @@ class WorkerSinkTask extends WorkerTask { private boolean pausedForRedelivery; private boolean committing; + private final TaskProcessingContext pctx; + public WorkerSinkTask(ConnectorTaskId id, SinkTask task, TaskStatus.Listener statusListener, @@ -123,6 +128,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.commitFailures = 0; this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics); this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno); + this.pctx = null; } @Override @@ -421,17 +427,27 @@ public String toString() { } private ConsumerRecords pollConsumer(long timeoutMs) { - ConsumerRecords msgs = consumer.poll(timeoutMs); + ErrorHandler handler = null; + boolean retry; + do { + try { + ConsumerRecords msgs = consumer.poll(timeoutMs); - // Exceptions raised from the task during a rebalance should be rethrown to stop the worker - if (rebalanceException != null) { - RuntimeException e = rebalanceException; - rebalanceException = null; - throw e; - } + // Exceptions raised from the task during a rebalance should be rethrown to stop the worker + if (rebalanceException != null) { + RuntimeException e = rebalanceException; + rebalanceException = null; + throw e; + } + + sinkTaskMetricsGroup.recordRead(msgs.count()); + return msgs; + } catch (org.apache.kafka.common.errors.RetriableException e) { + retry = handler.onError(pctx) == ErrorHandlerResponse.RETRY; + } + } while (retry); - sinkTaskMetricsGroup.recordRead(msgs.count()); - return msgs; + throw new ConnectException("Unexpected"); } private KafkaConsumer createConsumer() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 5e9707aa29e9c..168561b20bb1c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1081,7 +1081,7 @@ public void onConnectorConfigRemove(String connector) { public void onConnectorConfigUpdate(String connector) { log.info("Connector {} config updated", connector); - // Stage the update and wake up the work thread. Connector config *changes* only need the one connector + // StageType the update and wake up the work thread. Connector config *changes* only need the one connector // to be bounced. However, this callback may also indicate a connector *addition*, which does require // a rebalance, so we need to be careful about what operation we request. synchronized (DistributedHerder.this) { @@ -1096,7 +1096,7 @@ public void onConnectorConfigUpdate(String connector) { public void onTaskConfigUpdate(Collection tasks) { log.info("Tasks {} configs updated", tasks); - // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs + // StageType the update and wake up the work thread. No need to record the set of tasks here because task reconfigs // always need a rebalance to ensure offsets get committed. // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task // connectors clearly don't need any coordination. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/DLQHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/DLQHandler.java new file mode 100644 index 0000000000000..d4c73c8acaf97 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/DLQHandler.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +public class DLQHandler extends RetryNTimesHandler { + + + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/DLQHandlerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/DLQHandlerConfig.java new file mode 100644 index 0000000000000..df67bb09dca9e --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/DLQHandlerConfig.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Map; + +public class DLQHandlerConfig extends RetryNTimesHandlerConfig { + + protected static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; + protected static final String BOOTSTRAP_SERVERS_DOC = "bootstrap servers for the Kafka cluster which will contain the DLQ topic"; + + protected static final String DLQ_TOPIC = "dlq.topic"; + protected static final String DLQ_TOPIC_DOC = "name of the topic where bad records along with the failure context will be written to"; + + protected static final String DLQ_PARTITIONS = "dlq.partitions"; + protected static final int DLQ_PARTITIONS_DEFAULT = 5; + protected static final String DLQ_PARTITIONS_DOC = "number of partitions for the DLQ topic"; + + protected static final String DLQ_REPLICATION_FACTOR = "dlq.replication_factor"; + protected static final int DLQ_REPLICATION_FACTOR_DEFAULT = 3; + protected static final String DLQ_REPLICATION_FACTOR_DOC = "the replication factor for the DLQ topic"; + + static ConfigDef DLQCONFIG = new ConfigDef(CONFIG) + .define(BOOTSTRAP_SERVERS, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, BOOTSTRAP_SERVERS_DOC) + .define(DLQ_TOPIC, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, DLQ_TOPIC_DOC) + .define(DLQ_PARTITIONS, ConfigDef.Type.STRING, DLQ_PARTITIONS_DEFAULT, + ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, BOOTSTRAP_SERVERS_DOC) + .define(DLQ_REPLICATION_FACTOR, ConfigDef.Type.STRING, DLQ_REPLICATION_FACTOR_DEFAULT, + ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, BOOTSTRAP_SERVERS_DOC); + + public DLQHandlerConfig(Map originals) { + super(DLQCONFIG, originals); + } + + public String bootstrapServers() { + return getString(BOOTSTRAP_SERVERS); + } + + public String topic() { + return getString(DLQ_TOPIC); + } + + public int dlqTopicReplicationFactor() { + return getInt(DLQ_REPLICATION_FACTOR); + } + + public int dlqTopicNumPartitions() { + return getInt(DLQ_PARTITIONS); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/HandlerUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/HandlerUtil.java new file mode 100644 index 0000000000000..d049321b42ecd --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/HandlerUtil.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +public class HandlerUtil { + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/LogAndFailHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/LogAndFailHandler.java new file mode 100644 index 0000000000000..1619e8cd174b8 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/LogAndFailHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.handlers.ErrorHandler; +import org.apache.kafka.connect.handlers.ErrorHandlerResponse; +import org.apache.kafka.connect.handlers.ProcessingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class LogAndFailHandler implements ErrorHandler { + + private static final Logger log = LoggerFactory.getLogger(LogAndFailHandler.class); + + private final ConfigDef configDef = new ConfigDef(); + + @Override + public void configure(Map configs) { + } + + @Override + public void init() { + } + + @Override + public ConfigDef config() { + return configDef; + } + + @Override + public ErrorHandlerResponse onError(ProcessingContext context) { + log.info("Task failure. context={}", context); + return ErrorHandlerResponse.FAIL; + } + + @Override + public void close() { + + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/ProcessingContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/ProcessingContextImpl.java new file mode 100644 index 0000000000000..12587b28cc3aa --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/ProcessingContextImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.handlers.ProcessingContext; +import org.apache.kafka.connect.handlers.Stage; + +import java.util.List; +import java.util.Map; + +public class ProcessingContextImpl implements ProcessingContext { + + private final int attempt; + + public ProcessingContextImpl(int attempt) { + this.attempt = attempt; + } + + @Override + public Map connectorConfig() { + return null; + } + + @Override + public Map workerConfig() { + return null; + } + + @Override + public List stages() { + return null; + } + + @Override + public String taskId() { + return null; + } + + @Override + public int index() { + return 0; + } + + @Override + public int attempt() { + return attempt; + } + + @Override + public long timeOfFailure() { + return 0; + } + + @Override + public Exception exception() { + return null; + } + + @Override + public ConnectRecord record() { + return null; + } + + @Override + public Struct toStruct() { + SchemaBuilder builder = new SchemaBuilder(Schema.Type.STRUCT).field("attempt", SchemaBuilder.int32()); + Struct struct = new Struct(builder.build()); + struct.put("attempt", attempt()); + return struct; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/Retry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/Retry.java new file mode 100644 index 0000000000000..e5dc17e1585e4 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/Retry.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +/** + * What to do in between retries. + */ +public enum Retry { + + SLEEPING_WAIT { + @Override + public void sleep() { + + } + }; + + public abstract void sleep(); + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/RetryNTimesHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/RetryNTimesHandler.java new file mode 100644 index 0000000000000..24076f8b132c1 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/RetryNTimesHandler.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.handlers.ErrorHandler; +import org.apache.kafka.connect.handlers.ErrorHandlerResponse; +import org.apache.kafka.connect.handlers.ProcessingContext; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +public class RetryNTimesHandler implements ErrorHandler { + + private static final Logger log = LoggerFactory.getLogger(RetryNTimesHandler.class); + + private RetryNTimesHandlerConfig config; + private Time time; + + public RetryNTimesHandler() { + time = new SystemTime(); + } + + RetryNTimesHandler(Time time) { + this.time = time; + } + + @Override + public void configure(Map configs) { + config = new RetryNTimesHandlerConfig(configs); + } + + @Override + public void init() { + } + + @Override + public ConfigDef config() { + return RetryNTimesHandlerConfig.CONFIG; + } + + @Override + public ErrorHandlerResponse onError(ProcessingContext context) { + if (context.attempt() > config.maxAttempts()) { + return ErrorHandlerResponse.FAIL; + } + + // wait + int delay; + if (config.isSleepEnabled()) { + delay = config.minDelay() << (context.attempt() - 1); + if (delay < config.maxDelay()) { + time.sleep(delay); + } else { + delay = ThreadLocalRandom.current().nextInt(config.minDelay(), delay); + } + log.info("ErrorHandler called with context={}. Sleeping for {} millis.", context, delay); + time.sleep(delay); + } + + // return retry + return ErrorHandlerResponse.RETRY; + } + + @Override + public void close() { + + } + + public static void main(String[] args) { + Map confMap = new HashMap<>(); + confMap.put("name", "dummy"); + confMap.put("tasks.max", "1"); + confMap.put("connector.class", "xyz"); + confMap.put("error_handling.strategy.class", "org.apache.kafka.connect.runtime.handlers.RetryNTimesHandler"); + confMap.put("error_handling.attempts.max", "10"); + + ConnectorConfig connectorConfig = new ConnectorConfig(null, confMap); + RetryNTimesHandler handler = new RetryNTimesHandler(); + + handler.configure(connectorConfig.originalsWithPrefix("error_handling.")); + handler.init(); + for (int i=0; i<10; i++) { + handler.onError(new ProcessingContextImpl(i + 1)); + } + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/RetryNTimesHandlerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/RetryNTimesHandlerConfig.java new file mode 100644 index 0000000000000..5c45413f6e6f2 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/RetryNTimesHandlerConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Map; + +public class RetryNTimesHandlerConfig extends AbstractConfig { + + private static final String MAX_ATTEMPTS = "attempts.max"; + private static final int MAX_ATTEMPTS_DEFAULT = 5; + private static final String MAX_ATTEMPTS_DOC = "the maximum number of times we want to reattempt an operation."; + + private static final String SLEEP_ON_ERROR = "sleep.enable"; + private static final boolean SLEEP_ON_ERROR_DEFAULT = true; + private static final String SLEEP_ON_ERROR_DOC = "if false, the call to onError will immediately return."; + + private static final String INIT_DELAY = "delay.min.millis"; + private static final int INIT_DELAY_DEFAULT = 50; + private static final String INIT_DELAY_DOC = "amount of time to wait for the first failure"; + + private static final String FINAL_DELAY = "delay.max.millis"; + private static final int FINAL_DELAY_DEFAULT = 60000; + private static final String FINAL_DELAY_DOC = "amount of time to wait for the final failure"; + + protected static ConfigDef CONFIG = new ConfigDef() + .define(MAX_ATTEMPTS, ConfigDef.Type.INT, MAX_ATTEMPTS_DEFAULT, + ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, MAX_ATTEMPTS_DOC) + .define(SLEEP_ON_ERROR, ConfigDef.Type.BOOLEAN, SLEEP_ON_ERROR_DEFAULT, + ConfigDef.Importance.LOW, SLEEP_ON_ERROR_DOC) + .define(INIT_DELAY, ConfigDef.Type.INT, INIT_DELAY_DEFAULT, + ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, INIT_DELAY_DOC) + .define(FINAL_DELAY, ConfigDef.Type.INT, FINAL_DELAY_DEFAULT, + ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, FINAL_DELAY_DOC); + + public RetryNTimesHandlerConfig(Map originals) { + super(CONFIG, originals); + } + + public RetryNTimesHandlerConfig(ConfigDef configDef, Map originals) { + super(configDef, originals); + } + + public int maxAttempts() { + return getInt(MAX_ATTEMPTS); + } + + public boolean isSleepEnabled() { + return getBoolean(SLEEP_ON_ERROR); + } + + public int minDelay() { + return getInt(INIT_DELAY); + } + + public int maxDelay() { + return getInt(FINAL_DELAY); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/StageBuilder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/StageBuilder.java new file mode 100644 index 0000000000000..c33395fd1745d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/StageBuilder.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.connect.handlers.Stage; +import org.apache.kafka.connect.handlers.StageType; + +import java.util.Map; +import java.util.Objects; + +public class StageBuilder { + + private final StageType type; + private Map originals; + private Class klass; + + public StageBuilder(StageType type) { + Objects.requireNonNull(type); + this.type = type; + } + + public StageBuilder setConfig(Map originals) { + Objects.requireNonNull(originals); + this.originals = originals; + return this; + } + + public StageBuilder setClass(Class klass) { + Objects.requireNonNull(klass); + this.klass = klass; + return this; + } + + public Stage build() { + return new StageImpl(type, originals, klass); + } + + private static class StageImpl implements Stage { + + private final StageType type; + private final Map originals; + private final Class klass; + + public StageImpl(StageType type, Map originals, Class klass) { + this.type = type; + this.originals = originals; + this.klass = klass; + } + + @Override + public StageType type() { + return type; + } + + @Override + public Class executingClass() { + return klass; + } + + @Override + public Map config() { + return originals; + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/TaskProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/TaskProcessingContext.java new file mode 100644 index 0000000000000..396b4b1aac907 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/handlers/TaskProcessingContext.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.handlers; + +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.handlers.ProcessingContext; +import org.apache.kafka.connect.handlers.Stage; +import org.apache.kafka.connect.runtime.WorkerConfig; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class TaskProcessingContext implements ProcessingContext { + + private final List stages; + private final Map workerConfig; + + private int attempt = 1; + private int index = 0; + + public static Builder newBuilder(WorkerConfig config) { + return new Builder(config.originals()); + } + + private TaskProcessingContext(Map workerConfig, List stages) { + this.stages = stages; + this.workerConfig = workerConfig; + } + + public void markAsFailed() { + attempt++; + } + + public void markAsSuccess() { + attempt = 1; + index++; + } + + @Override + public Map workerConfig() { + return workerConfig; + } + + @Override + public List stages() { + return stages; + } + + @Override + public String taskId() { + return null; + } + + @Override + public int index() { + return index; + } + + @Override + public int attempt() { + return attempt; + } + + @Override + public long timeOfError() { + return 0; + } + + @Override + public Exception exception() { + return null; + } + + @Override + public ConnectRecord record() { + return null; + } + + @Override + public Struct toStruct() { + SchemaBuilder builder = new SchemaBuilder(Schema.Type.STRUCT).field("attempt", SchemaBuilder.int32()); + Struct struct = new Struct(builder.build()); + struct.put("attempt", attempt()); + return struct; + } + + public static class Builder { + + private final Map workerConfig; + LinkedList stages = new LinkedList<>(); + + private Builder(Map workerConfig) { + this.workerConfig = workerConfig; + } + + public void prependStage(StageBuilder builder) { + stages.addFirst(builder.build()); + } + + public void appendStage(StageBuilder builder) { + stages.addLast(builder.build()); + } + + public TaskProcessingContext build() { + return new TaskProcessingContext(workerConfig, stages); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LogUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LogUtil.java new file mode 100644 index 0000000000000..cdc714655d094 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LogUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Properties; + +public class LogUtil { + + private static final Logger log = LoggerFactory.getLogger(LogUtil.class); + + public static void main(String[] args) { + System.out.println(log.isTraceEnabled()); + System.out.println(log.isDebugEnabled()); + System.out.println(log.isInfoEnabled()); + System.out.println(log.isErrorEnabled()); + + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + // Create the consumer using props. + final KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("hello")); + log.info("Subscribed"); + int count = 0; + while (count++ < 100) { + ConsumerRecords rec = consumer.poll(10000); + log.info("Read {} records", rec.count()); + } + consumer.close(); + } +}