diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java index c081178b88b..43cac6095d9 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java @@ -17,8 +17,6 @@ package org.apache.flink.cdc.composer.definition; -import org.apache.flink.cdc.common.utils.StringUtils; - import java.util.Objects; /** @@ -78,18 +76,10 @@ public String getProjection() { return projection; } - public boolean isValidProjection() { - return !StringUtils.isNullOrWhitespaceOnly(projection); - } - public String getFilter() { return filter; } - public boolean isValidFilter() { - return !StringUtils.isNullOrWhitespaceOnly(filter); - } - public String getDescription() { return description; } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index fb8da7ceaf4..4cc7a0b2423 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -105,17 +105,15 @@ public DataStream translatePostTransform( PostTransformOperatorBuilder postTransformFunctionBuilder = PostTransformOperator.newBuilder(); for (TransformDef transform : transforms) { - if (transform.isValidProjection() || transform.isValidFilter()) { - postTransformFunctionBuilder.addTransform( - transform.getSourceTable(), - transform.getProjection(), - transform.getFilter(), - transform.getPrimaryKeys(), - transform.getPartitionKeys(), - transform.getTableOptions(), - transform.getPostTransformConverter(), - supportedMetadataColumns); - } + postTransformFunctionBuilder.addTransform( + transform.getSourceTable(), + transform.getProjection(), + transform.getFilter(), + transform.getPrimaryKeys(), + transform.getPartitionKeys(), + transform.getTableOptions(), + transform.getPostTransformConverter(), + supportedMetadataColumns); } postTransformFunctionBuilder.addTimezone(timezone); postTransformFunctionBuilder.addUdfFunctions( diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 47a7ead5bc1..10ea31e5315 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -577,11 +577,11 @@ void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throw "A Transform Block without projection or filter", null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}")); @@ -2032,7 +2032,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception { assertThat(outputEvents) .containsExactly( // Initial stage - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING NOT NULL,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}", @@ -2048,7 +2048,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING NOT NULL, age=INT}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -2997,6 +2997,32 @@ void testDateAndTimeCastingFunctions() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.my_table, before=[], after=[2, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); } + @ParameterizedTest + @EnumSource + void testPostTransformConvertersWoProjection(ValuesDataSink.SinkApi sinkApi) throws Exception { + runGenericTransformTest( + sinkApi, + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.\\.*", + null, + null, + null, + null, + null, + null, + "SOFT_DELETE")), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}")); + } + private List generateFloorCeilAndRoundEvents(TableId tableId) { List events = new ArrayList<>(); Schema schema =