Skip to content

Commit 1fc123e

Browse files
authored
[cdc] Fix database sync performance issue of schema evolution (#5382)
1 parent 0b3ed9d commit 1fc123e

File tree

4 files changed

+56
-33
lines changed

4 files changed

+56
-33
lines changed

paimon-common/src/main/java/org/apache/paimon/types/FieldIdentifier.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222

2323
/** Used to indicate the uniqueness of a field. */
2424
public class FieldIdentifier {
25-
private String name;
26-
private DataType type;
27-
private String description;
25+
private final String name;
26+
private final DataType type;
27+
private final String description;
2828

2929
public FieldIdentifier(DataField dataField) {
3030
this.name = dataField.name();

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.paimon.schema.SchemaChange;
2626
import org.apache.paimon.schema.SchemaManager;
2727
import org.apache.paimon.table.FileStoreTable;
28+
import org.apache.paimon.types.DataField;
29+
import org.apache.paimon.types.FieldIdentifier;
2830

2931
import org.apache.flink.api.java.tuple.Tuple2;
3032
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -33,8 +35,11 @@
3335
import org.slf4j.LoggerFactory;
3436

3537
import java.util.HashMap;
38+
import java.util.HashSet;
39+
import java.util.List;
3640
import java.util.Map;
3741
import java.util.Objects;
42+
import java.util.Set;
3843

3944
/**
4045
* A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
@@ -51,6 +56,8 @@ public class MultiTableUpdatedDataFieldsProcessFunction
5156

5257
private final Map<Identifier, SchemaManager> schemaManagers = new HashMap<>();
5358

59+
private final Map<Identifier, Set<FieldIdentifier>> latestFieldsMap = new HashMap<>();
60+
5461
public MultiTableUpdatedDataFieldsProcessFunction(
5562
CatalogLoader catalogLoader, TypeMapping typeMapping) {
5663
super(catalogLoader, typeMapping);
@@ -73,14 +80,34 @@ public void processElement(
7380
}
7481
return new SchemaManager(table.fileIO(), table.location());
7582
});
76-
7783
if (Objects.isNull(schemaManager)) {
7884
LOG.error("Failed to get schema manager for table " + tableId);
79-
} else {
80-
for (SchemaChange schemaChange :
81-
extractSchemaChanges(schemaManager, updatedSchema.f1)) {
82-
applySchemaChange(schemaManager, schemaChange, tableId);
83-
}
85+
return;
86+
}
87+
88+
Set<FieldIdentifier> latestFields =
89+
latestFieldsMap.computeIfAbsent(tableId, id -> new HashSet<>());
90+
List<DataField> actualUpdatedDataFields =
91+
actualUpdatedDataFields(updatedSchema.f1.fields(), latestFields);
92+
93+
if (actualUpdatedDataFields.isEmpty() && updatedSchema.f1.comment() == null) {
94+
return;
95+
}
96+
97+
CdcSchema actualUpdatedSchema =
98+
new CdcSchema(
99+
actualUpdatedDataFields,
100+
updatedSchema.f1.primaryKeys(),
101+
updatedSchema.f1.comment());
102+
103+
for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
104+
applySchemaChange(schemaManager, schemaChange, tableId);
84105
}
106+
/*
107+
* Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
108+
* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
109+
* modified again.
110+
*/
111+
latestFieldsMap.put(tableId, updateLatestFields(schemaManager));
85112
}
86113
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,13 @@
2525
import org.apache.paimon.schema.SchemaManager;
2626
import org.apache.paimon.types.DataField;
2727
import org.apache.paimon.types.FieldIdentifier;
28-
import org.apache.paimon.types.RowType;
2928

30-
import org.apache.commons.collections.CollectionUtils;
3129
import org.apache.flink.streaming.api.functions.ProcessFunction;
3230
import org.apache.flink.util.Collector;
3331

3432
import java.util.HashSet;
3533
import java.util.List;
36-
import java.util.Objects;
3734
import java.util.Set;
38-
import java.util.stream.Collectors;
3935

4036
/**
4137
* A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
@@ -68,12 +64,8 @@ public UpdatedDataFieldsProcessFunction(
6864
public void processElement(CdcSchema updatedSchema, Context context, Collector<Void> collector)
6965
throws Exception {
7066
List<DataField> actualUpdatedDataFields =
71-
updatedSchema.fields().stream()
72-
.filter(
73-
dataField ->
74-
!latestDataFieldContain(new FieldIdentifier(dataField)))
75-
.collect(Collectors.toList());
76-
if (CollectionUtils.isEmpty(actualUpdatedDataFields) && updatedSchema.comment() == null) {
67+
actualUpdatedDataFields(updatedSchema.fields(), latestFields);
68+
if (actualUpdatedDataFields.isEmpty() && updatedSchema.comment() == null) {
7769
return;
7870
}
7971
CdcSchema actualUpdatedSchema =
@@ -89,19 +81,6 @@ public void processElement(CdcSchema updatedSchema, Context context, Collector<V
8981
* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
9082
* modified again.
9183
*/
92-
updateLatestFields();
93-
}
94-
95-
private boolean latestDataFieldContain(FieldIdentifier dataField) {
96-
return latestFields.stream().anyMatch(previous -> Objects.equals(previous, dataField));
97-
}
98-
99-
private void updateLatestFields() {
100-
RowType oldRowType = schemaManager.latest().get().logicalRowType();
101-
Set<FieldIdentifier> fieldIdentifiers =
102-
oldRowType.getFields().stream()
103-
.map(item -> new FieldIdentifier(item))
104-
.collect(Collectors.toSet());
105-
latestFields = fieldIdentifiers;
84+
latestFields = updateLatestFields(schemaManager);
10685
}
10786
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.paimon.types.DataType;
3030
import org.apache.paimon.types.DataTypeChecks;
3131
import org.apache.paimon.types.DataTypeRoot;
32+
import org.apache.paimon.types.FieldIdentifier;
3233
import org.apache.paimon.types.RowType;
3334
import org.apache.paimon.utils.Preconditions;
3435
import org.apache.paimon.utils.StringUtils;
@@ -44,6 +45,8 @@
4445
import java.util.HashMap;
4546
import java.util.List;
4647
import java.util.Map;
48+
import java.util.Set;
49+
import java.util.stream.Collectors;
4750

4851
/** Base class for update data fields process function. */
4952
public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends ProcessFunction<I, O> {
@@ -280,6 +283,20 @@ protected List<SchemaChange> extractSchemaChanges(
280283
return result;
281284
}
282285

286+
protected List<DataField> actualUpdatedDataFields(
287+
List<DataField> newFields, Set<FieldIdentifier> latestFields) {
288+
return newFields.stream()
289+
.filter(dataField -> !latestFields.contains(new FieldIdentifier(dataField)))
290+
.collect(Collectors.toList());
291+
}
292+
293+
protected Set<FieldIdentifier> updateLatestFields(SchemaManager schemaManager) {
294+
RowType oldRowType = schemaManager.latest().get().logicalRowType();
295+
return oldRowType.getFields().stream()
296+
.map(FieldIdentifier::new)
297+
.collect(Collectors.toSet());
298+
}
299+
283300
@Override
284301
public void close() throws Exception {
285302
if (catalog != null) {

0 commit comments

Comments
 (0)