Supporting SMTs that change the topic name when upsertEnabled is set #181

@james-johnston-thumbtack

Description

The documentation for the original Confluent connector, from which this project was forked, stated the following limitation: https://docs.confluent.io/kafka-connectors/bigquery/current/overview.html#limitations

When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed: ... org.apache.kafka.connect.transforms.RegexRouter ....

From looking through old GitHub issues for the original project from a few years ago, I got the impression that this was due to a fundamental Kafka Connect limitation that was later fixed in KIP-793: Allow sink connectors to be used with topic-mutating SMTs. According to KIP-793, connectors can become compatible with these SMTs by ensuring that the original, pre-transformation offsets are the ones returned by the sink task's preCommit function:

  1. Connectors that examine offsets for SinkRecord instances provided to the put function must use the new functions originalTopic, originalKafkaPartition, and originalKafkaOffset if the results will later be returned from the preCommit function.
  2. The preCommit function has to return the values from these new functions.

Examining the recordOffsetFor function,

public void recordOffsetFor(SinkRecord record) {
    offsets.put(
        new TopicPartition(record.topic(), record.kafkaPartition()),
        // Use the offset of the record plus one here since that'll be the offset that we'll
        // resume at if/when this record is the last-committed record and then the task is
        // restarted
        record.kafkaOffset() + 1);
}
it does not look like these new functions are used for the offsets returned by preCommit when flushing. (I found the same code, unmodified, in Confluent's old connector.)
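For comparison, here is a minimal sketch of what a KIP-793-aware recordOffsetFor could look like. To keep it self-contained and runnable without Kafka on the classpath, it uses hypothetical stand-ins: RecordCoordinates mirrors the relevant SinkRecord accessors (including the Kafka 3.6 originalTopic(), originalKafkaPartition() and originalKafkaOffset()), TopicPartitionKey plays the role of TopicPartition, and OffsetTracker and offsetsForPreCommit are illustrative names, not the connector's actual classes. The only substantive change from the function above is that both the map key and the offset come from the original* accessors.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) {
            return false;
        }
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical stand-in for SinkRecord, carrying both the current (post-SMT)
// coordinates and the original (pre-SMT) ones added to SinkRecord in Kafka 3.6.
final class RecordCoordinates {
    private final String topic;
    private final int kafkaPartition;
    private final long kafkaOffset;
    private final String originalTopic;
    private final int originalKafkaPartition;
    private final long originalKafkaOffset;

    RecordCoordinates(String topic, int kafkaPartition, long kafkaOffset,
                      String originalTopic, int originalKafkaPartition, long originalKafkaOffset) {
        this.topic = topic;
        this.kafkaPartition = kafkaPartition;
        this.kafkaOffset = kafkaOffset;
        this.originalTopic = originalTopic;
        this.originalKafkaPartition = originalKafkaPartition;
        this.originalKafkaOffset = originalKafkaOffset;
    }

    String topic() { return topic; }
    int kafkaPartition() { return kafkaPartition; }
    long kafkaOffset() { return kafkaOffset; }
    String originalTopic() { return originalTopic; }
    int originalKafkaPartition() { return originalKafkaPartition; }
    long originalKafkaOffset() { return originalKafkaOffset; }
}

// Hypothetical tracker collecting the offsets that preCommit would hand back.
final class OffsetTracker {
    private final Map<TopicPartitionKey, Long> offsets = new HashMap<>();

    void recordOffsetFor(RecordCoordinates record) {
        offsets.put(
            // Key by the pre-SMT topic and partition: that is what the consumer
            // is actually assigned, and what preCommit must report.
            new TopicPartitionKey(record.originalTopic(), record.originalKafkaPartition()),
            // Plus one: the offset to resume at if this record is the
            // last-committed record and the task is restarted.
            record.originalKafkaOffset() + 1);
    }

    Map<TopicPartitionKey, Long> offsetsForPreCommit() {
        return new HashMap<>(offsets);
    }
}
```

For example, if RegexRouter renames topic orders to bq_orders, a record at offset 41 of partition 0 is tracked as orders-0 -> 42, a partition the task is actually assigned, rather than bq_orders-0 -> 42.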

I did find a reference to the new Kafka 3.6 functions here:

kafkaData.put(KAFKA_DATA_OFFSET_FIELD_NAME, maybeGetOriginalKafkaOffset(kafkaConnectRecord));
but it seems to be used only for the output field populated by the kafkaDataFieldName config.
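I have not verified how maybeGetOriginalKafkaOffset is implemented. Assuming it follows the usual approach for calling Kafka 3.6 methods while still supporting older Connect runtimes (calling a method that is missing at runtime throws NoSuchMethodError, which can be caught), the same idea could be generalized so that recordOffsetFor can use it for the topic and partition as well. In this hypothetical sketch, OriginalCoordinates and originalOr are illustrative names, not existing connector code.

```java
import java.util.function.Supplier;

// Hypothetical helper: prefer a KIP-793 accessor, fall back on runtimes older than Kafka 3.6.
final class OriginalCoordinates {
    private OriginalCoordinates() {}

    static <T> T originalOr(Supplier<T> original, Supplier<T> fallback) {
        try {
            // On Kafka 3.6+ this returns the pre-SMT value.
            return original.get();
        } catch (NoSuchMethodError e) {
            // Older runtime: SinkRecord has no original* methods, so use the
            // current value, which only differs if an SMT changed it.
            return fallback.get();
        }
    }
}
```

In recordOffsetFor this could be used as, for example, originalOr(() -> record.originalTopic(), () -> record.topic()). Lambdas are used rather than method references on purpose: as far as I understand, a method reference to a method missing at runtime may fail when the reference itself is evaluated, which would happen outside the try block.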
