Skip to content

Commit 2d9f260

Browse files
committed
Address comments.
1 parent 399b286 commit 2d9f260

File tree

8 files changed

+41
-43
lines changed

8 files changed

+41
-43
lines changed

docs/content.zh/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,11 @@ Pipeline 连接器配置项
144144
<td>Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 </td>
145145
</tr>
146146
<tr>
147-
<td>sink.table.mapping</td>
147+
<td>sink.tableId-to-topic.mapping</td>
148148
<td>optional</td>
149149
<td style="word-wrap: break-word;">(none)</td>
150150
<td>String</td>
151-
<td>自定义的上游表名到下游 Kafka Topic 名的映射关系。 每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。 举个例子,我们可以配置 `sink.table.mapping` 的值为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。 </td>
151+
<td>自定义的上游表名到下游 Kafka Topic 名的映射关系。 每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。 举个例子,我们可以配置 `sink.tableId-to-topic.mapping` 的值为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。 </td>
152152
</tr>
153153
</tbody>
154154
</table>

docs/content/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,11 @@ Pipeline Connector Options
142142
<td>custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'. </td>
143143
</tr>
144144
<tr>
145-
<td>sink.table.mapping</td>
145+
<td>sink.tableId-to-topic.mapping</td>
146146
<td>optional</td>
147147
<td style="word-wrap: break-word;">(none)</td>
148148
<td>String</td>
149-
<td>Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by `;`, separate upstream tableId and downstream Kafka topic by `:`, For example, we can set `sink.table.mapping` like `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
149+
<td>Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by `;`, separate upstream tableId and downstream Kafka topic by `:`, For example, we can set `sink.tableId-to-topic.mapping` like `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
150150
</tr>
151151
</tbody>
152152
</table>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
4242
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
4343
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
44-
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_MAPPING;
44+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
4545
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
4646
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
4747

@@ -93,7 +93,7 @@ public DataSink createDataSink(Context context) {
9393
context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
9494
PartitionStrategy partitionStrategy =
9595
context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
96-
String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_MAPPING);
96+
String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_ID_TO_TOPIC_MAPPING);
9797
return new KafkaDataSink(
9898
deliveryGuarantee,
9999
kafkaProperties,
@@ -127,7 +127,7 @@ public Set<ConfigOption<?>> optionalOptions() {
127127
options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
128128
options.add(SINK_CUSTOM_HEADER);
129129
options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
130-
options.add(SINK_TABLE_MAPPING);
130+
options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
131131
return options;
132132
}
133133
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ public class KafkaDataSinkOptions {
3030
// Prefix for Kafka specific properties.
3131
public static final String PROPERTIES_PREFIX = "properties.";
3232

33-
public static final String DILIMITER_TABLE_MAPPINGS = ";";
33+
public static final String DELIMITER_TABLE_MAPPINGS = ";";
3434

35-
public static final String DILIMITER_SELECTOR_TABLEID = ":";
35+
public static final String DELIMITER_SELECTOR_TOPIC = ":";
3636

3737
public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
3838
key("sink.delivery-guarantee")
@@ -85,19 +85,19 @@ public class KafkaDataSinkOptions {
8585
.withDescription(
8686
"custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'.");
8787

88-
public static final ConfigOption<String> SINK_TABLE_MAPPING =
89-
key("sink.table.mapping")
88+
public static final ConfigOption<String> SINK_TABLE_ID_TO_TOPIC_MAPPING =
89+
key("sink.tableId-to-topic.mapping")
9090
.stringType()
9191
.noDefaultValue()
9292
.withDescription(
9393
Description.builder()
9494
.text(
9595
"Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by ")
96-
.text(DILIMITER_TABLE_MAPPINGS)
96+
.text(DELIMITER_TABLE_MAPPINGS)
9797
.text(
98-
", separate upstream tableId and downstream Kafka topic by ")
99-
.text(DILIMITER_SELECTOR_TABLEID)
98+
", separate upstream tableId selectors and downstream Kafka topic by ")
99+
.text(DELIMITER_SELECTOR_TOPIC)
100100
.text(
101-
". For example, we can set sink.table.mapping like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
101+
". For example, we can set 'sink.tableId-to-topic.mapping' like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
102102
.build());
103103
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public class PipelineKafkaRecordSerializationSchema
5959
// key value pairs to be put into Kafka Record Header.
6060
public final Map<String, String> customHeaders;
6161

62-
private final String tableMapping;
62+
private final String mappingRuleString;
6363

64-
private Map<Selectors, TableId> tableMappings;
64+
private Map<Selectors, String> selectorsToTopicMap;
6565

6666
// A cache to speed up TableId to Topic mapping.
6767
private Map<TableId, String> tableIdToTopicCache;
@@ -79,7 +79,7 @@ public class PipelineKafkaRecordSerializationSchema
7979
String unifiedTopic,
8080
boolean addTableToHeaderEnabled,
8181
String customHeaderString,
82-
String tableMapping) {
82+
String mappingRuleString) {
8383
this.keySerialization = keySerialization;
8484
this.valueSerialization = checkNotNull(valueSerialization);
8585
this.unifiedTopic = unifiedTopic;
@@ -100,7 +100,7 @@ public class PipelineKafkaRecordSerializationSchema
100100
}
101101
}
102102
partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
103-
this.tableMapping = tableMapping;
103+
this.mappingRuleString = mappingRuleString;
104104
}
105105

106106
@Override
@@ -146,10 +146,10 @@ private String inferTopicName(TableId tableId) {
146146
if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
147147
return unifiedTopic;
148148
}
149-
if (tableMappings != null && !tableMappings.isEmpty()) {
150-
for (Map.Entry<Selectors, TableId> entry : tableMappings.entrySet()) {
149+
if (selectorsToTopicMap != null && !selectorsToTopicMap.isEmpty()) {
150+
for (Map.Entry<Selectors, String> entry : selectorsToTopicMap.entrySet()) {
151151
if (entry.getKey().isMatch(tableId)) {
152-
return entry.getValue().toString();
152+
return entry.getValue();
153153
}
154154
}
155155
}
@@ -161,7 +161,7 @@ private String inferTopicName(TableId tableId) {
161161
public void open(
162162
SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
163163
throws Exception {
164-
this.tableMappings = KafkaSinkUtils.parseSelectorsToTableIdMapping(tableMapping);
164+
this.selectorsToTopicMap = KafkaSinkUtils.parseSelectorsToTopicMap(mappingRuleString);
165165
this.tableIdToTopicCache = new HashMap<>();
166166
valueSerialization.open(context);
167167
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,35 @@
1717

1818
package org.apache.flink.cdc.connectors.kafka.utils;
1919

20-
import org.apache.flink.cdc.common.event.TableId;
2120
import org.apache.flink.cdc.common.schema.Selectors;
2221
import org.apache.flink.cdc.common.utils.Preconditions;
2322
import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;
2423

2524
import java.util.LinkedHashMap;
2625
import java.util.Map;
2726

28-
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DILIMITER_SELECTOR_TABLEID;
29-
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DILIMITER_TABLE_MAPPINGS;
27+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_SELECTOR_TOPIC;
28+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_TABLE_MAPPINGS;
3029

3130
/** Util class for {@link org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink}. */
3231
public class KafkaSinkUtils {
3332

34-
/** Parse the mapping text to a map from Selectors to TableId. */
35-
public static Map<Selectors, TableId> parseSelectorsToTableIdMapping(String tableMappings) {
33+
/** Parse the mapping text to a map from Selectors to Kafka Topic name. */
34+
public static Map<Selectors, String> parseSelectorsToTopicMap(String mappingRuleString) {
3635
// Keep the order.
37-
Map<Selectors, TableId> result = new LinkedHashMap<>();
38-
if (tableMappings == null || tableMappings.isEmpty()) {
36+
Map<Selectors, String> result = new LinkedHashMap<>();
37+
if (mappingRuleString == null || mappingRuleString.isEmpty()) {
3938
return result;
4039
}
41-
for (String mapping : tableMappings.split(DILIMITER_TABLE_MAPPINGS)) {
42-
String[] selectorsAndTableId = mapping.split(DILIMITER_SELECTOR_TABLEID);
40+
for (String mapping : mappingRuleString.split(DELIMITER_TABLE_MAPPINGS)) {
41+
String[] selectorsAndTopic = mapping.split(DELIMITER_SELECTOR_TOPIC);
4342
Preconditions.checkArgument(
44-
selectorsAndTableId.length == 2,
45-
"Please check you configuration of " + KafkaDataSinkOptions.SINK_TABLE_MAPPING);
43+
selectorsAndTopic.length == 2,
44+
"Please check your configuration of "
45+
+ KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING);
4646
Selectors selectors =
47-
new Selectors.SelectorsBuilder().includeTables(selectorsAndTableId[0]).build();
48-
result.put(selectors, TableId.parse(selectorsAndTableId[1]));
47+
new Selectors.SelectorsBuilder().includeTables(selectorsAndTopic[0]).build();
48+
result.put(selectors, selectorsAndTopic[1]);
4949
}
5050
return result;
5151
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ void testCreateDataSink() {
4040

4141
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
4242
conf.set(
43-
KafkaDataSinkOptions.SINK_TABLE_MAPPING,
43+
KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING,
4444
"mydb.mytable1:topic1;mydb.mytable2:topic2");
4545
DataSink dataSink =
4646
sinkFactory.createDataSink(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,7 @@ void testDebeziumJsonFormat() throws Exception {
263263

264264
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
265265
drainAllRecordsFromTopic(topic, false, 0);
266-
final long recordsCount = 5;
267-
assertThat(recordsCount).isEqualTo(collectedRecords.size());
266+
assertThat(collectedRecords).hasSize(5);
268267
ObjectMapper mapper =
269268
JacksonMapperFactory.createObjectMapper()
270269
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
@@ -561,15 +560,14 @@ void testTopicAndHeaderOption() throws Exception {
561560
}
562561

563562
@Test
564-
void testSINKTABLEMAPPING() throws Exception {
563+
void testSinkTableMapping() throws Exception {
565564
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
566565
env.enableCheckpointing(1000L);
567566
env.setRestartStrategy(RestartStrategies.noRestart());
568-
final DataStream<Event> source =
569-
env.fromCollection(createSourceEvents(), new EventTypeInfo());
567+
final DataStream<Event> source = env.fromData(createSourceEvents(), new EventTypeInfo());
570568
Map<String, String> config = new HashMap<>();
571569
config.put(
572-
KafkaDataSinkOptions.SINK_TABLE_MAPPING.key(),
570+
KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
573571
"default_namespace.default_schema_copy.\\.*:test_topic_mapping_copy;default_namespace.default_schema.\\.*:test_topic_mapping");
574572
Properties properties = getKafkaClientConfiguration();
575573
properties.forEach(

0 commit comments

Comments
 (0)