Skip to content

Commit da351ab

Browse files
ruanhang1993 and lvyanquan authored and committed
[FLINK-36784][common] Support to add metadata columns for data in the meta fields of DataChangeEvent at transform (apache#3758)
Co-authored-by: Kunni <lvyanquan.lyq@alibaba-inc.com>
1 parent 6492c06 commit da351ab

File tree

30 files changed

+876
-201
lines changed

30 files changed

+876
-201
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ public interface DataSource {
3131

3232
/** Get the {@link MetadataAccessor} for accessing metadata from external systems. */
3333
MetadataAccessor getMetadataAccessor();
34+
35+
/** Get the {@link SupportedMetadataColumn}s of the source. */
36+
default SupportedMetadataColumn[] supportedMetadataColumns() {
37+
return new SupportedMetadataColumn[0];
38+
}
3439
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.source;
19+
20+
import org.apache.flink.cdc.common.annotation.Experimental;
21+
import org.apache.flink.cdc.common.event.DataChangeEvent;
22+
import org.apache.flink.cdc.common.types.DataType;
23+
24+
import java.io.Serializable;
25+
import java.util.Map;
26+
27+
/** A metadata column that the source supports to read from the meta field. */
28+
@Experimental
29+
public interface SupportedMetadataColumn extends Serializable {
30+
/** Column name. */
31+
String getName();
32+
33+
/** The data type of this column in Flink CDC. */
34+
DataType getType();
35+
36+
/** The returned java class of the reader. */
37+
Class<?> getJavaClass();
38+
39+
/**
40+
* Read the metadata from the {@link DataChangeEvent#meta()}.
41+
*
42+
* @param metadata the metadata returned from {@link DataChangeEvent#meta()}
43+
* @return the value of this metadata found by metadata name
44+
*/
45+
Object read(Map<String, String> metadata);
46+
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
2424
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2525
import org.apache.flink.cdc.common.sink.DataSink;
26+
import org.apache.flink.cdc.common.source.DataSource;
2627
import org.apache.flink.cdc.composer.PipelineComposer;
2728
import org.apache.flink.cdc.composer.PipelineExecution;
2829
import org.apache.flink.cdc.composer.definition.PipelineDef;
@@ -99,9 +100,10 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
99100

100101
// Build Source Operator
101102
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
103+
DataSource dataSource =
104+
sourceTranslator.createDataSource(pipelineDef.getSource(), env, pipelineDefConfig);
102105
DataStream<Event> stream =
103-
sourceTranslator.translate(
104-
pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
106+
sourceTranslator.translate(pipelineDef.getSource(), env, parallelism, dataSource);
105107

106108
// Build PreTransformOperator for processing Schema Event
107109
TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,7 +112,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
110112
stream,
111113
pipelineDef.getTransforms(),
112114
pipelineDef.getUdfs(),
113-
pipelineDef.getModels());
115+
pipelineDef.getModels(),
116+
dataSource.supportedMetadataColumns());
114117

115118
// Schema operator
116119
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -129,7 +132,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
129132
pipelineDef.getTransforms(),
130133
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
131134
pipelineDef.getUdfs(),
132-
pipelineDef.getModels());
135+
pipelineDef.getModels(),
136+
dataSource.supportedMetadataColumns());
133137

134138
// Build DataSink in advance as schema operator requires MetadataApplier
135139
DataSinkTranslator sinkTranslator = new DataSinkTranslator();

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,29 @@
3939
@Internal
4040
public class DataSourceTranslator {
4141

42+
public DataSource createDataSource(
43+
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
44+
// Search the data source factory
45+
DataSourceFactory sourceFactory =
46+
FactoryDiscoveryUtils.getFactoryByIdentifier(
47+
sourceDef.getType(), DataSourceFactory.class);
48+
// Add source JAR to environment
49+
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
50+
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
51+
DataSource dataSource =
52+
sourceFactory.createDataSource(
53+
new FactoryHelper.DefaultContext(
54+
sourceDef.getConfig(),
55+
pipelineConfig,
56+
Thread.currentThread().getContextClassLoader()));
57+
return dataSource;
58+
}
59+
4260
public DataStreamSource<Event> translate(
4361
SourceDef sourceDef,
4462
StreamExecutionEnvironment env,
45-
Configuration pipelineConfig,
46-
int sourceParallelism) {
47-
// Create data source
48-
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);
49-
63+
int sourceParallelism,
64+
DataSource dataSource) {
5065
// Get source provider
5166
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
5267
if (eventSourceProvider instanceof FlinkSourceProvider) {
@@ -78,24 +93,6 @@ public DataStreamSource<Event> translate(
7893
}
7994
}
8095

81-
private DataSource createDataSource(
82-
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
83-
// Search the data source factory
84-
DataSourceFactory sourceFactory =
85-
FactoryDiscoveryUtils.getFactoryByIdentifier(
86-
sourceDef.getType(), DataSourceFactory.class);
87-
// Add source JAR to environment
88-
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
89-
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
90-
DataSource dataSource =
91-
sourceFactory.createDataSource(
92-
new FactoryHelper.DefaultContext(
93-
sourceDef.getConfig(),
94-
pipelineConfig,
95-
Thread.currentThread().getContextClassLoader()));
96-
return dataSource;
97-
}
98-
9996
private String generateDefaultSourceName(SourceDef sourceDef) {
10097
return String.format("Flink CDC Event Source: %s", sourceDef.getType());
10198
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.api.java.tuple.Tuple3;
2121
import org.apache.flink.cdc.common.event.Event;
22+
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
2223
import org.apache.flink.cdc.composer.definition.ModelDef;
2324
import org.apache.flink.cdc.composer.definition.TransformDef;
2425
import org.apache.flink.cdc.composer.definition.UdfDef;
@@ -46,7 +47,8 @@ public DataStream<Event> translatePreTransform(
4647
DataStream<Event> input,
4748
List<TransformDef> transforms,
4849
List<UdfDef> udfFunctions,
49-
List<ModelDef> models) {
50+
List<ModelDef> models,
51+
SupportedMetadataColumn[] supportedMetadataColumns) {
5052
if (transforms.isEmpty()) {
5153
return input;
5254
}
@@ -61,7 +63,8 @@ public DataStream<Event> translatePreTransform(
6163
transform.getPrimaryKeys(),
6264
transform.getPartitionKeys(),
6365
transform.getTableOptions(),
64-
transform.getPostTransformConverter());
66+
transform.getPostTransformConverter(),
67+
supportedMetadataColumns);
6568
}
6669

6770
preTransformFunctionBuilder.addUdfFunctions(
@@ -77,7 +80,8 @@ public DataStream<Event> translatePostTransform(
7780
List<TransformDef> transforms,
7881
String timezone,
7982
List<UdfDef> udfFunctions,
80-
List<ModelDef> models) {
83+
List<ModelDef> models,
84+
SupportedMetadataColumn[] supportedMetadataColumns) {
8185
if (transforms.isEmpty()) {
8286
return input;
8387
}
@@ -93,7 +97,8 @@ public DataStream<Event> translatePostTransform(
9397
transform.getPrimaryKeys(),
9498
transform.getPartitionKeys(),
9599
transform.getTableOptions(),
96-
transform.getPostTransformConverter());
100+
transform.getPostTransformConverter(),
101+
supportedMetadataColumns);
97102
}
98103
}
99104
postTransformFunctionBuilder.addTimezone(timezone);

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,13 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
351351
assertThat(outputEvents)
352352
.containsExactly(
353353
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
354-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}",
355-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}",
354+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}",
355+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}",
356356
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
357357
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
358358
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
359-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}",
360-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
359+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=({op_ts=4})}",
360+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=({op_ts=5})}");
361361
}
362362

363363
@ParameterizedTest
@@ -383,7 +383,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
383383
TransformDef transformDef =
384384
new TransformDef(
385385
"default_namespace.default_schema.table1",
386-
"*,concat(col1,'0') as col12,__data_event_type__ as rk",
386+
"*,concat(col1,'0') as col12,__data_event_type__ as rk,op_ts as opts",
387387
"col1 <> '3'",
388388
"col1",
389389
"col12",
@@ -413,14 +413,14 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
413413
String[] outputEvents = outCaptor.toString().trim().split("\n");
414414
assertThat(outputEvents)
415415
.containsExactly(
416-
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
417-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
418-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
416+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
417+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}",
418+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}",
419419
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
420420
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
421421
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
422-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
423-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
422+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D, 4], after=[], op=DELETE, meta=({op_ts=4})}",
423+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U, 5], after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}");
424424
}
425425

426426
@ParameterizedTest
@@ -486,13 +486,13 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
486486
assertThat(outputEvents)
487487
.containsExactly(
488488
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
489-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}",
490-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}",
489+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
490+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
491491
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
492492
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
493493
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
494-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
495-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
494+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=({op_ts=4})}",
495+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
496496
}
497497

498498
@Test

0 commit comments

Comments (0)