
Commit 8abbc5e

JackeyLee007 and 李鹏程 authored
[cdc] Ignore decimal length and precision change by type-mapping (#5124)
Co-authored-by: 李鹏程 <li.pc@topsports.com.cn>
1 parent 8cc2309 commit 8abbc5e

15 files changed: +126 -9 lines changed

docs/layouts/shortcodes/generated/kafka_sync_database.html

Lines changed: 1 addition & 0 deletions
@@ -86,6 +86,7 @@
 <li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
 <li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
 <li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
+<li>"decimal-no-change": Ignore decimal type change.</li>
 </ul>
 </td>
 </tr>

docs/layouts/shortcodes/generated/kafka_sync_table.html

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@
 <li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
 <li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
 <li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
+<li>"decimal-no-change": Ignore decimal type change.</li>
 </ul>
 </td>
 </tr>
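
The two generated docs above list the type-mapping modes by hyphenated name, with the new "decimal-no-change" entry telling schema evolution to leave existing DECIMAL columns untouched. As a rough sketch of the naming convention only — a hypothetical helper, not Paimon's actual TypeMapping parser — a comma-separated value such as char-to-string,decimal-no-change resolves to the matching enum constants, and the new mode can then be tested with a plain set lookup:

// Hypothetical illustration of the hyphenated option names listed above; the
// real parsing lives in Paimon's TypeMapping class and may differ in detail.
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;

public class TypeMappingOptionSketch {
    enum Mode { TO_STRING, CHAR_TO_STRING, LONGTEXT_TO_BYTES, DECIMAL_NO_CHANGE, BIGINT_UNSIGNED_TO_BIGINT }

    // "decimal-no-change" -> DECIMAL_NO_CHANGE, "char-to-string" -> CHAR_TO_STRING, ...
    static EnumSet<Mode> parse(String optionValue) {
        EnumSet<Mode> modes = EnumSet.noneOf(Mode.class);
        Arrays.stream(optionValue.split(","))
                .map(s -> s.trim().replace('-', '_').toUpperCase(Locale.ROOT))
                .forEach(name -> modes.add(Mode.valueOf(name)));
        return modes;
    }

    public static void main(String[] args) {
        EnumSet<Mode> modes = parse("char-to-string,decimal-no-change");
        System.out.println(modes.contains(Mode.DECIMAL_NO_CHANGE)); // true
    }
}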

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java

Lines changed: 1 addition & 0 deletions
@@ -231,6 +231,7 @@ protected void buildSink(
         .withInput(input)
         .withParserFactory(parserFactory)
         .withCatalogLoader(catalogLoader())
+        .withTypeMapping(typeMapping)
         .withDatabase(database)
         .withTables(tables)
         .withMode(mode)

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java

Lines changed: 1 addition & 0 deletions
@@ -168,6 +168,7 @@ protected void buildSink(
         .withParserFactory(parserFactory)
         .withTable(fileStoreTable)
         .withIdentifier(new Identifier(database, table))
+        .withTypeMapping(typeMapping)
         .withCatalogLoader(catalogLoader());
 String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
 if (sinkParallelism != null) {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@ public enum TypeMappingMode {
     TO_STRING,
     CHAR_TO_STRING,
     LONGTEXT_TO_BYTES,
+    DECIMAL_NO_CHANGE,
     BIGINT_UNSIGNED_TO_BIGINT;

     private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS =

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java

Lines changed: 9 additions & 1 deletion
@@ -21,6 +21,7 @@
 import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;

@@ -50,6 +51,7 @@ public class CdcSinkBuilder<T> {
     private Table table = null;
     private Identifier identifier = null;
     private CatalogLoader catalogLoader = null;
+    private TypeMapping typeMapping = null;

     @Nullable private Integer parallelism;

@@ -83,6 +85,11 @@ public CdcSinkBuilder<T> withCatalogLoader(CatalogLoader catalogLoader) {
         return this;
     }

+    public CdcSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
+    }
+
     public DataStreamSink<?> build() {
         Preconditions.checkNotNull(input, "Input DataStream can not be null.");
         Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null.");

@@ -110,7 +117,8 @@ public DataStreamSink<?> build() {
                         new UpdatedDataFieldsProcessFunction(
                                 new SchemaManager(dataTable.fileIO(), dataTable.location()),
                                 identifier,
-                                catalogLoader))
+                                catalogLoader,
+                                typeMapping))
                 .name("Schema Evolution");
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
         schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java

Lines changed: 11 additions & 2 deletions
@@ -22,6 +22,7 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.sink.FlinkWriteSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.options.MemorySize;

@@ -74,6 +75,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     // Paimon tables. 2) in multiplex sink where it is used to
     // initialize different writers to multiple tables.
     private CatalogLoader catalogLoader;
+    private TypeMapping typeMapping;
+
     // database to sync, currently only support single database
     private String database;
     private MultiTablesSinkMode mode;

@@ -122,6 +125,11 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MultiTablesSinkMode mode) {
         return this;
     }

+    public FlinkCdcSyncDatabaseSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
+        this.typeMapping = typeMapping;
+        return this;
+    }
+
     public void build() {
         Preconditions.checkNotNull(input);
         Preconditions.checkNotNull(parserFactory);

@@ -154,7 +162,7 @@ private void buildCombinedCdcSink() {
                         parsed,
                         CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
                 .keyBy(t -> t.f0)
-                .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
+                .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader, typeMapping))
                 .name("Schema Evolution");

         DataStream<CdcMultiplexRecord> converted =

@@ -202,7 +210,8 @@ private void buildDividedCdcSink() {
                         new UpdatedDataFieldsProcessFunction(
                                 new SchemaManager(table.fileIO(), table.location()),
                                 Identifier.create(database, table.name()),
-                                catalogLoader))
+                                catalogLoader,
+                                typeMapping))
                 .name("Schema Evolution");
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
         schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java

Lines changed: 4 additions & 2 deletions
@@ -21,6 +21,7 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;

@@ -52,8 +53,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction

     private final Map<Identifier, SchemaManager> schemaManagers = new HashMap<>();

-    public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader catalogLoader) {
-        super(catalogLoader);
+    public MultiTableUpdatedDataFieldsProcessFunction(
+            CatalogLoader catalogLoader, TypeMapping typeMapping) {
+        super(catalogLoader, typeMapping);
     }

     @Override

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java

Lines changed: 6 additions & 2 deletions
@@ -20,6 +20,7 @@

 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.DataField;

@@ -53,8 +54,11 @@ public class UpdatedDataFieldsProcessFunction
     private Set<FieldIdentifier> latestFields;

     public UpdatedDataFieldsProcessFunction(
-            SchemaManager schemaManager, Identifier identifier, CatalogLoader catalogLoader) {
-        super(catalogLoader);
+            SchemaManager schemaManager,
+            Identifier identifier,
+            CatalogLoader catalogLoader,
+            TypeMapping typeMapping) {
+        super(catalogLoader, typeMapping);
         this.schemaManager = schemaManager;
         this.identifier = identifier;
         this.latestFields = new HashSet<>();

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java

Lines changed: 13 additions & 1 deletion
@@ -21,6 +21,7 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;

@@ -52,6 +53,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     protected final CatalogLoader catalogLoader;
     protected Catalog catalog;
     private boolean caseSensitive;
+    private TypeMapping typeMapping;

     private static final List<DataTypeRoot> STRING_TYPES =
             Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);

@@ -71,8 +73,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     private static final List<DataTypeRoot> TIMESTAMP_TYPES =
             Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

-    protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader catalogLoader) {
+    protected UpdatedDataFieldsProcessFunctionBase(
+            CatalogLoader catalogLoader, TypeMapping typeMapping) {
         this.catalogLoader = catalogLoader;
+        this.typeMapping = typeMapping;
     }

     /**

@@ -214,6 +218,11 @@ protected List<SchemaChange> extractSchemaChanges(
             oldFields.put(oldField.name(), oldField);
         }

+        boolean allowDecimalTypeChange =
+                this.typeMapping == null
+                        || !this.typeMapping.containsMode(
+                                TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);
+
         List<SchemaChange> result = new ArrayList<>();
         for (DataField newField : updatedDataFields) {
             String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);

@@ -232,6 +241,9 @@
                             new String[] {newFieldName}, newField.description()));
                 }
             } else {
+                if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
+                    continue;
+                }
                 // update column type
                 result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
                 // update column comment
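
The hunk above is the behavioral core of the commit: once DECIMAL_NO_CHANGE is configured, a decimal column whose length or precision changed upstream no longer yields an updateColumnType schema change. A minimal standalone sketch of that decision, using plain Java collections instead of Paimon's TypeMapping/DataField/SchemaChange classes (all names here are illustrative, and the real extractSchemaChanges applies additional convertibility checks):

// Standalone illustration of the skip-decimal rule added above; not Paimon code.
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

public class DecimalNoChangeSketch {
    enum Mode { DECIMAL_NO_CHANGE }

    record Field(String name, String type) {} // e.g. type = "DECIMAL(10,2)"

    static List<String> schemaChanges(
            Map<String, Field> oldFields, List<Field> updatedFields, EnumSet<Mode> modes) {
        boolean allowDecimalTypeChange = modes == null || !modes.contains(Mode.DECIMAL_NO_CHANGE);
        List<String> result = new ArrayList<>();
        for (Field newField : updatedFields) {
            Field oldField = oldFields.get(newField.name());
            if (oldField == null) {
                result.add("ADD COLUMN " + newField.name() + " " + newField.type());
            } else if (!oldField.type().equals(newField.type())) {
                // The new rule: leave existing DECIMAL columns alone when the mode is set.
                if (oldField.type().startsWith("DECIMAL") && !allowDecimalTypeChange) {
                    continue;
                }
                result.add("ALTER COLUMN " + newField.name() + " TYPE " + newField.type());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Field> old = Map.of("amount", new Field("amount", "DECIMAL(10,2)"));
        List<Field> updated = List.of(new Field("amount", "DECIMAL(12,4)"));
        System.out.println(schemaChanges(old, updated, EnumSet.of(Mode.DECIMAL_NO_CHANGE))); // []
        System.out.println(schemaChanges(old, updated, EnumSet.noneOf(Mode.class)));         // [ALTER COLUMN amount TYPE DECIMAL(12,4)]
    }
}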
