Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.flink.cdc.composer.definition;

import org.apache.flink.cdc.common.utils.StringUtils;

import java.util.Objects;

/**
Expand Down Expand Up @@ -78,18 +76,10 @@ public String getProjection() {
return projection;
}

public boolean isValidProjection() {
return !StringUtils.isNullOrWhitespaceOnly(projection);
}

public String getFilter() {
return filter;
}

public boolean isValidFilter() {
return !StringUtils.isNullOrWhitespaceOnly(filter);
}

public String getDescription() {
return description;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,15 @@ public DataStream<Event> translatePostTransform(
PostTransformOperatorBuilder postTransformFunctionBuilder =
PostTransformOperator.newBuilder();
for (TransformDef transform : transforms) {
if (transform.isValidProjection() || transform.isValidFilter()) {
postTransformFunctionBuilder.addTransform(
transform.getSourceTable(),
transform.getProjection(),
transform.getFilter(),
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
postTransformFunctionBuilder.addTransform(
transform.getSourceTable(),
transform.getProjection(),
transform.getFilter(),
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
postTransformFunctionBuilder.addTimezone(timezone);
postTransformFunctionBuilder.addUdfFunctions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,11 @@ void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throw
"A Transform Block without projection or filter",
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
Expand Down Expand Up @@ -2032,7 +2032,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception {
assertThat(outputEvents)
.containsExactly(
// Initial stage
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING NOT NULL,`age` INT}, primaryKeys=name, partitionKeys=id;name, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
Expand All @@ -2048,7 +2048,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",

// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING NOT NULL, age=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
Expand Down Expand Up @@ -2997,6 +2997,32 @@ void testDateAndTimeCastingFunctions() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.my_table, before=[], after=[2, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
}

@ParameterizedTest
@EnumSource
void testPostTransformConvertersWoProjection(ValuesDataSink.SinkApi sinkApi) throws Exception {
runGenericTransformTest(
sinkApi,
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.\\.*",
null,
null,
null,
null,
null,
null,
"SOFT_DELETE")),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}"));
}

private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
List<Event> events = new ArrayList<>();
Schema schema =
Expand Down