Skip to content

Commit e8a4084

Browse files
yuxiqianleonardBang
authored andcommitted
[FLINK-35982][transform] Fix unable to transform metadata without projections
This closes #3695.
1 parent 7d31be5 commit e8a4084

File tree

2 files changed

+16
-19
lines changed

2 files changed

+16
-19
lines changed

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,13 @@ public DataStream<Event> translatePreTransform(
4444
PreTransformOperator.Builder preTransformFunctionBuilder =
4545
PreTransformOperator.newBuilder();
4646
for (TransformDef transform : transforms) {
47-
if (transform.isValidProjection()) {
48-
preTransformFunctionBuilder.addTransform(
49-
transform.getSourceTable(),
50-
transform.getProjection().orElse(null),
51-
transform.getFilter().orElse(null),
52-
transform.getPrimaryKeys(),
53-
transform.getPartitionKeys(),
54-
transform.getTableOptions());
55-
}
47+
preTransformFunctionBuilder.addTransform(
48+
transform.getSourceTable(),
49+
transform.getProjection().orElse(null),
50+
transform.getFilter().orElse(null),
51+
transform.getPrimaryKeys(),
52+
transform.getPartitionKeys(),
53+
transform.getTableOptions());
5654
}
5755
preTransformFunctionBuilder.addUdfFunctions(
5856
udfFunctions.stream()

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,6 @@ void testMetadataInfo(ValuesDataSink.SinkApi sinkApi) throws Exception {
287287
*/
288288
@ParameterizedTest
289289
@EnumSource
290-
@Disabled("This doesn't work until FLINK-35982 got fixed.")
291290
void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throws Exception {
292291
runGenericTransformTest(
293292
sinkApi,
@@ -299,13 +298,13 @@ void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throw
299298
"id,name",
300299
"id",
301300
"replication_num=1,bucket=17",
302-
"Just a Transform Block")),
301+
"A Transform Block without projection or filter")),
303302
Arrays.asList(
304303
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
305304
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
306305
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
307306
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
308-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
307+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
309308
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
310309
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
311310
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -506,7 +505,7 @@ void testBuiltinStringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception
506505
+ "LOWER(name) AS col4, "
507506
+ "TRIM(name) AS col5, "
508507
+ "REGEXP_REPLACE(name, 'Al|Bo', '**') AS col6, "
509-
+ "SUBSTR(name, 0, 1) AS col7, "
508+
+ "SUBSTR(name, 1, 1) AS col7, "
510509
+ "SUBSTR(name, 2, 1) AS col8, "
511510
+ "SUBSTR(name, 3) AS col9, "
512511
+ "CONCAT(name, ' - ', CAST(id AS VARCHAR)) AS col10",
@@ -517,13 +516,13 @@ void testBuiltinStringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception
517516
null)),
518517
Arrays.asList(
519518
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}",
520-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, i, ce, Alice - 1], op=INSERT, meta=()}",
521-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=INSERT, meta=()}",
522-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=UPDATE, meta=()}",
519+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, l, ice, Alice - 1], op=INSERT, meta=()}",
520+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=INSERT, meta=()}",
521+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=UPDATE, meta=()}",
523522
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}",
524-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, r, ol, Carol - 3], op=INSERT, meta=()}",
525-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], op=INSERT, meta=()}",
526-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], after=[], op=DELETE, meta=()}"));
523+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, a, rol, Carol - 3], op=INSERT, meta=()}",
524+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, e, rrida, Derrida - 4], op=INSERT, meta=()}",
525+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, e, rrida, Derrida - 4], after=[], op=DELETE, meta=()}"));
527526
}
528527

529528
@ParameterizedTest

0 commit comments

Comments
 (0)