
Commit 4d0174c

[FLINK-36565][transform] Route allows merging Decimals with various precisions (#3743)
1 parent 1bf40f0 commit 4d0174c

File tree

4 files changed: +234 additions, −15 deletions
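At the schema level, decimal-to-decimal widening already existed; this commit adds an overflow guard, extracts the decimal-plus-integer logic into a helper, and teaches the runtime SchemaOperator to coerce integral and DecimalData values into a widened DECIMAL column. As a rough sketch of the widening rule (the DECIMAL pair is taken from SchemaUtilsTest below), the merged type keeps the larger integer-digit count and the larger scale of the two inputs:

    // DECIMAL(5, 4) has 1 integer digit; DECIMAL(10, 2) has 8.
    // Widened type: max(1, 8) integer digits + max(4, 2) scale = DECIMAL(12, 4).
    DataType merged =
            SchemaUtils.inferWiderType(DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2));

When the required digits exceed DecimalType.MAX_PRECISION (38), inferWiderType now throws an IllegalArgumentException with a descriptive message instead of producing an invalid type.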

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 24 additions & 14 deletions
@@ -214,25 +214,21 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+            Preconditions.checkArgument(
+                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
+                    String.format(
+                            "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                            lType,
+                            rType,
+                            resultIntDigits + resultScale,
+                            DecimalType.MAX_PRECISION));
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
         } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType lhsDecimal = (DecimalType) lType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    lhsDecimal.getPrecision(),
-                                    lhsDecimal.getScale() + getNumericPrecision(rType)),
-                            lhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
         } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType rhsDecimal = (DecimalType) rType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    rhsDecimal.getPrecision(),
-                                    rhsDecimal.getScale() + getNumericPrecision(lType)),
-                            rhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
         } else {
             throw new IllegalStateException(
                     String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
@@ -245,6 +241,20 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         }
     }

+    private static DataType mergeExactNumericsIntoDecimal(
+            DecimalType decimalType, DataType otherType) {
+        int resultPrecision =
+                Math.max(
+                        decimalType.getPrecision(),
+                        decimalType.getScale() + getNumericPrecision(otherType));
+        Preconditions.checkArgument(
+                resultPrecision <= DecimalType.MAX_PRECISION,
+                String.format(
+                        "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                        decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
+        return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+    }
+
     @VisibleForTesting
     public static int getNumericPrecision(DataType dataType) {
         if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
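The new helper covers decimal-plus-integer merges: the integer side contributes getNumericPrecision(otherType) digits in front of the decimal point, and the decimal's scale is preserved. A worked example, assuming getNumericPrecision reports 19 digits for BIGINT (consistent with the DECIMAL(19, 0) step in the integration test below):

    // mergeExactNumericsIntoDecimal(DECIMAL(15, 0), BIGINT):
    // precision = max(15, 0 + 19) = 19, scale = 0  ->  DECIMAL(19, 0)
    DataType widened =
            SchemaUtils.inferWiderType(DataTypes.DECIMAL(15, 0), DataTypes.BIGINT());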

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

Lines changed: 17 additions & 0 deletions
@@ -273,6 +273,23 @@ public void testInferWiderType() {
                                 DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
                 .isEqualTo(DataTypes.DECIMAL(12, 4));

+        // Test overflow decimal conversions
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
         // Test merging with nullability
         Assertions.assertThat(
                 SchemaUtils.inferWiderType(
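The 43-digit figure in the expected messages follows directly from the widening arithmetic:

    // DECIMAL(5, 5) carries 0 integer digits; DECIMAL(38, 0) carries 38.
    int resultIntDigits = Math.max(5 - 5, 38 - 0); // 38
    int resultScale = Math.max(5, 0);              // 5
    // 38 + 5 = 43 > DecimalType.MAX_PRECISION (38), so the merge is rejected.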

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 161 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;

 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -61,6 +62,7 @@

 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -1216,6 +1218,88 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi)
         assertThat(outputEvents).containsExactlyInAnyOrder(expected);
     }

+    @ParameterizedTest
+    @EnumSource
+    void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        List<Event> events = generateDecimalColumnEvents("default_table_");
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_\\.*",
+                                        "default_namespace.default_schema.default_everything_merged",
+                                        null,
+                                        "Merge all decimal columns with different precision")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        String[] expected =
+                Stream.of(
+                                "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=BIGINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=DECIMAL(19, 0)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=DECIMAL(24, 5)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, nameMapping={fav_num=DECIMAL(38, 19)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+                        .map(
+                                s ->
+                                        s.replace(
+                                                "{}",
+                                                "default_namespace.default_schema.default_everything_merged"))
+                        .toArray(String[]::new);

+        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+    }
+
     private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         List<Event> events = new ArrayList<>();

@@ -1286,6 +1370,83 @@ private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         return events;
     }

+    private List<Event> generateDecimalColumnEvents(String tableNamePrefix) {
+        List<Event> events = new ArrayList<>();
+
+        // Initialize schemas
+        List<String> names =
+                Arrays.asList(
+                        "tiny",
+                        "small",
+                        "vanilla",
+                        "big",
+                        "dec_15_0",
+                        "decimal_10_10",
+                        "decimal_16_2",
+                        "decimal_29_19");
+
+        List<DataType> types =
+                Arrays.asList(
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.DECIMAL(15, 0),
+                        DataTypes.DECIMAL(10, 5),
+                        DataTypes.DECIMAL(16, 2),
+                        DataTypes.DECIMAL(29, 19));
+
+        List<Object> values =
+                Arrays.asList(
+                        (byte) 1,
+                        (short) 22,
+                        3333,
+                        (long) 44444444,
+                        DecimalData.fromBigDecimal(new BigDecimal("555555555555555"), 15, 0),
+                        DecimalData.fromBigDecimal(new BigDecimal("66666.66666"), 10, 5),
+                        DecimalData.fromBigDecimal(new BigDecimal("77777777.17"), 16, 2),
+                        DecimalData.fromBigDecimal(
+                                new BigDecimal("888888888.8888888888888888888"), 29, 19));
+
+        List<Schema> schemas =
+                types.stream()
+                        .map(
+                                temporalColumnType ->
+                                        Schema.newBuilder()
+                                                .physicalColumn("id", DataTypes.INT())
+                                                .physicalColumn("name", DataTypes.STRING())
+                                                .physicalColumn("age", DataTypes.INT())
+                                                .physicalColumn("fav_num", temporalColumnType)
+                                                .primaryKey("id")
+                                                .build())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(new CreateTableEvent(generatedTableId, generatedSchema));
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
+        }
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
+        }
+
+        return events;
+    }
+
     BinaryRecordData generate(Schema schema, Object... fields) {
         return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                 .generate(
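Each AlterColumnTypeEvent in the expected output marks the point where a newly routed table forces fav_num to widen again. The DECIMAL steps line up with the merge arithmetic above (a reading of the test output, not part of the committed code):

    // BIGINT meets DECIMAL(15, 0):  max(15, 0 + 19) = 19           -> DECIMAL(19, 0)
    // DECIMAL(19, 0) meets DECIMAL(10, 5):
    //     integer digits max(19, 5) = 19, scale max(0, 5) = 5      -> DECIMAL(24, 5)
    // DECIMAL(24, 5) meets DECIMAL(29, 19):
    //     integer digits max(19, 10) = 19, scale max(5, 19) = 19   -> DECIMAL(38, 19)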

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

Lines changed: 32 additions & 1 deletion
@@ -20,6 +20,7 @@
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
@@ -39,6 +40,7 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -73,6 +75,7 @@
 import javax.annotation.Nullable;

 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -604,14 +607,42 @@ public Object getFieldOrNull(RecordData recordData) {
             } else if (originalField instanceof Integer) {
                 // INT
                 return ((Integer) originalField).longValue();
+            } else if (originalField instanceof Long) {
+                // BIGINT
+                return originalField;
             } else {
                 return fail(
                         new IllegalArgumentException(
                                 String.format(
                                         "Cannot fit type \"%s\" into a BIGINT column. "
-                                                + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
+                                                + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
+                                        originalField.getClass())));
+            }
+        } else if (destinationType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) destinationType;
+            BigDecimal decimalValue;
+            if (originalField instanceof Byte) {
+                decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0);
+            } else if (originalField instanceof Short) {
+                decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0);
+            } else if (originalField instanceof Integer) {
+                decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0);
+            } else if (originalField instanceof Long) {
+                decimalValue = BigDecimal.valueOf((Long) originalField, 0);
+            } else if (originalField instanceof DecimalData) {
+                decimalValue = ((DecimalData) originalField).toBigDecimal();
+            } else {
+                return fail(
+                        new IllegalArgumentException(
+                                String.format(
+                                        "Cannot fit type \"%s\" into a DECIMAL column. "
+                                                + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
                                         originalField.getClass())));
             }
+            return decimalValue != null
+                    ? DecimalData.fromBigDecimal(
+                            decimalValue, decimalType.getPrecision(), decimalType.getScale())
+                    : null;
         } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             if (originalField instanceof Float) {
                 // FLOAT
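At runtime, this new DecimalType branch is what lets rows written under a narrower schema flow into the widened column: integral values become scale-0 BigDecimals and are then rescaled to the destination precision and scale. A minimal sketch of that path, using the value from row 105 of the integration test (the local variable names here are illustrative, not from the commit):

    // A BIGINT value arriving after fav_num has widened to DECIMAL(38, 19):
    Object originalField = 555555555555555L;
    BigDecimal decimalValue = BigDecimal.valueOf((Long) originalField, 0);
    DecimalData coerced = DecimalData.fromBigDecimal(decimalValue, 38, 19);
    // Renders as 555555555555555.0000000000000000000 in the sink output above.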
