Skip to content

Commit 399b286

Browse files
committed
[FLINK-36913][pipeline-connector][kafka] Add sink.table.mapping option to allow user to define a custom map for upstream TableId to downstream topic name.
1 parent 39b560e commit 399b286

File tree

9 files changed

+197
-5
lines changed

9 files changed

+197
-5
lines changed

docs/content.zh/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ Pipeline 连接器配置项
143143
<td>String</td>
144144
<td>Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 </td>
145145
</tr>
146+
<tr>
147+
<td>sink.table.mapping</td>
148+
<td>optional</td>
149+
<td style="word-wrap: break-word;">(none)</td>
150+
<td>String</td>
151+
<td>自定义的上游表名到下游 Kafka Topic 名的映射关系。 每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。 举个例子,我们可以配置 `sink.table.mapping` 的值为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。 </td>
152+
</tr>
146153
</tbody>
147154
</table>
148155
</div>

docs/content/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ Pipeline Connector Options
141141
<td>String</td>
142142
<td>Custom headers for each Kafka record. Headers are separated by ',', and the key and value of each header are separated by ':'. For example, we can set headers like 'key1:value1,key2:value2'. </td>
143143
</tr>
144+
<tr>
145+
<td>sink.table.mapping</td>
146+
<td>optional</td>
147+
<td style="word-wrap: break-word;">(none)</td>
148+
<td>String</td>
149+
<td>Custom mappings from upstream tableId to downstream Kafka topic. Mappings are separated by `;`, and within each mapping the upstream tableId and the downstream Kafka topic are separated by `:`. For example, we can set `sink.table.mapping` like `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
150+
</tr>
144151
</tbody>
145152
</table>
146153
</div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class KafkaDataSink implements DataSink {
5353

5454
final String customHeaders;
5555

56+
final String tableMapping;
57+
5658
public KafkaDataSink(
5759
DeliveryGuarantee deliveryGuarantee,
5860
Properties kafkaProperties,
@@ -62,7 +64,8 @@ public KafkaDataSink(
6264
SerializationSchema<Event> valueSerialization,
6365
String topic,
6466
boolean addTableToHeaderEnabled,
65-
String customHeaders) {
67+
String customHeaders,
68+
String tableMapping) {
6669
this.deliveryGuarantee = deliveryGuarantee;
6770
this.kafkaProperties = kafkaProperties;
6871
this.partitionStrategy = partitionStrategy;
@@ -72,6 +75,7 @@ public KafkaDataSink(
7275
this.topic = topic;
7376
this.addTableToHeaderEnabled = addTableToHeaderEnabled;
7477
this.customHeaders = customHeaders;
78+
this.tableMapping = tableMapping;
7579
}
7680

7781
@Override
@@ -92,7 +96,8 @@ public EventSinkProvider getEventSinkProvider() {
9296
valueSerialization,
9397
topic,
9498
addTableToHeaderEnabled,
95-
customHeaders))
99+
customHeaders,
100+
tableMapping))
96101
.build());
97102
}
98103

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
4242
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
4343
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
44+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_MAPPING;
4445
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
4546
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
4647

@@ -92,6 +93,7 @@ public DataSink createDataSink(Context context) {
9293
context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
9394
PartitionStrategy partitionStrategy =
9495
context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
96+
String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_MAPPING);
9597
return new KafkaDataSink(
9698
deliveryGuarantee,
9799
kafkaProperties,
@@ -101,7 +103,8 @@ public DataSink createDataSink(Context context) {
101103
valueSerialization,
102104
topic,
103105
addTableToHeaderEnabled,
104-
customHeaders);
106+
customHeaders,
107+
tableMapping);
105108
}
106109

107110
@Override
@@ -124,6 +127,7 @@ public Set<ConfigOption<?>> optionalOptions() {
124127
options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
125128
options.add(SINK_CUSTOM_HEADER);
126129
options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
130+
options.add(SINK_TABLE_MAPPING);
127131
return options;
128132
}
129133
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.kafka.sink;
1919

2020
import org.apache.flink.cdc.common.configuration.ConfigOption;
21+
import org.apache.flink.cdc.common.configuration.description.Description;
2122
import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
2223
import org.apache.flink.connector.base.DeliveryGuarantee;
2324

@@ -29,6 +30,10 @@ public class KafkaDataSinkOptions {
2930
// Prefix for Kafka specific properties.
3031
public static final String PROPERTIES_PREFIX = "properties.";
3132

33+
public static final String DILIMITER_TABLE_MAPPINGS = ";";
34+
35+
public static final String DILIMITER_SELECTOR_TABLEID = ":";
36+
3237
public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
3338
key("sink.delivery-guarantee")
3439
.enumType(DeliveryGuarantee.class)
@@ -79,4 +84,20 @@ public class KafkaDataSinkOptions {
7984
.defaultValue("")
8085
.withDescription(
8186
"custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'.");
87+
88+
public static final ConfigOption<String> SINK_TABLE_MAPPING =
89+
key("sink.table.mapping")
90+
.stringType()
91+
.noDefaultValue()
92+
.withDescription(
93+
Description.builder()
94+
.text(
95+
"Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by ")
96+
.text(DILIMITER_TABLE_MAPPINGS)
97+
.text(
98+
", separate upstream tableId and downstream Kafka topic by ")
99+
.text(DILIMITER_SELECTOR_TABLEID)
100+
.text(
101+
". For example, we can set sink.table.mapping like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
102+
.build());
82103
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.flink.cdc.common.event.Event;
2323
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2424
import org.apache.flink.cdc.common.event.TableId;
25+
import org.apache.flink.cdc.common.schema.Selectors;
26+
import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;
2527
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
2628

2729
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -57,6 +59,13 @@ public class PipelineKafkaRecordSerializationSchema
5759
// key value pairs to be put into Kafka Record Header.
5860
public final Map<String, String> customHeaders;
5961

62+
private final String tableMapping;
63+
64+
private Map<Selectors, TableId> tableMappings;
65+
66+
// A cache to speed up TableId to Topic mapping.
67+
private Map<TableId, String> tableIdToTopicCache;
68+
6069
public static final String NAMESPACE_HEADER_KEY = "namespace";
6170

6271
public static final String SCHEMA_NAME_HEADER_KEY = "schemaName";
@@ -69,7 +78,8 @@ public class PipelineKafkaRecordSerializationSchema
6978
SerializationSchema<Event> valueSerialization,
7079
String unifiedTopic,
7180
boolean addTableToHeaderEnabled,
72-
String customHeaderString) {
81+
String customHeaderString,
82+
String tableMapping) {
7383
this.keySerialization = keySerialization;
7484
this.valueSerialization = checkNotNull(valueSerialization);
7585
this.unifiedTopic = unifiedTopic;
@@ -90,6 +100,7 @@ public class PipelineKafkaRecordSerializationSchema
90100
}
91101
}
92102
partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
103+
this.tableMapping = tableMapping;
93104
}
94105

95106
@Override
@@ -102,7 +113,7 @@ public ProducerRecord<byte[], byte[]> serialize(
102113
// skip sending SchemaChangeEvent.
103114
return null;
104115
}
105-
String topic = unifiedTopic == null ? changeEvent.tableId().toString() : unifiedTopic;
116+
String topic = inferTopicName(changeEvent.tableId());
106117
RecordHeaders recordHeaders = new RecordHeaders();
107118
if (addTableToHeaderEnabled) {
108119
String namespace =
@@ -128,10 +139,30 @@ public ProducerRecord<byte[], byte[]> serialize(
128139
topic, partition, null, keySerialized, valueSerialized, recordHeaders);
129140
}
130141

142+
private String inferTopicName(TableId tableId) {
143+
return tableIdToTopicCache.computeIfAbsent(
144+
tableId,
145+
(table -> {
146+
if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
147+
return unifiedTopic;
148+
}
149+
if (tableMappings != null && !tableMappings.isEmpty()) {
150+
for (Map.Entry<Selectors, TableId> entry : tableMappings.entrySet()) {
151+
if (entry.getKey().isMatch(tableId)) {
152+
return entry.getValue().toString();
153+
}
154+
}
155+
}
156+
return table.toString();
157+
}));
158+
}
159+
131160
@Override
132161
public void open(
133162
SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
134163
throws Exception {
164+
this.tableMappings = KafkaSinkUtils.parseSelectorsToTableIdMapping(tableMapping);
165+
this.tableIdToTopicCache = new HashMap<>();
135166
valueSerialization.open(context);
136167
}
137168
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.kafka.utils;
19+
20+
import org.apache.flink.cdc.common.event.TableId;
21+
import org.apache.flink.cdc.common.schema.Selectors;
22+
import org.apache.flink.cdc.common.utils.Preconditions;
23+
import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;
24+
25+
import java.util.LinkedHashMap;
26+
import java.util.Map;
27+
28+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DILIMITER_SELECTOR_TABLEID;
29+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DILIMITER_TABLE_MAPPINGS;
30+
31+
/** Util class for {@link org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink}. */
32+
public class KafkaSinkUtils {
33+
34+
/** Parse the mapping text to a map from Selectors to TableId. */
35+
public static Map<Selectors, TableId> parseSelectorsToTableIdMapping(String tableMappings) {
36+
// Keep the order.
37+
Map<Selectors, TableId> result = new LinkedHashMap<>();
38+
if (tableMappings == null || tableMappings.isEmpty()) {
39+
return result;
40+
}
41+
for (String mapping : tableMappings.split(DILIMITER_TABLE_MAPPINGS)) {
42+
String[] selectorsAndTableId = mapping.split(DILIMITER_SELECTOR_TABLEID);
43+
Preconditions.checkArgument(
44+
selectorsAndTableId.length == 2,
45+
"Please check you configuration of " + KafkaDataSinkOptions.SINK_TABLE_MAPPING);
46+
Selectors selectors =
47+
new Selectors.SelectorsBuilder().includeTables(selectorsAndTableId[0]).build();
48+
result.put(selectors, TableId.parse(selectorsAndTableId[1]));
49+
}
50+
return result;
51+
}
52+
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ void testCreateDataSink() {
3939
Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
4040

4141
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
42+
conf.set(
43+
KafkaDataSinkOptions.SINK_TABLE_MAPPING,
44+
"mydb.mytable1:topic1;mydb.mytable2:topic2");
4245
DataSink dataSink =
4346
sinkFactory.createDataSink(
4447
new FactoryHelper.DefaultContext(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,68 @@ void testTopicAndHeaderOption() throws Exception {
560560
checkProducerLeak();
561561
}
562562

563+
@Test
564+
void testSINKTABLEMAPPING() throws Exception {
565+
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
566+
env.enableCheckpointing(1000L);
567+
env.setRestartStrategy(RestartStrategies.noRestart());
568+
final DataStream<Event> source =
569+
env.fromCollection(createSourceEvents(), new EventTypeInfo());
570+
Map<String, String> config = new HashMap<>();
571+
config.put(
572+
KafkaDataSinkOptions.SINK_TABLE_MAPPING.key(),
573+
"default_namespace.default_schema_copy.\\.*:test_topic_mapping_copy;default_namespace.default_schema.\\.*:test_topic_mapping");
574+
Properties properties = getKafkaClientConfiguration();
575+
properties.forEach(
576+
(key, value) ->
577+
config.put(
578+
KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
579+
value.toString()));
580+
source.sinkTo(
581+
((FlinkSinkProvider)
582+
(new KafkaDataSinkFactory()
583+
.createDataSink(
584+
new FactoryHelper.DefaultContext(
585+
Configuration.fromMap(config),
586+
Configuration.fromMap(new HashMap<>()),
587+
this.getClass().getClassLoader()))
588+
.getEventSinkProvider()))
589+
.getSink());
590+
env.execute();
591+
592+
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
593+
drainAllRecordsFromTopic("test_topic_mapping", false, 0);
594+
final long recordsCount = 5;
595+
assertThat(recordsCount).isEqualTo(collectedRecords.size());
596+
ObjectMapper mapper =
597+
JacksonMapperFactory.createObjectMapper()
598+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
599+
List<JsonNode> expected =
600+
Arrays.asList(
601+
mapper.readTree(
602+
String.format(
603+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
604+
table1.getTableName())),
605+
mapper.readTree(
606+
String.format(
607+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
608+
table1.getTableName())),
609+
mapper.readTree(
610+
String.format(
611+
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
612+
table1.getTableName())),
613+
mapper.readTree(
614+
String.format(
615+
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
616+
table1.getTableName())),
617+
mapper.readTree(
618+
String.format(
619+
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
620+
table1.getTableName())));
621+
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
622+
checkProducerLeak();
623+
}
624+
563625
private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
564626
String topic, boolean committed, int... partitionArr) {
565627
Properties properties = getKafkaClientConfiguration();

0 commit comments

Comments
 (0)