Skip to content

Commit 7c112ab

Browse files
[FLINK-37380][connector/kafka] Change TransactionalIdPrefix as required option when Exactly-Once enabled
This closes #156.
1 parent dd7d848 commit 7c112ab

File tree

4 files changed

+19
-3
lines changed

4 files changed

+19
-3
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class KafkaSinkBuilder<IN> {
7070
private static final int MAXIMUM_PREFIX_BYTES = 64000;
7171

7272
private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
73-
private String transactionalIdPrefix = "kafka-sink";
73+
private String transactionalIdPrefix;
7474

7575
private final Properties kafkaProducerConfig;
7676
private KafkaRecordSerializationSchema<IN> recordSerializer;

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.connector.kafka.sink;
1919

2020
import org.apache.flink.api.common.serialization.SimpleStringSchema;
21+
import org.apache.flink.connector.base.DeliveryGuarantee;
2122
import org.apache.flink.util.TestLogger;
2223

2324
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -28,6 +29,7 @@
2829
import java.util.function.Consumer;
2930

3031
import static org.assertj.core.api.Assertions.assertThat;
32+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3133

3234
/** Tests for {@link KafkaSinkBuilder}. */
3335
public class KafkaSinkBuilderTest extends TestLogger {
@@ -41,7 +43,7 @@ public class KafkaSinkBuilderTest extends TestLogger {
4143
};
4244

4345
@Test
44-
public void testPropertyHandling() {
46+
void testPropertyHandling() {
4547
validateProducerConfig(
4648
getBasicBuilder(),
4749
p -> {
@@ -77,7 +79,7 @@ public void testPropertyHandling() {
7779
}
7880

7981
@Test
80-
public void testBootstrapServerSetting() {
82+
void testBootstrapServerSetting() {
8183
Properties testConf1 = new Properties();
8284
testConf1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer");
8385

@@ -86,6 +88,18 @@ public void testBootstrapServerSetting() {
8688
p -> assertThat(p).containsKeys(DEFAULT_KEYS));
8789
}
8890

91+
@Test
92+
void testTransactionIdSanityCheck() {
93+
assertThatThrownBy(
94+
() ->
95+
getBasicBuilder()
96+
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
97+
.build())
98+
.isExactlyInstanceOf(IllegalStateException.class)
99+
.hasMessageContaining(
100+
"EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
101+
}
102+
89103
private void validateProducerConfig(
90104
KafkaSinkBuilder<?> builder, Consumer<Properties> validator) {
91105
validator.accept(builder.build().getKafkaProducerConfig());

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ private void writeRecordsToKafka(String topic, List<String> lines) throws Except
474474
.setPartitioner(partitioner)
475475
.build())
476476
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
477+
.setTransactionalIdPrefix("kafka-sink")
477478
.build());
478479
env.execute("Write sequence");
479480
}

flink-python/pyflink/datastream/connectors/tests/test_kafka.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ def test_set_delivery_guarantee(self):
472472
sink = KafkaSink.builder() \
473473
.set_bootstrap_servers('localhost:9092') \
474474
.set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
475+
.set_transactional_id_prefix("kafka-sink") \
475476
.set_record_serializer(self._build_serialization_schema()) \
476477
.build()
477478
guarantee = get_field_value(sink.get_java_function(), 'deliveryGuarantee')

0 commit comments

Comments
 (0)