Skip to content

Commit 17fae8a

Browse files
authored
[cdc] Avoid sending empty schema change events to Schema Evolution (#7261)
1 parent 7914ab0 commit 17fae8a

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,25 @@ public void setRawEvent(RichCdcRecord rawEvent) {
4242
public CdcSchema parseSchemaChange() {
4343
CdcSchema.Builder change = CdcSchema.newBuilder();
4444
CdcSchema recordedSchema = record.cdcSchema();
45-
recordedSchema
46-
.fields()
47-
.forEach(
48-
dataField -> {
49-
DataField previous = previousDataFields.get(dataField.name());
50-
// When the order of the same field is different, its ID may also be
51-
// different,
52-
// so the comparison should not include the ID.
53-
if (!DataField.dataFieldEqualsIgnoreId(previous, dataField)) {
54-
previousDataFields.put(dataField.name(), dataField);
55-
change.column(dataField);
56-
}
57-
});
45+
boolean hasChange = false;
46+
47+
for (DataField dataField : recordedSchema.fields()) {
48+
DataField previous = previousDataFields.get(dataField.name());
49+
// When the order of the same field is different, its ID may also be different,
50+
// so the comparison should not include the ID.
51+
if (!DataField.dataFieldEqualsIgnoreId(previous, dataField)) {
52+
previousDataFields.put(dataField.name(), dataField);
53+
change.column(dataField);
54+
hasChange = true;
55+
}
56+
}
5857

5958
if (recordedSchema.comment() != null && !recordedSchema.comment().equals(previousComment)) {
6059
previousComment = recordedSchema.comment();
6160
change.comment(recordedSchema.comment());
61+
hasChange = true;
6262
}
63-
return change.build();
63+
return hasChange ? change.build() : null;
6464
}
6565

6666
@Override

0 commit comments

Comments
 (0)