Skip to content

Commit 79e868b

Browse files
authored
[FLINK-36741][transform] Fix the decimal precision and length lost during transform
This closes #3740
1 parent 8b55445 commit 79e868b

File tree

25 files changed

+972
-49
lines changed

25 files changed

+972
-49
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1979,7 +1979,7 @@ void testTransformWithLargeLiterals() throws Exception {
19791979
+ "2147483648 AS greater_than_int_max, "
19801980
+ "-2147483648 AS int_min, "
19811981
+ "-2147483649 AS less_than_int_min, "
1982-
+ "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal",
1982+
+ "CAST(1234567890123456789 AS DECIMAL(19, 0)) AS really_big_decimal",
19831983
"CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1
19841984
null,
19851985
null,

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java

Lines changed: 107 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -841,7 +841,7 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th
841841
@ParameterizedTest
842842
@MethodSource("testParams")
843843
@Disabled("For manual test as there is a limit for quota.")
844-
void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception {
844+
void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) throws Exception {
845845
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
846846

847847
// Setup value source
@@ -909,6 +909,112 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception {
909909
.hasSize(9);
910910
}
911911

912+
@ParameterizedTest
913+
@MethodSource("testParams")
914+
void testComplicatedUdfReturnTypes(ValuesDataSink.SinkApi sinkApi, String language)
915+
throws Exception {
916+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
917+
918+
// Setup value source
919+
Configuration sourceConfig = new Configuration();
920+
sourceConfig.set(
921+
ValuesDataSourceOptions.EVENT_SET_ID,
922+
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
923+
SourceDef sourceDef =
924+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
925+
926+
// Setup value sink
927+
Configuration sinkConfig = new Configuration();
928+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
929+
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
930+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
931+
932+
// Setup transform
933+
TransformDef transformDef =
934+
new TransformDef(
935+
"default_namespace.default_schema.table1",
936+
"*, get_char() AS char_col, get_varchar() AS varchar_col, get_binary() AS binary_col, get_varbinary() AS varbinary_col, get_ts() AS ts_col, get_ts_ltz() AS ts_ltz_col, get_decimal() AS decimal_col, get_non_null() AS non_null_col",
937+
null,
938+
"col1",
939+
null,
940+
"key1=value1",
941+
"",
942+
null);
943+
944+
// Setup pipeline
945+
Configuration pipelineConfig = new Configuration();
946+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
947+
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
948+
pipelineConfig.set(
949+
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
950+
PipelineDef pipelineDef =
951+
new PipelineDef(
952+
sourceDef,
953+
sinkDef,
954+
Collections.emptyList(),
955+
Collections.singletonList(transformDef),
956+
Arrays.asList(
957+
new UdfDef(
958+
"get_char",
959+
String.format(
960+
"org.apache.flink.cdc.udf.examples.%s.precision.CharTypeReturningClass",
961+
language)),
962+
new UdfDef(
963+
"get_varchar",
964+
String.format(
965+
"org.apache.flink.cdc.udf.examples.%s.precision.VarCharTypeReturningClass",
966+
language)),
967+
new UdfDef(
968+
"get_binary",
969+
String.format(
970+
"org.apache.flink.cdc.udf.examples.%s.precision.BinaryTypeReturningClass",
971+
language)),
972+
new UdfDef(
973+
"get_varbinary",
974+
String.format(
975+
"org.apache.flink.cdc.udf.examples.%s.precision.VarBinaryTypeReturningClass",
976+
language)),
977+
new UdfDef(
978+
"get_ts",
979+
String.format(
980+
"org.apache.flink.cdc.udf.examples.%s.precision.TimestampTypeReturningClass",
981+
language)),
982+
new UdfDef(
983+
"get_ts_ltz",
984+
String.format(
985+
"org.apache.flink.cdc.udf.examples.%s.precision.LocalZonedTimestampTypeReturningClass",
986+
language)),
987+
new UdfDef(
988+
"get_decimal",
989+
String.format(
990+
"org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeReturningClass",
991+
language)),
992+
new UdfDef(
993+
"get_non_null",
994+
String.format(
995+
"org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeNonNullReturningClass",
996+
language))),
997+
pipelineConfig);
998+
999+
// Execute the pipeline
1000+
PipelineExecution execution = composer.compose(pipelineDef);
1001+
execution.execute();
1002+
1003+
// Check the order and content of all received events
1004+
String[] outputEvents = outCaptor.toString().trim().split("\n");
1005+
assertThat(outputEvents)
1006+
.contains(
1007+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`char_col` STRING,`varchar_col` STRING,`binary_col` BINARY(17),`varbinary_col` VARBINARY(17),`ts_col` TIMESTAMP(2),`ts_ltz_col` TIMESTAMP_LTZ(2),`decimal_col` DECIMAL(10, 3),`non_null_col` DECIMAL(10, 3)}, primaryKeys=col1, options=({key1=value1})}",
1008+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
1009+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
1010+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
1011+
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
1012+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
1013+
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
1014+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[], op=DELETE, meta=()}",
1015+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[2, x, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=UPDATE, meta=()}");
1016+
}
1017+
9121018
private static Stream<Arguments> testParams() {
9131019
return Stream.of(
9141020
arguments(ValuesDataSink.SinkApi.SINK_FUNCTION, "java"),

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -67,10 +67,13 @@ private static List<Object> getFields(
6767
return fields.stream()
6868
.map(
6969
o -> {
70+
if (o == null) {
71+
return "null";
72+
}
7073
if (o instanceof byte[]) {
7174
return BaseEncoding.base64().encode((byte[]) o);
7275
} else {
73-
return o;
76+
return o.toString();
7477
}
7578
})
7679
.collect(Collectors.toList());
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.types.DataType;
21+
import org.apache.flink.cdc.common.types.DataTypes;
22+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
23+
24+
/** This is an example UDF class for testing purposes only. */
25+
public class BinaryTypeReturningClass implements UserDefinedFunction {
26+
@Override
27+
public DataType getReturnType() {
28+
return DataTypes.BINARY(17);
29+
}
30+
31+
public byte[] eval() {
32+
return "xyzzy".getBytes();
33+
}
34+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.types.DataType;
21+
import org.apache.flink.cdc.common.types.DataTypes;
22+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
23+
24+
/** This is an example UDF class for testing purposes only. */
25+
public class CharTypeReturningClass implements UserDefinedFunction {
26+
27+
@Override
28+
public DataType getReturnType() {
29+
return DataTypes.CHAR(17);
30+
}
31+
32+
public String eval() {
33+
return "This is a string.";
34+
}
35+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.data.DecimalData;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
24+
25+
import java.math.BigDecimal;
26+
27+
/** This is an example UDF class for testing purposes only. */
28+
public class DecimalTypeNonNullReturningClass implements UserDefinedFunction {
29+
@Override
30+
public DataType getReturnType() {
31+
return DataTypes.DECIMAL(10, 3).notNull();
32+
}
33+
34+
public DecimalData eval() {
35+
return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
36+
}
37+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.data.DecimalData;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
24+
25+
import java.math.BigDecimal;
26+
27+
/** This is an example UDF class for testing purposes only. */
28+
public class DecimalTypeReturningClass implements UserDefinedFunction {
29+
@Override
30+
public DataType getReturnType() {
31+
return DataTypes.DECIMAL(10, 3);
32+
}
33+
34+
public DecimalData eval() {
35+
return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
36+
}
37+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
24+
25+
/** This is an example UDF class for testing purposes only. */
26+
public class LocalZonedTimestampTypeReturningClass implements UserDefinedFunction {
27+
@Override
28+
public DataType getReturnType() {
29+
return DataTypes.TIMESTAMP_LTZ(2);
30+
}
31+
32+
public LocalZonedTimestampData eval() {
33+
return LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000);
34+
}
35+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.data.TimestampData;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
24+
25+
/** This is an example UDF class for testing purposes only. */
26+
public class TimestampTypeReturningClass implements UserDefinedFunction {
27+
@Override
28+
public DataType getReturnType() {
29+
return DataTypes.TIMESTAMP(2);
30+
}
31+
32+
public TimestampData eval() {
33+
return TimestampData.fromMillis(24 * 60 * 60 * 1000);
34+
}
35+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java.precision;
19+
20+
import org.apache.flink.cdc.common.types.DataType;
21+
import org.apache.flink.cdc.common.types.DataTypes;
22+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
23+
24+
/** This is an example UDF class for testing purposes only. */
25+
public class VarBinaryTypeReturningClass implements UserDefinedFunction {
26+
@Override
27+
public DataType getReturnType() {
28+
return DataTypes.VARBINARY(17);
29+
}
30+
31+
public byte[] eval() {
32+
return "xyzzy".getBytes();
33+
}
34+
}

0 commit comments

Comments
 (0)