diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
new file mode 100644
index 0000000000000..3fe0110e237d9
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+public interface ErrantRecordReporter {
+
+    /**
+     * Serialize and produce the errant record to the error topic.
+     * @param record the errant record
+     * @param error the error encountered while processing the record
+     */
+    void report(SinkRecord record, Throwable error);
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 340ef80485286..600cd2af7484b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -95,4 +96,12 @@ public interface SinkTaskContext {
      */
     void requestCommit();
 
+    /**
+     * Get the reporter to which the sink task can report problematic or
+     * failed {@link SinkRecord}s passed to the {@link SinkTask#put} method.
+     *
+     * @return an errant record reporter
+     */
+    ErrantRecordReporter reporter();
+
 }
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 136e8998c200c..711d384ec337b 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -23,6 +23,8 @@
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,6 +37,8 @@
  */
 public class FileStreamSinkConnector extends SinkConnector {
 
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSinkConnector.class);
+
     public static final String FILE_CONFIG = "file";
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");
@@ -43,11 +47,12 @@ public class FileStreamSinkConnector extends SinkConnector {
 
     @Override
     public String version() {
-        return AppInfoParser.getVersion();
+        return "v2.4.1";
     }
 
     @Override
     public void start(Map<String, String> props) {
+        log.info("Starting connector {}", props);
         AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
         filename = parsedConfig.getString(FILE_CONFIG);
     }
@@ -59,6 +64,7 @@ public Class<? extends Task> taskClass() {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
+        log.info("Creating configs");
         ArrayList<Map<String, String>> configs = new ArrayList<>();
         for (int i = 0; i < maxTasks; i++) {
             Map<String, String> config = new HashMap<>();
@@ -66,6 +72,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
             config.put(FILE_CONFIG, filename);
             configs.add(config);
         }
+        log.info("Returning configs");
         return configs;
     }
 
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 3d1d2b8543529..aef78c6b8d11c 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -41,6 +42,7 @@ public class FileStreamSinkTask extends SinkTask {
 
     private String filename;
     private PrintStream outputStream;
+    private ErrantRecordReporter reporter;
 
     public FileStreamSinkTask() {
     }
@@ -51,6 +53,10 @@ public FileStreamSinkTask(PrintStream outputStream) {
         this.outputStream = outputStream;
     }
 
+    public void errantRecordReporter(ErrantRecordReporter reporter) {
+        this.reporter = reporter;
+    }
+
     @Override
     public String version() {
         return new FileStreamSinkConnector().version();
@@ -58,15 +64,29 @@ public String version() {
 
     @Override
     public void start(Map<String, String> props) {
+        /*
+         * Without this try/catch block the worker starts up correctly, but as soon as a connector is created
+         * the task fails with the following error:
+         * [2020-05-16 18:51:58,723] ERROR WorkerSinkTask{id=fs-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
+         * java.lang.NoSuchMethodError: org.apache.kafka.connect.sink.SinkTaskContext.reporter()Lorg/apache/kafka/connect/sink/ErrantRecordReporter;
+         *     at org.apache.kafka.connect.file.FileStreamSinkTask.start(FileStreamSinkTask.java:67)
+         *     at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)
+         */
+        try {
+            this.reporter = context.reporter();
+        } catch (NoSuchMethodError e) {
+            log.warn("Error reporter not available. Running with an older runtime?");
+        }
+
         filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
         if (filename == null) {
             outputStream = System.out;
         } else {
             try {
                 outputStream = new PrintStream(
-                        Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
-                        false,
-                        StandardCharsets.UTF_8.name());
+                    Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                    false,
+                    StandardCharsets.UTF_8.name());
             } catch (IOException e) {
                 throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);
             }
@@ -75,9 +95,48 @@ public void start(Map<String, String> props) {
 
     @Override
     public void put(Collection<SinkRecord> sinkRecords) {
+        if (sinkRecords.isEmpty()) {
+            log.info("put() called with an empty collection; returning.");
+            return;
+        }
+
         for (SinkRecord record : sinkRecords) {
             log.trace("Writing line to {}: {}", logFilename(), record.value());
-            outputStream.println(record.value());
+            String writableStr = String.valueOf(record.value());
+            if ("fail".equalsIgnoreCase(writableStr)) {
+                reportBadRecord(record);
+                continue;
+            }
+            outputStream.println(writableStr);
+        }
+    }
+
+    private void reportBadRecord(SinkRecord record) {
+        /* The error raised here when running against an older version of the runtime is:
+           [2020-05-16 18:58:04,438] WARN Could not report error because of compatibility issues (org.apache.kafka.connect.file.FileStreamSinkTask:116)
+           java.lang.NoClassDefFoundError: org/apache/kafka/connect/sink/ErrantRecordReporter
+               at org.apache.kafka.connect.file.FileStreamSinkTask.reportBadRecord(FileStreamSinkTask.java:114)
+               at org.apache.kafka.connect.file.FileStreamSinkTask.put(FileStreamSinkTask.java:105)
+               at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
+               at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
+               at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
+               at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
+               at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
+               at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
+               at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
+               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
+               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
+               at java.lang.Thread.run(Thread.java:748)
+           Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.sink.ErrantRecordReporter
+               at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
+               at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
+        */
+        try {
+            reporter.report(record, new RuntimeException("bad record, value=" + record.value()));
+        } catch (NoClassDefFoundError | NoSuchMethodError e) {
+            // In a real connector, we would probably not want to print out the entire stack trace.
+ log.warn("Could not report error because of compatibility issues", e); } } diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java index a5142a13324ce..5dec542f665f2 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -19,11 +19,18 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.Mock; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.powermock.api.easymock.PowerMock; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; @@ -33,6 +40,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -45,6 +53,8 @@ public class FileStreamSinkTaskTest { private ByteArrayOutputStream os; private PrintStream printStream; + private SinkTaskContext sinkTaskContext = EasyMock.mock(SinkTaskContext.class); + @Rule public TemporaryFolder topDir = new TemporaryFolder(); private String outputFile; @@ -60,6 +70,14 @@ public void setup() throws Exception { @Test public void testPutFlush() { + ErrantRecordReporter errorReporter = EasyMock.mock(ErrantRecordReporter.class); + errorReporter.report(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall(); + + EasyMock.replay(sinkTaskContext, errorReporter); + + task.errantRecordReporter(errorReporter); + HashMap offsets = new HashMap<>(); final String newLine = System.getProperty("line.separator"); @@ -74,17 +92,25 @@ public void testPutFlush() { task.put(Arrays.asList( new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2), + new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "fail", 3), new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1) )); offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L)); offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L)); task.flush(offsets); assertEquals("line1" + newLine + "line2" + newLine + "line3" + newLine, os.toString()); + + EasyMock.verify(sinkTaskContext, errorReporter); } @Test public void testStart() throws IOException { + EasyMock.expect(sinkTaskContext.reporter()).andReturn(null); + + EasyMock.replay(sinkTaskContext); + task = new FileStreamSinkTask(); + task.initialize(sinkTaskContext); Map props = new HashMap<>(); props.put(FileStreamSinkConnector.FILE_CONFIG, outputFile); task.start(props); @@ -115,5 +141,7 @@ public void testStart() throws IOException { while (--i >= 0) { assertEquals("line" + i, lines[i]); } + + EasyMock.verify(sinkTaskContext); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 9a71a6658a3b3..ac08ab5a65de3 100644 --- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -136,7 +136,7 @@ public WorkerSinkTask(ConnectorTaskId id, public void initialize(TaskConfig taskConfig) { try { this.taskConfig = taskConfig.originalsStrings(); - this.context = new WorkerSinkTaskContext(consumer, this, configState); + this.context = new WorkerSinkTaskContext(consumer, this, configState, retryWithToleranceOperator); } catch (Throwable t) { log.error("{} Task failed initialization and will not be started.", this, t); onFailure(t); @@ -184,6 +184,10 @@ public void transitionTo(TargetState state) { consumer.wakeup(); } + public SinkTask sinkTask() { + return task; + } + @Override public void execute() { initializeAndStart(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index 3a6b0d6d7b8da..d1d8bb52abf74 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -20,6 +20,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.IllegalWorkerStateException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,16 +45,26 @@ public class WorkerSinkTaskContext implements SinkTaskContext { private final ClusterConfigState configState; private final Set pausedPartitions; private boolean commitRequested; + // for kip-610 + private final RetryWithToleranceOperator errorReporter; public WorkerSinkTaskContext(KafkaConsumer consumer, WorkerSinkTask sinkTask, - ClusterConfigState configState) { + ClusterConfigState configState, + RetryWithToleranceOperator errorReporter) { this.offsets = new HashMap<>(); this.timeoutMs = -1L; this.consumer = consumer; this.sinkTask = sinkTask; this.configState = configState; this.pausedPartitions = new HashSet<>(); + this.errorReporter = errorReporter; + } + + public WorkerSinkTaskContext(KafkaConsumer consumer, + WorkerSinkTask sinkTask, + ClusterConfigState configState) { + this(consumer, sinkTask, configState, null); } @Override @@ -150,6 +164,15 @@ public void requestCommit() { commitRequested = true; } + @Override + public ErrantRecordReporter reporter() { + return (record, error) -> { + if (errorReporter != null) { + errorReporter.executeFailed(Stage.TASK_PUT, sinkTask.sinkTask().getClass(), record, error); + } + }; + } + public boolean isCommitRequested() { return commitRequested; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index 2513514475582..d26f9b073c464 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -21,11 +21,15 @@ 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.Converter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -73,14 +77,29 @@ public class RetryWithToleranceOperator {
     private final Time time;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
+    private Converter converter;
+
     protected ProcessingContext context = new ProcessingContext();
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
-                                      ToleranceType toleranceType, Time time) {
+                                      ToleranceType toleranceType, Time time, Converter converter) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
+        this.converter = converter;
+    }
+
+    public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
+                                      ToleranceType toleranceType, Time time) {
+        // use JsonConverter as the default when no converter is supplied
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, defaultConverter());
+    }
+
+    static Converter defaultConverter() {
+        JsonConverter converter = new JsonConverter();
+        converter.configure(Collections.singletonMap("converter.type", "value"));
+        return converter;
     }
 
     /**
@@ -110,6 +129,32 @@ public <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
         }
     }
 
+    /**
+     * Record an out-of-band failure on a previously submitted SinkRecord. This should typically happen in the put()
+     * method, but stage and executingClass have been parameterized to allow for errors in other SinkTask methods as
+     * well.
+     *
+     * @param stage the stage of failure
+     * @param executingClass the class that was executing the stage
+     * @param record the record that failed to execute, and needs to be reported
+     * @param error the reason for failure
+     */
+    public void executeFailed(Stage stage, Class<?> executingClass, SinkRecord record, Throwable error) {
+        byte[] key = converter.fromConnectData(record.topic(), record.keySchema(), record.key());
+        byte[] val = converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+        context.consumerRecord(new ConsumerRecord<>(record.topic(),
+            record.kafkaPartition(),
+            record.kafkaOffset(),
+            // missing timestamp for now
+            key,
+            val
+        ));
+
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        context.report();
+    }
+
     /**
      * Attempt to execute an operation. Retry if a {@link RetriableException} is raised. Re-throw everything else.
      *
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 428b3e4f022f5..09e358732ff62 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -29,12 +29,15 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -67,6 +70,7 @@
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -213,6 +217,50 @@ private RetryWithToleranceOperator operator() {
         return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM);
     }
 
+    private static final Schema SCHEMA = SchemaBuilder.struct().name("Person")
+        .field("firstName", Schema.STRING_SCHEMA)
+        .field("lastName", Schema.STRING_SCHEMA)
+        .field("age", Schema.OPTIONAL_INT32_SCHEMA)
+        .field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
+        .field("short", Schema.OPTIONAL_INT16_SCHEMA)
+        .field("byte", Schema.OPTIONAL_INT8_SCHEMA)
+        .field("long", Schema.OPTIONAL_INT64_SCHEMA)
+        .field("float", Schema.OPTIONAL_FLOAT32_SCHEMA)
+        .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
+        .field("modified", Timestamp.SCHEMA)
+        .build();
+
+    @Test
+    public void testLogReporter() {
+        Map<String, String> reportProps = new HashMap<>();
+        reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+        LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        final Struct struct = new Struct(SCHEMA)
+            .put("firstName", "Alex")
+            .put("lastName", "Smith")
+            .put("bool", true)
+            .put("short", (short) 1234)
+            .put("byte", (byte) -32)
+            .put("long", 12425436L)
+            .put("float", (float) 2356.3)
+            .put("double", -2436546.56457)
+            .put("age", 21)
+            .put("modified", new Date(1474661402123L));
+        SinkRecord record = new SinkRecord(TOPIC, 1, null, null, SCHEMA, struct, 42);
+
+        PowerMock.replayAll();
+
+        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, record, new ConnectException("blah"));
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testErrorHandlingInSourceTasks() throws Exception {
         Map<String, String> reportProps = new HashMap<>();