diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/DefaultKafkaSinkContext.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/DefaultKafkaSinkContext.java index bec5ab67e..13e531b97 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/DefaultKafkaSinkContext.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/DefaultKafkaSinkContext.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Properties; +import static org.apache.flink.connector.kafka.sink.KafkaSink.withClientId; + /** * Context providing information to assist constructing a {@link * org.apache.kafka.clients.producer.ProducerRecord}. @@ -37,6 +39,7 @@ public class DefaultKafkaSinkContext implements KafkaRecordSerializationSchema.K private final int subtaskId; private final int numberOfParallelInstances; private final Properties kafkaProducerConfig; + private static final String clientIdSuffix = "-metadata-fetcher"; private final Map cachedPartitions = new HashMap<>(); @@ -63,7 +66,8 @@ public int[] getPartitionsForTopic(String topic) { } private int[] fetchPartitionsForTopic(String topic) { - try (final Producer producer = new KafkaProducer<>(kafkaProducerConfig)) { + try (final Producer producer = + new KafkaProducer<>(withClientId(kafkaProducerConfig, clientIdSuffix))) { // the fetched list is immutable, so we're creating a mutable copy in order to sort // it final List partitionsList = diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java index e514054d7..6db1821c6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java @@ 
-38,13 +38,13 @@ import java.util.Properties; import java.util.concurrent.Future; +import static org.apache.flink.connector.kafka.sink.KafkaSink.withClientId; import static org.apache.flink.util.Preconditions.checkState; /** * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state. */ class FlinkKafkaInternalProducer extends KafkaProducer { - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class); private static final String TRANSACTION_MANAGER_FIELD_NAME = "transactionManager"; private static final String TRANSACTION_MANAGER_STATE_ENUM = @@ -55,9 +55,10 @@ class FlinkKafkaInternalProducer extends KafkaProducer { private volatile boolean inTransaction; private volatile boolean hasRecordsInTransaction; private volatile boolean closed; + private static final String clientIdSuffix = ""; public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) { - super(withTransactionalId(properties, transactionalId)); + super(withClientId(withTransactionalId(properties, transactionalId), clientIdSuffix)); this.transactionalId = transactionalId; } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index d5b1c3700..84e900300 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -24,10 +24,15 @@ import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.kafka.clients.producer.ProducerConfig; + import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static 
org.apache.flink.connector.kafka.source.KafkaSourceOptions.CLIENT_ID_PREFIX; /** * Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees @@ -56,6 +61,9 @@ public class KafkaSink implements TwoPhaseCommittingStatefulSink { + private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + private static final String clientIdWriterSuffix = "-writer-"; + private static final String clientIdCommitterSuffix = "-committer"; private final DeliveryGuarantee deliveryGuarantee; private final KafkaRecordSerializationSchema recordSerializer; @@ -86,7 +94,8 @@ public static KafkaSinkBuilder builder() { @Internal @Override public Committer createCommitter() throws IOException { - return new KafkaCommitter(kafkaProducerConfig); + return new KafkaCommitter( + maybeOverwriteClientIdPrefix(kafkaProducerConfig, clientIdCommitterSuffix)); } @Internal @@ -100,7 +109,8 @@ public SimpleVersionedSerializer getCommittableSerializer() { public KafkaWriter createWriter(InitContext context) throws IOException { return new KafkaWriter( deliveryGuarantee, - kafkaProducerConfig, + maybeOverwriteClientIdPrefix( + kafkaProducerConfig, clientIdWriterSuffix + context.getSubtaskId()), transactionalIdPrefix, context, recordSerializer, @@ -114,7 +124,8 @@ public KafkaWriter restoreWriter( InitContext context, Collection recoveredState) throws IOException { return new KafkaWriter<>( deliveryGuarantee, - kafkaProducerConfig, + maybeOverwriteClientIdPrefix( + kafkaProducerConfig, clientIdWriterSuffix + context.getSubtaskId()), transactionalIdPrefix, context, recordSerializer, @@ -132,4 +143,30 @@ public SimpleVersionedSerializer getWriterStateSerializer() { protected Properties getKafkaProducerConfig() { return kafkaProducerConfig; } + + private Properties maybeOverwriteClientIdPrefix(Properties kafkaProducerConfig, String suffix) { + String clientIdPrefix = kafkaProducerConfig.getProperty(CLIENT_ID_PREFIX.key()); + if (clientIdPrefix == null) { + 
return kafkaProducerConfig; + } + + Properties props = new Properties(); + props.putAll(kafkaProducerConfig); + props.setProperty(CLIENT_ID_PREFIX.key(), clientIdPrefix + suffix); + return props; + } + + public static Properties withClientId(Properties properties, String suffix) { + String clientIdPrefix = properties.getProperty(CLIENT_ID_PREFIX.key()); + if (clientIdPrefix == null) { + return properties; + } + String clientId = + clientIdPrefix + suffix + "-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + + Properties props = new Properties(); + props.putAll(properties); + props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId); + return props; + } }