Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
* <p>.
*
* @throws InterruptException If the thread is interrupted while blocked
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.handlers;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.io.Closeable;
import java.util.Map;

public interface ErrorHandler extends Configurable, Closeable {

/**
* Initialize the handler with connector, worker and handler config. The connector and worker configs are only
* used for reporting purposes. the handler config is used to configure this instance of the handler.
*/
void init();

/**
* @return the ConfigDef for this handler
*/
ConfigDef config();

/**
* This method is called for any error which occurs during the processing of a record in a Connect task.
*
* @param context the processing context
* @return a directive on how to handle this error.
*/
ErrorHandlerResponse onError(ProcessingContext context);

/**
* Flush any outstanding data, and close this handler.
*/
@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.handlers;

/**
* A directive from the error handler to the connect framework on how to handle a given error.
*/
public enum ErrorHandlerResponse {

/**
* retry the previous operation
*/
RETRY(1),

/**
* drop the record and move to the next one
*/
SKIP(2),

/**
* throw an Exception, and kill the task
*/
FAIL(3);

private final int id;

ErrorHandlerResponse(int id) {
this.id = id;
}

public int id() {
return id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.handlers;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;

import java.util.List;
import java.util.Map;

/**
* This object will contain all the runtime context for an error which occurs in the Connect framework while
* processing a record.
*/
public interface ProcessingContext {

/**
* @return the configuration of the Connect worker
*/
Map<String, Object> workerConfig();

/**
* @return which task reported this error
*/
String taskId();

/**
* @return an ordered list of stages. Connect will start with executing stage 0 and then move up the list.
*/
List<Stage> stages();

/**
* @return at what stage did this operation fail (0 indicates first stage)
*/
int index();

/**
* @return which attempt was this (first error will be 0)
*/
int attempt();

/**
* @return the (epoch) time of failure
*/
long timeOfError();

/**
* The exception accompanying this failure (if any)
*/
Exception exception();

/**
* @return the record which when input the current stage caused the failure.
*/
ConnectRecord record();

/**
* create a {@link Struct} from the various parameters in this Context object.
*/
Struct toStruct();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.handlers;

import java.util.Map;

public interface Stage {

/**
* @return at what stage in processing did the error happen
*/
StageType type();

/**
* @return name of the class executing this stage.
*/
Class<?> executingClass();

/**
* @return properties used to configure this stage
*/
Map<String, Object> config();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.handlers;

/**
* A logical stage in a Connect pipeline
*/
public enum StageType {

/**
* When the task starts up
*/
TASK_START,

/**
* when running any transform operation on a record
*/
TRANSFORMATION,

/**
* when calling the poll() method on a SourceConnector
*/
TASK_POLL,

/**
* when calling the put() method on a SinkConnector
*/
TASK_PUT,

/**
* when using the key converter to serialize/deserialize keys in ConnectRecords
*/
KEY_CONVERTER,

/**
* when using the value converter to serialize/deserialize values in ConnectRecords
*/
VALUE_CONVERTER,

/**
* when using the header converter to serialize/deserialize headers in ConnectRecords
*/
HEADER_CONVERTER,

/**
* when the worker is committing offsets for the task
*/
COMMIT_OFFSETS,

/**
* When the task is shutting down
*/
TASK_CLOSE,

/**
* Producing to Kafka topic
*/
KAFKA_PRODUCE,

/**
* Consuming from a Kafka topic
*/
KAFKA_CONSUME
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.handlers.ErrorHandler;
import org.apache.kafka.connect.handlers.ProcessingContext;
import org.apache.kafka.connect.runtime.handlers.LogAndFailHandler;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Collections;
Expand All @@ -26,17 +30,33 @@
public class TransformationChain<R extends ConnectRecord<R>> {

private final List<Transformation<R>> transformations;
private final ErrorHandler errorHandler;

public TransformationChain(List<Transformation<R>> transformations) {
public TransformationChain(List<Transformation<R>> transformations, ErrorHandler errorHandler) {
this.transformations = transformations;
this.errorHandler = errorHandler;
}

public R apply(R record) {
if (transformations.isEmpty()) return record;

for (Transformation<R> transformation : transformations) {
record = transformation.apply(record);
if (record == null) break;
boolean failed = true;
while (failed) {
try {
record = transformation.apply(record);
failed = false;
if (record == null) break;
} catch (Exception e) {
ProcessingContext p = null;
switch (errorHandler.onError(p)) {
case FAIL: throw e;
case SKIP: return null;
case RETRY: break;
default: throw new ConnectException("Unknown error handler response");
}
}
}
}

return record;
Expand All @@ -62,7 +82,8 @@ public int hashCode() {
}

public static <R extends ConnectRecord<R>> TransformationChain<R> noOp() {
return new TransformationChain<R>(Collections.<Transformation<R>>emptyList());
return new TransformationChain<R>(Collections.<Transformation<R>>emptyList(),
new LogAndFailHandler());
}

}
Loading