diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 292ba416a52..83de379268d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -186,9 +186,7 @@ private TransformExpressionKey generateTransformExpressionKey() { break; } } - } - for (String originalColumnName : originalColumnNames) { METADATA_COLUMNS.stream() .filter(col -> col.f0.equals(originalColumnName)) .findFirst() diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 4d873384561..d992c277dd1 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -95,6 +95,10 @@ public class PostTransformOperatorTest { .physicalColumn("name", DataTypes.STRING()) .physicalColumn("name_upper", DataTypes.STRING()) .physicalColumn("tbname", DataTypes.STRING().notNull()) + .physicalColumn("tbname_sid", DataTypes.STRING()) + .physicalColumn("sid_tbname", DataTypes.STRING()) + .physicalColumn("tbname_name", DataTypes.STRING()) + .physicalColumn("name_tbname", DataTypes.STRING()) .primaryKey("sid") .build(); @@ -543,7 +547,9 @@ void testMetadataASTransform() throws Exception { PostTransformOperator.newBuilder() .addTransform( METADATA_AS_TABLEID.identifier(), - "sid, name, UPPER(name) as name_upper, __table_name__ as tbname", + "sid, name, UPPER(name) as name_upper, __table_name__ as tbname, " + + "concat(__table_name__,'_',sid) as tbname_sid, concat(sid,'_',__table_name__) as sid_tbname," + + "concat(__table_name__,'_',name) as tbname_name, concat(name,'_',__table_name__) as name_tbname", "sid < 3") .build(); RegularEventOperatorTestHarness @@ -561,7 +567,16 @@ void testMetadataASTransform() throws Exception { DataChangeEvent.insertEvent( METADATA_AS_TABLEID, recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("abc"), null, null})); + new Object[] { + 1, + new BinaryStringData("abc"), + null, + null, + null, + null, + null, + null + })); DataChangeEvent insertEventExpect = DataChangeEvent.insertEvent( METADATA_AS_TABLEID, @@ -570,7 +585,11 @@ void testMetadataASTransform() throws Exception { 1, new BinaryStringData("abc"), new BinaryStringData("ABC"), - new BinaryStringData("metadata_as_table") + new BinaryStringData("metadata_as_table"), + new BinaryStringData("metadata_as_table_1"), + new BinaryStringData("1_metadata_as_table"), + new BinaryStringData("metadata_as_table_abc"), + new BinaryStringData("abc_metadata_as_table") })); transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat(