From 41a6f936cb9e11bff4b6dad59592479186c6cdfc Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 16 May 2020 17:11:43 -0700 Subject: [PATCH 1/3] POC for KIP-610 Signed-off-by: Arjun Satish --- .../connect/sink/ErrantRecordReporter.java | 28 ++++++++++++ .../kafka/connect/sink/SinkTaskContext.java | 9 ++++ .../connect/file/FileStreamSinkConnector.java | 9 +++- .../connect/file/FileStreamSinkTask.java | 25 +++++++++-- .../runtime/WorkerSinkTaskContext.java | 23 +++++++++- .../errors/RetryWithToleranceOperator.java | 38 +++++++++++++++- .../runtime/ErrorHandlingTaskTest.java | 43 +++++++++++++++++++ 7 files changed, 168 insertions(+), 7 deletions(-) create mode 100644 connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java new file mode 100644 index 0000000000000..3fe0110e237d9 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +public interface ErrantRecordReporter { + + /** + * Serialize and produce records to the error topic + * + * @param record the errant record + */ + void report(SinkRecord record, Throwable error); + +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java index 340ef80485286..600cd2af7484b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; +import java.util.Collection; import java.util.Map; import java.util.Set; @@ -95,4 +96,12 @@ public interface SinkTaskContext { */ void requestCommit(); + /** + * Get the reporter to which the sink task can report problematic or + * failed {@link SinkRecord} passed to the {@link SinkTask#put} method. 
+ * + * @return a errant record reporter + */ + ErrantRecordReporter reporter(); + } diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java index 136e8998c200c..711d384ec337b 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -35,6 +37,8 @@ */ public class FileStreamSinkConnector extends SinkConnector { + private static final Logger log = LoggerFactory.getLogger(FileStreamSinkConnector.class); + public static final String FILE_CONFIG = "file"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used"); @@ -43,11 +47,12 @@ public class FileStreamSinkConnector extends SinkConnector { @Override public String version() { - return AppInfoParser.getVersion(); + return "v2.4.1"; } @Override public void start(Map props) { + log.info("Starting connector {}", props); AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); filename = parsedConfig.getString(FILE_CONFIG); } @@ -59,6 +64,7 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { + log.info("Creating configs"); ArrayList> configs = new ArrayList<>(); for (int i = 0; i < maxTasks; i++) { Map config = new HashMap<>(); @@ -66,6 +72,7 @@ public List> taskConfigs(int maxTasks) { config.put(FILE_CONFIG, filename); configs.add(config); } + log.info("Returning configs"); return configs; } diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java index 3d1d2b8543529..5501938783f9a 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; @@ -41,6 +42,7 @@ public class FileStreamSinkTask extends SinkTask { private String filename; private PrintStream outputStream; + private ErrantRecordReporter reporter; public FileStreamSinkTask() { } @@ -58,15 +60,16 @@ public String version() { @Override public void start(Map props) { + this.reporter = context.reporter(); filename = props.get(FileStreamSinkConnector.FILE_CONFIG); if (filename == null) { outputStream = System.out; } else { try { outputStream = new PrintStream( - Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND), - false, - StandardCharsets.UTF_8.name()); + Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND), + false, + StandardCharsets.UTF_8.name()); } catch (IOException e) { throw new 
ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e); } @@ -75,9 +78,23 @@ public void start(Map props) { @Override public void put(Collection sinkRecords) { + if (sinkRecords.isEmpty()) { + log.info("Returning because empty topic"); + return; + } + for (SinkRecord record : sinkRecords) { log.trace("Writing line to {}: {}", logFilename(), record.value()); - outputStream.println(record.value()); + try { + outputStream.println(record.value()); + } catch (Throwable error) { + try { + ErrantRecordReporter reporter = context.reporter(); + reporter.report(record, error); + } catch (NoSuchMethodError|NoClassDefFoundError e) { + log.info("Boooooooooo!"); + } + } } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index 3a6b0d6d7b8da..e1ff4345215cf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.IllegalWorkerStateException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,16 +43,26 @@ public class WorkerSinkTaskContext implements SinkTaskContext { private final ClusterConfigState configState; private final Set pausedPartitions; private boolean commitRequested; + // for kip-610 + private final ErrorReporter errorReporter; public WorkerSinkTaskContext(KafkaConsumer consumer, WorkerSinkTask sinkTask, - ClusterConfigState configState) { + ClusterConfigState configState, + ErrorReporter errorReporter) { this.offsets = new HashMap<>(); this.timeoutMs = -1L; this.consumer = consumer; this.sinkTask = sinkTask; this.configState = configState; this.pausedPartitions = new HashSet<>(); + this.errorReporter = errorReporter; + } + + public WorkerSinkTaskContext(KafkaConsumer consumer, + WorkerSinkTask sinkTask, + ClusterConfigState configState) { + this(consumer, sinkTask, configState, null); } @Override @@ -150,6 +162,15 @@ public void requestCommit() { commitRequested = true; } + @Override + public ErrantRecordReporter reporter() { + return (record, error) -> { + if (errorReporter != null) { + errorReporter.report(null); + } + }; + } + public boolean isCommitRequested() { return commitRequested; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index 2513514475582..814e4ba9c4ead 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -21,8 +21,11 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.sink.SinkRecord; import 
org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.Converter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,14 +76,23 @@ public class RetryWithToleranceOperator { private final Time time; private ErrorHandlingMetrics errorHandlingMetrics; + private Converter converter; + protected ProcessingContext context = new ProcessingContext(); public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, - ToleranceType toleranceType, Time time) { + ToleranceType toleranceType, Time time, Converter converter) { this.errorRetryTimeout = errorRetryTimeout; this.errorMaxDelayInMillis = errorMaxDelayInMillis; this.errorToleranceType = toleranceType; this.time = time; + this.converter = converter; + } + + public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, + ToleranceType toleranceType, Time time) { + // using default as jsonconverter + this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new JsonConverter()); } /** @@ -110,6 +122,30 @@ public V execute(Operation operation, Stage stage, Class executingClas } } + /** + * Record an out of band failure on a previously submitted SinkRecord. This should typially happen in the put() + * method, but stage and executingClass have been parametrized to allow for errors in other SinkTask methods as + * well. + * + * @param stage the stage of failure + * @param executingClass the class that was executing the stage. + * @param record the record that failed to execute, and needs to be reported. + */ + public void executeFailed(Stage stage, Class executingClass, SinkRecord record) { + byte[] key = converter.fromConnectData(record.topic(), record.keySchema(), record.key()); + byte[] val = converter.fromConnectData(record.topic(), record.valueSchema(), record.value()); + context.consumerRecord(new ConsumerRecord<>(record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + // missing timestamp for now + key, + val + )); + + context.currentContext(stage, executingClass); + context.failed(); + } + /** * Attempt to execute an operation. Retry if a {@link RetriableException} is raised. Re-throw everything else. 
* diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 428b3e4f022f5..cabf7c5011175 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -29,12 +29,14 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.LogReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -67,6 +69,7 @@ import java.time.Duration; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -213,6 +216,46 @@ private RetryWithToleranceOperator operator() { return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM); } + private static final Schema SCHEMA = SchemaBuilder.struct().name("Person") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA) + .field("short", Schema.OPTIONAL_INT16_SCHEMA) + .field("byte", Schema.OPTIONAL_INT8_SCHEMA) + .field("long", Schema.OPTIONAL_INT64_SCHEMA) + .field("float", Schema.OPTIONAL_FLOAT32_SCHEMA) + .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA) + .field("modified", Timestamp.SCHEMA) + .build(); + + @Test + public void testLogReporter() { + Map reportProps = new HashMap<>(); + reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); + reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); + + RetryWithToleranceOperator retryWithToleranceOperator = operator(); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + retryWithToleranceOperator.reporters(singletonList(reporter)); + + final Struct struct = new Struct(SCHEMA) + .put("firstName", "Alex") + .put("lastName", "Smith") + .put("bool", true) + .put("short", (short) 1234) + .put("byte", (byte) -32) + .put("long", 12425436L) + .put("float", (float) 2356.3) + .put("double", -2436546.56457) + .put("age", 21) + .put("modified", new Date(1474661402123L)); + SinkRecord record = new SinkRecord(TOPIC, 1, null, null, SCHEMA, struct, 42); + + retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, record); + } + @Test public void testErrorHandlingInSourceTasks() throws Exception { Map reportProps = new HashMap<>(); From 19b1b5de02658040849d7b54dd957e7fa8fee77d Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 16 May 2020 18:01:04 -0700 Subject: [PATCH 2/3] POC added to FileStream sink connector Signed-off-by: Arjun Satish --- 
.../connect/file/FileStreamSinkTask.java | 26 +++++++++++------ .../connect/file/FileStreamSinkTaskTest.java | 28 +++++++++++++++++++ .../kafka/connect/runtime/WorkerSinkTask.java | 6 +++- .../runtime/WorkerSinkTaskContext.java | 8 ++++-- .../errors/RetryWithToleranceOperator.java | 15 ++++++++-- .../runtime/ErrorHandlingTaskTest.java | 7 ++++- 6 files changed, 73 insertions(+), 17 deletions(-) diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java index 5501938783f9a..88a3d72f08bda 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java @@ -53,6 +53,10 @@ public FileStreamSinkTask(PrintStream outputStream) { this.outputStream = outputStream; } + public void errantRecordReporter(ErrantRecordReporter reporter) { + this.reporter = reporter; + } + @Override public String version() { return new FileStreamSinkConnector().version(); @@ -85,16 +89,20 @@ public void put(Collection sinkRecords) { for (SinkRecord record : sinkRecords) { log.trace("Writing line to {}: {}", logFilename(), record.value()); - try { - outputStream.println(record.value()); - } catch (Throwable error) { - try { - ErrantRecordReporter reporter = context.reporter(); - reporter.report(record, error); - } catch (NoSuchMethodError|NoClassDefFoundError e) { - log.info("Boooooooooo!"); - } + String writableStr = String.valueOf(record.value()); + if ("fail".equalsIgnoreCase(writableStr)) { + reportBadRecord(record); + continue; } + outputStream.println(writableStr); + } + } + + private void reportBadRecord(SinkRecord record) { + try { + reporter.report(record, new RuntimeException("bad record, value=" + record.value())); + } catch (NoClassDefFoundError | NoSuchMethodError e) { + log.warn("Could not report error because of compatibility issues: {}", e.getMessage()); } } diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java index a5142a13324ce..5dec542f665f2 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -19,11 +19,18 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.Mock; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.powermock.api.easymock.PowerMock; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; @@ -33,6 +40,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -45,6 +53,8 @@ public class FileStreamSinkTaskTest { private ByteArrayOutputStream os; private PrintStream printStream; + private SinkTaskContext sinkTaskContext = EasyMock.mock(SinkTaskContext.class); + @Rule public TemporaryFolder 
topDir = new TemporaryFolder(); private String outputFile; @@ -60,6 +70,14 @@ public void setup() throws Exception { @Test public void testPutFlush() { + ErrantRecordReporter errorReporter = EasyMock.mock(ErrantRecordReporter.class); + errorReporter.report(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall(); + + EasyMock.replay(sinkTaskContext, errorReporter); + + task.errantRecordReporter(errorReporter); + HashMap offsets = new HashMap<>(); final String newLine = System.getProperty("line.separator"); @@ -74,17 +92,25 @@ public void testPutFlush() { task.put(Arrays.asList( new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2), + new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "fail", 3), new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1) )); offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L)); offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L)); task.flush(offsets); assertEquals("line1" + newLine + "line2" + newLine + "line3" + newLine, os.toString()); + + EasyMock.verify(sinkTaskContext, errorReporter); } @Test public void testStart() throws IOException { + EasyMock.expect(sinkTaskContext.reporter()).andReturn(null); + + EasyMock.replay(sinkTaskContext); + task = new FileStreamSinkTask(); + task.initialize(sinkTaskContext); Map props = new HashMap<>(); props.put(FileStreamSinkConnector.FILE_CONFIG, outputFile); task.start(props); @@ -115,5 +141,7 @@ public void testStart() throws IOException { while (--i >= 0) { assertEquals("line" + i, lines[i]); } + + EasyMock.verify(sinkTaskContext); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 9a71a6658a3b3..ac08ab5a65de3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -136,7 +136,7 @@ public WorkerSinkTask(ConnectorTaskId id, public void initialize(TaskConfig taskConfig) { try { this.taskConfig = taskConfig.originalsStrings(); - this.context = new WorkerSinkTaskContext(consumer, this, configState); + this.context = new WorkerSinkTaskContext(consumer, this, configState, retryWithToleranceOperator); } catch (Throwable t) { log.error("{} Task failed initialization and will not be started.", this, t); onFailure(t); @@ -184,6 +184,10 @@ public void transitionTo(TargetState state) { consumer.wakeup(); } + public SinkTask sinkTask() { + return task; + } + @Override public void execute() { initializeAndStart(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index e1ff4345215cf..d1d8bb52abf74 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -21,6 +21,8 @@ import org.apache.kafka.connect.errors.IllegalWorkerStateException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.sink.ErrantRecordReporter; import 
org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; @@ -44,12 +46,12 @@ public class WorkerSinkTaskContext implements SinkTaskContext { private final Set pausedPartitions; private boolean commitRequested; // for kip-610 - private final ErrorReporter errorReporter; + private final RetryWithToleranceOperator errorReporter; public WorkerSinkTaskContext(KafkaConsumer consumer, WorkerSinkTask sinkTask, ClusterConfigState configState, - ErrorReporter errorReporter) { + RetryWithToleranceOperator errorReporter) { this.offsets = new HashMap<>(); this.timeoutMs = -1L; this.consumer = consumer; @@ -166,7 +168,7 @@ public void requestCommit() { public ErrantRecordReporter reporter() { return (record, error) -> { if (errorReporter != null) { - errorReporter.report(null); + errorReporter.executeFailed(Stage.TASK_PUT, sinkTask.sinkTask().getClass(), record, error); } }; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index 814e4ba9c4ead..d26f9b073c464 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -92,7 +93,13 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time) { // using default as jsonconverter - this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new JsonConverter()); + this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, defaultConverter()); + } + + static Converter defaultConverter() { + JsonConverter converter = new JsonConverter(); + converter.configure(Collections.singletonMap("converter.type", "value")); + return converter; } /** @@ -130,8 +137,9 @@ public V execute(Operation operation, Stage stage, Class executingClas * @param stage the stage of failure * @param executingClass the class that was executing the stage. * @param record the record that failed to execute, and needs to be reported. 
+ * @param error the reason for failure */ - public void executeFailed(Stage stage, Class executingClass, SinkRecord record) { + public void executeFailed(Stage stage, Class executingClass, SinkRecord record, Throwable error) { byte[] key = converter.fromConnectData(record.topic(), record.keySchema(), record.key()); byte[] val = converter.fromConnectData(record.topic(), record.valueSchema(), record.value()); context.consumerRecord(new ConsumerRecord<>(record.topic(), @@ -143,7 +151,8 @@ public void executeFailed(Stage stage, Class executingClass, SinkRecord r )); context.currentContext(stage, executingClass); - context.failed(); + context.error(error); + context.report(); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index cabf7c5011175..09e358732ff62 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; @@ -253,7 +254,11 @@ public void testLogReporter() { .put("modified", new Date(1474661402123L)); SinkRecord record = new SinkRecord(TOPIC, 1, null, null, SCHEMA, struct, 42); - retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, record); + PowerMock.replayAll(); + + retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, record, new ConnectException("blah")); + + PowerMock.verifyAll(); } @Test From 295e6a46925993060a60b79e646e06432480ed0d Mon Sep 17 00:00:00 2001 From: Arjun Satish Date: Sat, 16 May 2020 19:05:32 -0700 Subject: [PATCH 3/3] Comments describing experience when run with older runtime Added some comments showing errors we get when runing the FileSink connector with versions of the connect-runtime that do not have the new code (specifically, ran it in AK 2.5.0 running on the following JVMs: (1) OpenJDK 64-Bit Server VM, 11.0.1, 11.0.1+13, and (2) Java HotSpot(TM) 64-Bit Server VM, 1.8.0_151, 25.151-b12 Signed-off-by: Arjun Satish --- .../connect/file/FileStreamSinkTask.java | 40 +++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java index 88a3d72f08bda..aef78c6b8d11c 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java @@ -64,7 +64,20 @@ public String version() { @Override public void start(Map props) { - this.reporter = context.reporter(); + /* + without this try catch block, we observe that the worker starts up correctly, but starting a connector, + and then the task fails with the error: + [2020-05-16 18:51:58,723] ERROR WorkerSinkTask{id=fs-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186) + java.lang.NoSuchMethodError: 
org.apache.kafka.connect.sink.SinkTaskContext.reporter()Lorg/apache/kafka/connect/sink/ErrantRecordReporter; + at org.apache.kafka.connect.file.FileStreamSinkTask.start(FileStreamSinkTask.java:67) + at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305) + */ + try { + this.reporter = context.reporter(); + } catch (NoSuchMethodError e) { + log.warn("error reporter not available. running with an older runtime?"); + } + filename = props.get(FileStreamSinkConnector.FILE_CONFIG); if (filename == null) { outputStream = System.out; @@ -83,7 +96,7 @@ public void start(Map props) { @Override public void put(Collection sinkRecords) { if (sinkRecords.isEmpty()) { - log.info("Returning because empty topic"); + log.info("put() called with empty collection. returning."); return; } @@ -99,10 +112,31 @@ public void put(Collection sinkRecords) { } private void reportBadRecord(SinkRecord record) { + /* error raised here in older version of runtime is: + [2020-05-16 18:58:04,438] WARN Could not report error because of compatibility issues (org.apache.kafka.connect.file.FileStreamSinkTask:116) + java.lang.NoClassDefFoundError: org/apache/kafka/connect/sink/ErrantRecordReporter + at org.apache.kafka.connect.file.FileStreamSinkTask.reportBadRecord(FileStreamSinkTask.java:114) + at org.apache.kafka.connect.file.FileStreamSinkTask.put(FileStreamSinkTask.java:105) + at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546) + at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326) + at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228) + at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196) + at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184) + at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) + at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) + at java.util.concurrent.FutureTask.run(FutureTask.java:266) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at java.lang.Thread.run(Thread.java:748) + Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.sink.ErrantRecordReporter + at java.net.URLClassLoader.findClass(URLClassLoader.java:381) + at java.lang.ClassLoader.loadClass(ClassLoader.java:424) + */ try { reporter.report(record, new RuntimeException("bad record, value=" + record.value())); } catch (NoClassDefFoundError | NoSuchMethodError e) { - log.warn("Could not report error because of compatibility issues: {}", e.getMessage()); + // in a real connector, we would probably not want to print out the entire stacktrace. + log.warn("Could not report error because of compatibility issues", e); } }
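
For reference, below is a minimal sketch of how a third-party sink task could adopt the reporter API proposed in this patch series while remaining loadable on older Connect runtimes that do not ship ErrantRecordReporter. It mirrors the guard used in the FileStreamSinkTask changes above (catching NoSuchMethodError/NoClassDefFoundError around context.reporter()). The MySinkTask class name, the process() helper, and the version string are illustrative placeholders, not part of the patch, and the reporter interface is the one proposed in this POC (report(SinkRecord, Throwable)).

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

/**
 * Illustrative only: a hypothetical sink task showing how a connector could
 * use the SinkTaskContext.reporter() method proposed in this patch while
 * still running on older runtimes where the method and interface are absent.
 */
public class MySinkTask extends SinkTask {

    private ErrantRecordReporter reporter; // stays null on older runtimes

    @Override
    public String version() {
        return "0.1.0-SNAPSHOT"; // placeholder version
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            // reporter() is the accessor added to SinkTaskContext by this POC
            reporter = context.reporter();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: the method or the interface class does not exist
            reporter = null;
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                process(record); // connector-specific write logic (assumed)
            } catch (Exception e) {
                if (reporter != null) {
                    // Hand the bad record to the framework's error handling
                    reporter.report(record, e);
                } else {
                    // No reporter available: keep the pre-KIP-610 behavior of failing the task
                    throw new ConnectException("Failed to process record at offset " + record.kafkaOffset(), e);
                }
            }
        }
    }

    private void process(SinkRecord record) {
        // placeholder for the actual sink logic
    }

    @Override
    public void stop() {
    }
}

Because the ErrantRecordReporter field and the reporter() call are only resolved lazily, a single connector artifact built this way can run against both old and new runtimes, which is exactly the compatibility concern documented in the third commit's comments.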