From 26edf1d46ba0c32d4205f112115f5ebd744f9d3e Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 24 Jan 2025 13:55:18 +0800 Subject: [PATCH 1/5] [FLINK-37203] Alter table comment or column comment --- .../common/event/AlterColumnTypeEvent.java | 41 +++++-- .../common/event/AlterTableCommentEvent.java | 89 +++++++++++++++ .../common/event/SchemaChangeEventType.java | 7 +- .../event/SchemaChangeEventTypeFamily.java | 8 +- .../AlterTableCommentEventVisitor.java | 28 +++++ .../visitor/SchemaChangeEventVisitor.java | 9 +- .../flink/cdc/common/schema/Schema.java | 10 ++ .../cdc/common/utils/ChangeEventUtils.java | 29 ----- .../cdc/common/utils/SchemaMergingUtils.java | 6 +- .../flink/cdc/common/utils/SchemaUtils.java | 24 +++- .../common/utils/ChangeEventUtilsTest.java | 20 +++- .../doris/sink/DorisMetadataApplier.java | 24 +++- .../doris/sink/DorisSchemaChangeManager.java | 16 +++ .../maxcompute/MaxComputeMetadataApplier.java | 11 +- .../utils/SchemaEvolutionUtils.java | 57 ++++++--- .../CustomAlterTableParserListener.java | 10 +- .../oceanbase/catalog/OceanBaseCatalog.java | 8 +- .../catalog/OceanBaseMySQLCatalog.java | 32 +++++- .../catalog/OceanBaseOracleCatalog.java | 12 +- .../sink/OceanBaseMetadataApplier.java | 25 +++- .../paimon/sink/PaimonMetadataApplier.java | 24 ++++ .../sink/StarRocksMetadataApplier.java | 6 + .../cdc/connectors/values/ValuesDatabase.java | 21 +++- .../event/AlterColumnTypeEventSerializer.java | 9 +- .../AlterTableCommentEventSerializer.java | 108 ++++++++++++++++++ .../event/SchemaChangeEventSerializer.java | 12 +- .../AlterColumnTypeEventSerializerTest.java | 9 +- .../AlterTableCommentEventSerializerTest.java | 52 +++++++++ 28 files changed, 623 insertions(+), 84 deletions(-) create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterTableCommentEvent.java create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializer.java create mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializerTest.java diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java index 2257f134d96..58dfa641bd7 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java @@ -22,6 +22,8 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; +import javax.annotation.Nullable; + import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -44,19 +46,35 @@ public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, Sch private final Map oldTypeMapping; - public AlterColumnTypeEvent(TableId tableId, Map typeMapping) { + /** key => column name, value => column comment after changing. */ + private final Map comments; + + public AlterColumnTypeEvent( + TableId tableId, + Map typeMapping, + Map oldTypeMapping, + Map comments) { this.tableId = tableId; this.typeMapping = typeMapping; - this.oldTypeMapping = new HashMap<>(); + this.oldTypeMapping = oldTypeMapping; + this.comments = comments; + } + + public AlterColumnTypeEvent(TableId tableId, Map typeMapping) { + this(tableId, typeMapping, new HashMap<>()); } public AlterColumnTypeEvent( TableId tableId, Map typeMapping, Map oldTypeMapping) { - this.tableId = tableId; - this.typeMapping = typeMapping; - this.oldTypeMapping = oldTypeMapping; + this(tableId, typeMapping, oldTypeMapping, new HashMap<>()); + } + + public void addColumnComment(String columnName, @Nullable String comment) { + if (comment != null) { + this.comments.put(columnName, comment); + } } /** Returns the type mapping. */ @@ -64,6 +82,10 @@ public Map getTypeMapping() { return typeMapping; } + public Map getComments() { + return comments; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -75,12 +97,13 @@ public boolean equals(Object o) { AlterColumnTypeEvent that = (AlterColumnTypeEvent) o; return Objects.equals(tableId, that.tableId) && Objects.equals(typeMapping, that.typeMapping) - && Objects.equals(oldTypeMapping, that.oldTypeMapping); + && Objects.equals(oldTypeMapping, that.oldTypeMapping) + && Objects.equals(comments, that.comments); } @Override public int hashCode() { - return Objects.hash(tableId, typeMapping, oldTypeMapping); + return Objects.hash(tableId, typeMapping, oldTypeMapping, comments); } @Override @@ -93,6 +116,8 @@ public String toString() { + typeMapping + ", oldTypeMapping=" + oldTypeMapping + + ", comments='" + + comments + '}'; } else { return "AlterColumnTypeEvent{" @@ -100,6 +125,8 @@ public String toString() { + tableId + ", typeMapping=" + typeMapping + + ", comments='" + + comments + '}'; } } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterTableCommentEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterTableCommentEvent.java new file mode 100644 index 00000000000..ca0e566720a --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterTableCommentEvent.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.utils.Preconditions; + +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code ALTER TABLE COMMENT} or {@code ALTER TABLE + * SET COMMENT} DDL. + */ +@PublicEvolving +public class AlterTableCommentEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + private final String comment; + + public AlterTableCommentEvent(TableId tableId, String comment) { + Preconditions.checkArgument(comment != null, "comment should not be empty."); + this.tableId = tableId; + this.comment = comment; + } + + public String getComment() { + return comment; + } + + @Override + public SchemaChangeEventType getType() { + return SchemaChangeEventType.ALTER_TABLE_COMMENT; + } + + @Override + public SchemaChangeEvent copy(TableId newTableId) { + return new AlterTableCommentEvent(newTableId, comment); + } + + @Override + public TableId tableId() { + return tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AlterTableCommentEvent)) { + return false; + } + AlterTableCommentEvent that = (AlterTableCommentEvent) o; + return Objects.equals(tableId, that.tableId) && Objects.equals(comment, that.comment); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, comment); + } + + @Override + public String toString() { + return "AlterTableCommentEvent{" + + "tableId=" + + tableId + + ", comment='" + + comment + + '\'' + + '}'; + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java index bbe4b415c6a..4877257cb33 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java @@ -28,7 +28,8 @@ public enum SchemaChangeEventType { DROP_COLUMN("drop.column"), DROP_TABLE("drop.table"), RENAME_COLUMN("rename.column"), - TRUNCATE_TABLE("truncate.table"); + TRUNCATE_TABLE("truncate.table"), + ALTER_TABLE_COMMENT("alter.table.comment"); private final String tag; @@ -55,6 +56,8 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { return RENAME_COLUMN; } else if (event instanceof TruncateTableEvent) { return TRUNCATE_TABLE; + } else if (event instanceof AlterTableCommentEvent) { + return ALTER_TABLE_COMMENT; } else { throw new RuntimeException("Unknown schema change event type: " + event.getClass()); } @@ -76,6 +79,8 @@ public static SchemaChangeEventType ofTag(String tag) { return RENAME_COLUMN; case "truncate.table": return TRUNCATE_TABLE; + case "alter.table.comment": + return ALTER_TABLE_COMMENT; default: throw new RuntimeException("Unknown schema change event type: " + tag); } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java index c1adfd71618..851f21c92c6 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java @@ -21,6 +21,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_TABLE_COMMENT; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; @@ -44,7 +45,9 @@ public class SchemaChangeEventTypeFamily { public static final SchemaChangeEventType[] RENAME = {RENAME_COLUMN}; - public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE}; + public static final SchemaChangeEventType[] TABLE = { + CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE, ALTER_TABLE_COMMENT + }; public static final SchemaChangeEventType[] COLUMN = { ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN @@ -57,7 +60,8 @@ public class SchemaChangeEventTypeFamily { DROP_COLUMN, DROP_TABLE, RENAME_COLUMN, - TRUNCATE_TABLE + TRUNCATE_TABLE, + ALTER_TABLE_COMMENT }; public static final SchemaChangeEventType[] NONE = {}; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java new file mode 100644 index 00000000000..997e4e0672a --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterTableCommentEventVisitor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; + +/** Visitor for {@link AlterTableCommentEvent}s. */ +@Internal +@FunctionalInterface +public interface AlterTableCommentEventVisitor { + T visit(AlterTableCommentEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java index c905c8dee4f..8564a6cfb6a 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.DropTableEvent; @@ -38,7 +39,8 @@ public static T visit( DropColumnEventVisitor dropColumnEventVisitor, DropTableEventVisitor dropTableEventVisitor, RenameColumnEventVisitor renameColumnEventVisitor, - TruncateTableEventVisitor truncateTableEventVisitor) + TruncateTableEventVisitor truncateTableEventVisitor, + AlterTableCommentEventVisitor alterTableCommentEventVisitor) throws E { if (event instanceof AddColumnEvent) { if (addColumnVisitor == null) { @@ -75,6 +77,11 @@ public static T visit( return null; } return truncateTableEventVisitor.visit((TruncateTableEvent) event); + } else if (event instanceof AlterTableCommentEvent) { + if (alterTableCommentEventVisitor == null) { + return null; + } + return alterTableCommentEventVisitor.visit((AlterTableCommentEvent) event); } else { throw new IllegalArgumentException( "Unknown schema change event type " + event.getType()); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java index 61f7d36bcaf..8d3a978736e 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java @@ -175,6 +175,16 @@ public Schema copy(List columns) { comment); } + /** Returns a copy of the schema with a replaced comment. */ + public Schema copy(String comment) { + return new Schema( + columns, + new ArrayList<>(primaryKeys), + new ArrayList<>(partitionKeys), + new HashMap<>(options), + comment); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java index 483752ce92e..0f478898e9d 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java @@ -18,19 +18,10 @@ package org.apache.flink.cdc.common.utils; import org.apache.flink.cdc.common.annotation.VisibleForTesting; -import org.apache.flink.cdc.common.event.AddColumnEvent; -import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.DropTableEvent; -import org.apache.flink.cdc.common.event.RenameColumnEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.event.TruncateTableEvent; -import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import java.util.ArrayList; import java.util.Arrays; @@ -66,26 +57,6 @@ public static DataChangeEvent recreateDataChangeEvent( } } - public static SchemaChangeEvent recreateSchemaChangeEvent( - SchemaChangeEvent schemaChangeEvent, TableId tableId) { - - return SchemaChangeEventVisitor.visit( - schemaChangeEvent, - addColumnEvent -> new AddColumnEvent(tableId, addColumnEvent.getAddedColumns()), - alterColumnEvent -> - new AlterColumnTypeEvent( - tableId, - alterColumnEvent.getTypeMapping(), - alterColumnEvent.getOldTypeMapping()), - createTableEvent -> new CreateTableEvent(tableId, createTableEvent.getSchema()), - dropColumnEvent -> - new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()), - dropTableEvent -> new DropTableEvent(tableId), - renameColumnEvent -> - new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping()), - truncateTableEvent -> new TruncateTableEvent(tableId)); - } - public static Set resolveSchemaEvolutionOptions( List includedSchemaEvolutionTypes, List excludedSchemaEvolutionTypes) { List resultTypes = new ArrayList<>(); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java index 1851f893522..814345c5fbb 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java @@ -200,6 +200,7 @@ public static List getSchemaDifference( Map oldTypeMapping = new HashMap<>(); Map newTypeMapping = new HashMap<>(); + Map comments = new HashMap<>(); List appendedColumns = new ArrayList<>(); String afterWhichColumnPosition = null; @@ -226,6 +227,9 @@ public static List getSchemaDifference( } } afterWhichColumnPosition = afterColumn.getName(); + if (afterColumn.getComment() != null) { + comments.put(columnName, afterColumn.getComment()); + } } List schemaChangeEvents = new ArrayList<>(); @@ -235,7 +239,7 @@ public static List getSchemaDifference( if (!newTypeMapping.isEmpty()) { schemaChangeEvents.add( - new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping)); + new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping, comments)); } return schemaChangeEvents; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 131a783f551..5a3cfb001b1 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -115,7 +115,8 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve dropColumnEvent -> applyDropColumnEvent(dropColumnEvent, schema), dropTableEvent -> schema, renameColumnEvent -> applyRenameColumnEvent(renameColumnEvent, schema), - truncateTableEvent -> schema); + truncateTableEvent -> schema, + alterTableCommentEvent -> schema.copy(alterTableCommentEvent.getComment())); } private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) { @@ -280,10 +281,18 @@ public static Optional transformSchemaChangeEvent( .filter(e -> referencedColumns.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!newDataTypeMap.isEmpty()) { - evolvedSchemaChangeEvent = - Optional.of( - new AlterColumnTypeEvent( - alterColumnTypeEvent.tableId(), newDataTypeMap)); + AlterColumnTypeEvent value = + new AlterColumnTypeEvent( + alterColumnTypeEvent.tableId(), newDataTypeMap); + alterColumnTypeEvent + .getComments() + .forEach( + (name, comment) -> { + if (referencedColumns.contains(name)) { + value.addColumnComment(name, comment); + } + }); + evolvedSchemaChangeEvent = Optional.of(value); } } } else if (event instanceof RenameColumnEvent) { @@ -388,6 +397,11 @@ public static boolean isSchemaChangeEventRedundant( // We have no way to ensure if a TruncateTableEvent has been applied // before. Just assume it's not. return false; + }, + tableCommentEvent -> { + // We have no way to ensure if a AlterTableCommentEvent has been applied + // before. Just assume it's not. + return false; })); } diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java index 76d32db5128..56645bb2239 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java @@ -30,6 +30,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_TABLE_COMMENT; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; @@ -55,7 +56,8 @@ void testResolveSchemaEvolutionOptions() { DROP_TABLE, ALTER_COLUMN_TYPE, ADD_COLUMN, - DROP_COLUMN)); + DROP_COLUMN, + ALTER_TABLE_COMMENT)); assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( @@ -66,7 +68,8 @@ void testResolveSchemaEvolutionOptions() { ALTER_COLUMN_TYPE, RENAME_COLUMN, CREATE_TABLE, - TRUNCATE_TABLE)); + TRUNCATE_TABLE, + ALTER_TABLE_COMMENT)); assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( @@ -89,7 +92,8 @@ void testResolveSchemaEvolutionOptions() { TRUNCATE_TABLE, RENAME_COLUMN, ALTER_COLUMN_TYPE, - CREATE_TABLE)); + CREATE_TABLE, + ALTER_TABLE_COMMENT)); } @Test @@ -103,14 +107,17 @@ void testResolveSchemaEvolutionTag() { DROP_COLUMN, DROP_TABLE, RENAME_COLUMN, - TRUNCATE_TABLE)); + TRUNCATE_TABLE, + ALTER_TABLE_COMMENT)); assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column")) .isEqualTo( Arrays.asList(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN)); assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table")) - .isEqualTo(Arrays.asList(CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE)); + .isEqualTo( + Arrays.asList( + CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE, ALTER_TABLE_COMMENT)); assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column")) .isEqualTo(Collections.singletonList(RENAME_COLUMN)); @@ -138,5 +145,8 @@ void testResolveSchemaEvolutionTag() { assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column")) .isEqualTo(Collections.singletonList(ADD_COLUMN)); + + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.table.comment")) + .isEqualTo(Collections.singletonList(ALTER_TABLE_COMMENT)); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index 559b8557bc4..e694f1e25d7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.DropTableEvent; @@ -139,6 +140,10 @@ public void applySchemaChange(SchemaChangeEvent event) { truncateTableEvent -> { applyTruncateTableEvent(truncateTableEvent); return null; + }, + alterTableCommentEvent -> { + applyAlterTableCommentEvent(alterTableCommentEvent); + return null; }); } @@ -281,6 +286,7 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) try { TableId tableId = event.tableId(); Map typeMapping = event.getTypeMapping(); + Map comments = event.getComments(); for (Map.Entry entry : typeMapping.entrySet()) { schemaChangeManager.modifyColumnDataType( @@ -289,9 +295,7 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) new FieldSchema( entry.getKey(), buildTypeString(entry.getValue()), - null)); // Currently, AlterColumnTypeEvent carries no comment info. - // This - // will be fixed after FLINK-35243 got merged. + comments.get(entry.getKey()))); } } catch (Exception e) { throw new SchemaEvolveException(event, "fail to apply alter column type event", e); @@ -316,4 +320,18 @@ private void applyDropTableEvent(DropTableEvent dropTableEvent) throws SchemaEvo throw new SchemaEvolveException(dropTableEvent, "fail to drop table", e); } } + + private void applyAlterTableCommentEvent(AlterTableCommentEvent alterTableCommentEvent) + throws SchemaEvolveException { + TableId tableId = alterTableCommentEvent.tableId(); + try { + schemaChangeManager.alterTableComment( + tableId.getSchemaName(), + tableId.getTableName(), + alterTableCommentEvent.getComment()); + } catch (Exception e) { + throw new SchemaEvolveException( + alterTableCommentEvent, "fail to alter table comment", e); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java index d212fafaedf..c276344e3d8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java @@ -44,4 +44,20 @@ public boolean dropTable(String databaseName, String tableName) "DROP TABLE " + identifier(databaseName) + "." + identifier(tableName); return this.execute(dropTableDDL, databaseName); } + + public boolean alterTableComment(String databaseName, String tableName, String comment) + throws IOException, IllegalArgumentException { + String alterTableCommentDDL = + "ALTER TABLE " + + identifier(databaseName) + + "." + + identifier(tableName) + + " MODIFY COMMENT " + + quoted(comment); + return this.execute(alterTableCommentDDL, databaseName); + } + + private String quoted(String str) { + return "\"" + str + "\""; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java index bafe980ec72..06d202c1b78 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.DropTableEvent; @@ -83,7 +84,8 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { SchemaEvolutionUtils.alterColumnType( maxComputeOptions, alterColumnTypeEvent.tableId(), - alterColumnTypeEvent.getTypeMapping()); + alterColumnTypeEvent.getTypeMapping(), + alterColumnTypeEvent.getComments()); } else if (schemaChangeEvent instanceof DropColumnEvent) { DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent; SchemaEvolutionUtils.dropColumn( @@ -108,6 +110,13 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { } else if (schemaChangeEvent instanceof TruncateTableEvent) { TruncateTableEvent truncateTableEvent = (TruncateTableEvent) schemaChangeEvent; SchemaEvolutionUtils.truncateTable(maxComputeOptions, truncateTableEvent.tableId()); + } else if (schemaChangeEvent instanceof AlterTableCommentEvent) { + AlterTableCommentEvent alterTableCommentEvent = + (AlterTableCommentEvent) schemaChangeEvent; + SchemaEvolutionUtils.alterTableComment( + maxComputeOptions, + alterTableCommentEvent.tableId(), + alterTableCommentEvent.getComment()); } else { throw new UnsupportedOperationException( "Unsupported schema change event: " diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java index fc31d8f95a6..015b049d19b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java @@ -43,13 +43,13 @@ /** Schema evolution utils for maxcompute. */ public class SchemaEvolutionUtils { private static final Logger LOG = LoggerFactory.getLogger(SchemaEvolutionUtils.class); - private static final Map unsupportSchemahints = new HashMap<>(); + private static final Map unsupportedSchemaHints = new HashMap<>(); private static final Map supportSchemaHints = new HashMap<>(); static { - unsupportSchemahints.put("odps.sql.type.system.odps2", "true"); - unsupportSchemahints.put("odps.sql.decimal.odps2", "true"); - unsupportSchemahints.put("odps.sql.allow.schema.evolution", "true"); + unsupportedSchemaHints.put("odps.sql.type.system.odps2", "true"); + unsupportedSchemaHints.put("odps.sql.decimal.odps2", "true"); + unsupportedSchemaHints.put("odps.sql.allow.schema.evolution", "true"); supportSchemaHints.put("odps.sql.type.system.odps2", "true"); supportSchemaHints.put("odps.sql.decimal.odps2", "true"); @@ -82,7 +82,7 @@ public static void createTable(MaxComputeOptions options, TableId tableId, Schem odps.tables() .newTableCreator( odps.getDefaultProject(), tableId.getTableName(), tableSchema) - .withHints(unsupportSchemahints) + .withHints(unsupportedSchemaHints) .ifNotExists() .debug(); if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) { @@ -143,7 +143,7 @@ public static void addColumns( odps, odps.getDefaultProject(), sqlBuilder.toString(), - options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints, + options.isSupportSchema() ? supportSchemaHints : unsupportedSchemaHints, null); LOG.info("execute add column task: `{}`, instanceId: {}", sqlBuilder, instance.getId()); instance.waitForSuccess(); @@ -155,7 +155,10 @@ public static void addColumns( * 'col_comment''; */ public static void alterColumnType( - MaxComputeOptions options, TableId tableId, Map typeMapping) + MaxComputeOptions options, + TableId tableId, + Map typeMapping, + Map comments) throws OdpsException { Odps odps = MaxComputeUtils.getOdps(options); @@ -163,19 +166,19 @@ public static void alterColumnType( for (Map.Entry entry : typeMapping.entrySet()) { String alterColumnSql = - prefix - + entry.getKey() - + " " - + entry.getKey() - + " " - + string(entry.getValue()) - + ";"; + prefix + entry.getKey() + " " + entry.getKey() + " " + string(entry.getValue()); + String comment = comments.get(entry.getKey()); + if (comment == null) { + alterColumnSql += ";"; + } else { + alterColumnSql += " comment '" + comment + "';"; + } Instance instance = SQLTask.run( odps, odps.getDefaultProject(), alterColumnSql, - options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints, + options.isSupportSchema() ? supportSchemaHints : unsupportedSchemaHints, null); LOG.info( "execute alter column task: `{}`, instanceId: {}", @@ -206,7 +209,7 @@ public static void dropColumn( odps, odps.getDefaultProject(), sqlBuilder.toString(), - options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints, + options.isSupportSchema() ? supportSchemaHints : unsupportedSchemaHints, null); LOG.info("execute drop column task: `{}`, instanceId: {}", sqlBuilder, instance.getId()); instance.waitForSuccess(); @@ -228,7 +231,7 @@ public static void renameColumn( odps, odps.getDefaultProject(), sql, - options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints, + options.isSupportSchema() ? supportSchemaHints : unsupportedSchemaHints, null); LOG.info("execute rename column task: `{}`, instanceId: {}", sql, instance.getId()); instance.waitForSuccess(); @@ -247,6 +250,26 @@ public static void truncateTable(MaxComputeOptions options, TableId tableId) table.truncate(); } + public static void alterTableComment(MaxComputeOptions options, TableId tableId, String comment) + throws OdpsException { + Odps odps = MaxComputeUtils.getOdps(options); + String sql = + "alter table " + + getFullTableName(options, tableId) + + " set comment '" + + comment + + "';"; + Instance instance = + SQLTask.run( + odps, + odps.getDefaultProject(), + sql, + options.isSupportSchema() ? supportSchemaHints : unsupportedSchemaHints, + null); + LOG.info("execute alter table comment task: `{}`, instanceId: {}", sql, instance.getId()); + instance.waitForSuccess(); + } + private static String getFullTableName(MaxComputeOptions options, TableId tableId) { if (options.isSupportSchema()) { if (StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index af20c531c0d..6bc9a43297e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -320,7 +320,10 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) Map typeMapping = new HashMap<>(); typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit)); - changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + AlterColumnTypeEvent alterColumnTypeEvent = + new AlterColumnTypeEvent(currentTable, typeMapping); + alterColumnTypeEvent.addColumnComment(column.name(), column.comment()); + changes.add(alterColumnTypeEvent); if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) { Map renameMap = new HashMap<>(); @@ -371,7 +374,10 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) Column column = columnDefinitionListener.getColumn(); Map typeMapping = new HashMap<>(); typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit)); - changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + AlterColumnTypeEvent alterColumnTypeEvent = + new AlterColumnTypeEvent(currentTable, typeMapping); + alterColumnTypeEvent.addColumnComment(column.name(), column.comment()); + changes.add(alterColumnTypeEvent); listeners.remove(columnDefinitionListener); }, columnDefinitionListener); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalog.java index 67828e66313..a136a9bc990 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalog.java @@ -91,7 +91,11 @@ public abstract void alterDropColumns( String schemaName, String tableName, List dropColumns); public abstract void alterColumnType( - String schemaName, String tableName, String columnName, DataType dataType); + String schemaName, + String tableName, + String columnName, + DataType dataType, + String comment); public abstract void renameColumn( String schemaName, String tableName, String oldColumnName, String newColumnName); @@ -100,6 +104,8 @@ public abstract void renameColumn( public abstract void truncateTable(String schemaName, String tableName); + public abstract void alterTable(String schemaName, String tableName, String comment); + public void close() { LOG.info("Close OceanBase catalog"); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java index 900b0e050df..5ab2eded6da 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java @@ -227,7 +227,11 @@ public void alterDropColumns(String databaseName, String tableName, List @Override public void alterColumnType( - String databaseName, String tableName, String columnName, DataType dataType) { + String databaseName, + String tableName, + String columnName, + DataType dataType, + String comment) { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), "database name cannot be null or empty."); @@ -369,6 +373,32 @@ public void truncateTable(String databaseName, String tableName) { } } + @Override + public void alterTable(String schemaName, String tableName, String comment) { + String alterTableDDL = + String.format( + "ALTER TABLE `%s`.`%s` SET COMMENT '%s'", schemaName, tableName, comment); + try { + long startTimeMillis = System.currentTimeMillis(); + executeUpdateStatement(alterTableDDL); + LOG.info( + "Success to alter table {}.{}, duration: {}ms, sql: {}", + schemaName, + tableName, + System.currentTimeMillis() - startTimeMillis, + alterTableDDL); + } catch (Exception e) { + LOG.error( + "Failed to alter table {}.{}, sql: {}", + schemaName, + tableName, + alterTableDDL, + e); + throw new OceanBaseCatalogException( + String.format("Failed to alter table %s.%s ", schemaName, tableName), e); + } + } + // ------------------------------------------------------------------------------------------ // OceanBase DDL SQL // ------------------------------------------------------------------------------------------ diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseOracleCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseOracleCatalog.java index c77236988f6..c35a929bc06 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseOracleCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseOracleCatalog.java @@ -79,7 +79,11 @@ public void alterDropColumns(String schemaName, String tableName, List d @Override public void alterColumnType( - String schemaName, String tableName, String columnName, DataType dataType) { + String schemaName, + String tableName, + String columnName, + DataType dataType, + String comment) { throw new OceanBaseCatalogException( "This operation under oracle tenant is not supported currently."); } @@ -102,4 +106,10 @@ public void truncateTable(String schemaName, String tableName) { throw new OceanBaseCatalogException( "This operation under oracle tenant is not supported currently."); } + + @Override + public void alterTable(String schemaName, String tableName, String comment) { + throw new OceanBaseCatalogException( + "This operation under oracle tenant is not supported currently."); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java index 65aa715194f..3a1ea735461 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.DropTableEvent; @@ -94,6 +95,10 @@ public void applySchemaChange(SchemaChangeEvent event) { truncateTableEvent -> { applyTruncateTableEvent(truncateTableEvent); return null; + }, + alterTableCommentEvent -> { + applyAlterTableCommentEvent(alterTableCommentEvent); + return null; }); } @@ -152,13 +157,15 @@ private void applyDropColumnEvent(DropColumnEvent dropColumnEvent) { private void applyAlterColumnTypeEvent(AlterColumnTypeEvent alterColumnTypeEvent) { TableId tableId = alterColumnTypeEvent.tableId(); Map typeMapping = alterColumnTypeEvent.getTypeMapping(); + Map comments = alterColumnTypeEvent.getComments(); for (Map.Entry entry : typeMapping.entrySet()) { catalog.alterColumnType( tableId.getSchemaName(), tableId.getTableName(), entry.getKey(), - entry.getValue()); + entry.getValue(), + comments.get(entry.getKey())); } } @@ -192,4 +199,20 @@ private void applyTruncateTableEvent(TruncateTableEvent truncateTableEvent) { tableId.getSchemaName(), tableId.getTableName())); } } + + private void applyAlterTableCommentEvent(AlterTableCommentEvent alterTableCommentEvent) { + TableId tableId = alterTableCommentEvent.tableId(); + // check table exists + if (catalog.tableExists(tableId.getSchemaName(), tableId.getTableName())) { + catalog.alterTable( + tableId.getSchemaName(), + tableId.getTableName(), + alterTableCommentEvent.getComment()); + } else { + throw new OceanBaseCatalogException( + String.format( + "Failed to alter table %s.%s, because the table not exist", + tableId.getSchemaName(), tableId.getTableName())); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 7d2c06c15f9..0375c0008cf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.DropTableEvent; @@ -150,6 +151,10 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) truncateTableEvent -> { applyTruncateTable(truncateTableEvent); return null; + }, + alterTableCommentEvent -> { + applyAlterTableComment(alterTableCommentEvent); + return null; }); } @@ -332,6 +337,14 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv tableChangeList.add( SchemaChangeProvider.updateColumnType( oldName, newType))); + event.getComments() + .forEach( + (name, comment) -> { + if (comment != null) { + tableChangeList.add( + SchemaChange.updateColumnComment(name, comment)); + } + }); catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException @@ -363,6 +376,17 @@ private void applyDropTable(DropTableEvent event) throws SchemaEvolveException { } } + private void applyAlterTableComment(AlterTableCommentEvent event) throws SchemaEvolveException { + try { + catalog.alterTable( + tableIdToIdentifier(event), + SchemaChange.updateComment(event.getComment()), + true); + } catch (Exception e) { + throw new SchemaEvolveException(event, "Failed to apply alter table comment event", e); + } + } + private static Identifier tableIdToIdentifier(SchemaChangeEvent event) { return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index 4204dbf9c9e..318a2ae0be1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -130,6 +130,12 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) truncateTableEvent -> { applyTruncateTable(truncateTableEvent); return null; + }, + alterTableCommentEvent -> { + // TODO Currently, column comments cannot be modified. + // See + // https://docs.starrocks.io/docs/sql-reference/sql-statements/table_bucket_part_index/ALTER_TABLE/#alter-table-comment-from-v31 + return null; }); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java index 0004961f38d..fe64ecdd23c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; @@ -104,7 +105,8 @@ public Set getSupportedSchemaEvolutionTypes() { SchemaChangeEventType.ALTER_COLUMN_TYPE, SchemaChangeEventType.CREATE_TABLE, SchemaChangeEventType.DROP_COLUMN, - SchemaChangeEventType.RENAME_COLUMN); + SchemaChangeEventType.RENAME_COLUMN, + SchemaChangeEventType.ALTER_TABLE_COMMENT); } @Override @@ -223,6 +225,7 @@ public static Schema getTableSchema(TableId tableId) { for (Column column : table.columns) { builder.physicalColumn(column.getName(), column.getType()); } + builder.comment(table.comment); return builder.primaryKey(table.primaryKeys).build(); } @@ -296,6 +299,8 @@ private static class ValuesTable { // indexes of primaryKeys in columns private final List primaryKeyIndexes; + private String comment; + public ValuesTable(TableId tableId, Schema schema) { this.tableId = tableId; this.lock = new Object(); @@ -304,6 +309,7 @@ public ValuesTable(TableId tableId, Schema schema) { this.records = new HashMap<>(); this.primaryKeys = new LinkedList<>(schema.primaryKeys()); this.primaryKeyIndexes = new ArrayList<>(); + this.comment = schema.comment(); updatePrimaryKeyIndexes(); } @@ -384,6 +390,8 @@ public void applySchemaChangeEvent(SchemaChangeEvent event) { applyRenameColumnEvent((RenameColumnEvent) event); } else if (event instanceof AlterColumnTypeEvent) { applyAlterColumnTypeEvent((AlterColumnTypeEvent) event); + } else if (event instanceof AlterTableCommentEvent) { + applyAlterTableCommentEvent((AlterTableCommentEvent) event); } updatePrimaryKeyIndexes(); } @@ -412,18 +420,27 @@ private String buildPrimaryKeyStr(RecordData recordData) { } private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) { + Map comments = event.getComments(); event.getTypeMapping() .forEach( (columnName, columnType) -> { for (int i = 0; i < columns.size(); i++) { if (columns.get(i).getName().equals(columnName)) { columns.set( - i, Column.physicalColumn(columnName, columnType)); + i, + Column.physicalColumn( + columnName, + columnType, + comments.get(columnName))); } } }); } + private void applyAlterTableCommentEvent(AlterTableCommentEvent event) { + this.comment = event.getComment(); + } + private void applyAddColumnEvent(AddColumnEvent event) { for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { if (columns.contains(columnWithPosition.getAddColumn())) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java index 978b9b87ecd..0b74c8e4c62 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializer.java @@ -46,6 +46,8 @@ public class AlterColumnTypeEventSerializer extends TypeSerializerSingleton typeMapSerializer = new MapSerializer<>(StringSerializer.INSTANCE, new DataTypeSerializer()); + private final MapSerializer commentsSerializer = + new MapSerializer<>(StringSerializer.INSTANCE, StringSerializer.INSTANCE); @Override public boolean isImmutableType() { @@ -62,7 +64,8 @@ public AlterColumnTypeEvent copy(AlterColumnTypeEvent from) { return new AlterColumnTypeEvent( from.tableId(), typeMapSerializer.copy(from.getTypeMapping()), - typeMapSerializer.copy(from.getOldTypeMapping())); + typeMapSerializer.copy(from.getOldTypeMapping()), + commentsSerializer.copy(from.getComments())); } @Override @@ -80,6 +83,7 @@ public void serialize(AlterColumnTypeEvent record, DataOutputView target) throws tableIdSerializer.serialize(record.tableId(), target); typeMapSerializer.serialize(record.getTypeMapping(), target); typeMapSerializer.serialize(record.getOldTypeMapping(), target); + commentsSerializer.serialize(record.getComments(), target); } @Override @@ -87,7 +91,8 @@ public AlterColumnTypeEvent deserialize(DataInputView source) throws IOException return new AlterColumnTypeEvent( tableIdSerializer.deserialize(source), typeMapSerializer.deserialize(source), - typeMapSerializer.deserialize(source)); + typeMapSerializer.deserialize(source), + commentsSerializer.deserialize(source)); } @Override diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializer.java new file mode 100644 index 00000000000..2bc61956423 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.serializer.event; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.runtime.serializer.StringSerializer; +import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; +import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** A {@link TypeSerializer} for {@link AlterTableCommentEvent}. */ +public class AlterTableCommentEventSerializer + extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + /** Sharable instance of the TableIdSerializer. */ + public static final AlterTableCommentEventSerializer INSTANCE = + new AlterTableCommentEventSerializer(); + + private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; + private final StringSerializer commentSerializer = StringSerializer.INSTANCE; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public AlterTableCommentEvent createInstance() { + return new AlterTableCommentEvent(TableId.tableId("unknown"), ""); + } + + @Override + public AlterTableCommentEvent copy(AlterTableCommentEvent from) { + return new AlterTableCommentEvent(from.tableId(), from.getComment()); + } + + @Override + public AlterTableCommentEvent copy(AlterTableCommentEvent from, AlterTableCommentEvent reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(AlterTableCommentEvent record, DataOutputView target) throws IOException { + tableIdSerializer.serialize(record.tableId(), target); + commentSerializer.serialize(record.getComment(), target); + } + + @Override + public AlterTableCommentEvent deserialize(DataInputView source) throws IOException { + return new AlterTableCommentEvent( + tableIdSerializer.deserialize(source), commentSerializer.deserialize(source)); + } + + @Override + public AlterTableCommentEvent deserialize(AlterTableCommentEvent reuse, DataInputView source) + throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serialize(deserialize(source), target); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new AlterTableCommentEventSerializerSnapshot(); + } + + /** Serializer configuration snapshot for compatibility and format evolution. */ + @SuppressWarnings("WeakerAccess") + public static final class AlterTableCommentEventSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public AlterTableCommentEventSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java index 7a5da9c7231..f7a66b4d2a1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java @@ -33,6 +33,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_TABLE_COMMENT; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; @@ -85,7 +86,8 @@ public SchemaChangeEvent copy(SchemaChangeEvent from) { DropColumnEventSerializer.INSTANCE::copy, DropTableEventSerializer.INSTANCE::copy, RenameColumnEventSerializer.INSTANCE::copy, - TruncateTableEventSerializer.INSTANCE::copy); + TruncateTableEventSerializer.INSTANCE::copy, + AlterTableCommentEventSerializer.INSTANCE::copy); } @Override @@ -137,6 +139,12 @@ public void serialize(SchemaChangeEvent record, DataOutputView target) throws IO enumSerializer.serialize(TRUNCATE_TABLE, target); TruncateTableEventSerializer.INSTANCE.serialize(truncateTableEvent, target); return null; + }, + alterTableCommentEvent -> { + enumSerializer.serialize(ALTER_TABLE_COMMENT, target); + AlterTableCommentEventSerializer.INSTANCE.serialize( + alterTableCommentEvent, target); + return null; }); } @@ -158,6 +166,8 @@ public SchemaChangeEvent deserialize(DataInputView source) throws IOException { return DropTableEventSerializer.INSTANCE.deserialize(source); case TRUNCATE_TABLE: return TruncateTableEventSerializer.INSTANCE.deserialize(source); + case ALTER_TABLE_COMMENT: + return AlterTableCommentEventSerializer.INSTANCE.deserialize(source); default: throw new IllegalArgumentException( "Unknown schema change event class: " + schemaChangeEventType); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java index 474590892fb..e63579e59c6 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterColumnTypeEventSerializerTest.java @@ -53,13 +53,20 @@ protected AlterColumnTypeEvent[] getTestData() { Map oldMap = new HashMap<>(); oldMap.put("col1", DataTypes.TIME()); oldMap.put("col2", DataTypes.BYTES()); + + Map comments = new HashMap<>(); + comments.put("col1", "col1 comment"); + comments.put("col2", "col2 comment"); + return new AlterColumnTypeEvent[] { new AlterColumnTypeEvent(TableId.tableId("table"), map), new AlterColumnTypeEvent(TableId.tableId("schema", "table"), map), new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map), new AlterColumnTypeEvent(TableId.tableId("table"), map, oldMap), new AlterColumnTypeEvent(TableId.tableId("schema", "table"), map, oldMap), - new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map, oldMap) + new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), map, oldMap), + new AlterColumnTypeEvent( + TableId.tableId("namespace", "schema", "table"), map, oldMap, comments) }; } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializerTest.java new file mode 100644 index 00000000000..d782b5ddff2 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/AlterTableCommentEventSerializerTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.serializer.event; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; + +/** A test for the {@link AlterTableCommentEventSerializer}. */ +public class AlterTableCommentEventSerializerTest + extends SerializerTestBase { + @Override + protected TypeSerializer createSerializer() { + return AlterTableCommentEventSerializer.INSTANCE; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return AlterTableCommentEvent.class; + } + + @Override + protected AlterTableCommentEvent[] getTestData() { + String comment = "This is a table comment."; + return new AlterTableCommentEvent[] { + new AlterTableCommentEvent(TableId.tableId("table"), comment), + new AlterTableCommentEvent(TableId.tableId("schema", "table"), comment), + new AlterTableCommentEvent(TableId.tableId("namespace", "schema", "table"), comment) + }; + } +} From 1572ddb84e979d9b98da4254134e87474cb3836b Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 24 Jan 2025 16:30:26 +0800 Subject: [PATCH 2/5] Fix AlterColumnTypeEvent#toString --- .../common/event/AlterColumnTypeEvent.java | 4 +-- .../flink/FlinkPipelineComposerITCase.java | 28 +++++++++---------- .../FlinkPipelineComposerLenientITCase.java | 4 +-- .../flink/FlinkPipelineTransformITCase.java | 18 ++++++------ .../cdc/pipeline/tests/MysqlE2eITCase.java | 4 +-- .../cdc/pipeline/tests/RouteE2eITCase.java | 14 +++++----- .../pipeline/tests/SchemaEvolveE2eITCase.java | 14 +++++----- .../SchemaEvolvingTransformE2eITCase.java | 10 +++---- .../pipeline/tests/TransformE2eITCase.java | 4 +-- 9 files changed, 50 insertions(+), 50 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java index 58dfa641bd7..0302ed1bd34 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java @@ -116,7 +116,7 @@ public String toString() { + typeMapping + ", oldTypeMapping=" + oldTypeMapping - + ", comments='" + + ", comments=" + comments + '}'; } else { @@ -125,7 +125,7 @@ public String toString() { + tableId + ", typeMapping=" + typeMapping - + ", comments='" + + ", comments=" + comments + '}'; } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 13cf54e7f13..5a1a34332e3 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -836,7 +836,7 @@ void testMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=age}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", @@ -1047,7 +1047,7 @@ void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Except "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", @@ -1241,7 +1241,7 @@ void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) th .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}", @@ -1385,7 +1385,7 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA // Merging timestamp with different precision "CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}", + "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}, comments={}}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", @@ -1393,7 +1393,7 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA // Merging zoned timestamp with different precision "CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}}", + "AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}, comments={}}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", @@ -1401,7 +1401,7 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA // Merging local-zoned timestamp with different precision "CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}}", + "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}, comments={}}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", @@ -1409,9 +1409,9 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA // Merging all "CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}", + "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}, comments={}}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(9)}}", + "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(9)}, comments={}}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[5, Alice, 17, 2020-01-01T04:28:57-05:00], op=INSERT, meta=()}", @@ -1481,18 +1481,18 @@ void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) thr Stream.of( "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}", "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}", "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}", "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index f6bb4f4295e..13cbbc33ce5 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -865,7 +865,7 @@ void testMergingWithRoute() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", @@ -1073,7 +1073,7 @@ void testTransformMergingWithRoute() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index e1df23a11a5..8289931e865 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1213,7 +1213,7 @@ void testVanillaTransformWithSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1308,7 +1308,7 @@ void testWildcardTransformWithSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1397,7 +1397,7 @@ void testExplicitTransformWithSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5, Eve, 5 -> Eve], after=[5, Eva, 5 -> Eva], op=UPDATE, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6, Fiona, 6 -> Fiona], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6, Fiona, 6 -> Fiona], after=[], op=DELETE, meta=()}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7, Gem, 7 -> Gem], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8, Helen, 8 -> Helen], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8, Helen, 8 -> Helen], after=[8, Harry, 8 -> Harry], op=UPDATE, meta=()}", @@ -1486,7 +1486,7 @@ void testPreAsteriskWithSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 6 -> Fiona], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 7 -> Gem], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 8 -> Helen], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 8 -> Helen], after=[5th, 8, Harry, 18.0, -3, 8 -> Harry], op=UPDATE, meta=()}", @@ -1581,7 +1581,7 @@ void testPostAsteriskWithSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6 -> Fiona, 3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7 -> Gem, 4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8 -> Helen, 5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8 -> Helen, 5th, 8, Helen, 18.0, -2], after=[8 -> Harry, 5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1676,7 +1676,7 @@ void testTransformWithFilterButNoProjection() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1772,7 +1772,7 @@ void testTransformUnmatchedSchemaEvolution() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1867,7 +1867,7 @@ void testExplicitPrimaryKeyWithNullable() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -2227,7 +2227,7 @@ void testTransformWithLargeLiterals() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}", // Alter column type - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[5th, 8, Harry, 18.0, -3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index b649eb907c4..9b28c65b7e9 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -386,7 +386,7 @@ void testSchemaChangeEvents() throws Exception { "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", - "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}, comments={}}", "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", @@ -535,7 +535,7 @@ void testSoftDelete() throws Exception { "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1, -U], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1, +U], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1, -U], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1, +U], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1, -D], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}, comments={}}", "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649, +I], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649, +I], op=INSERT, meta=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index 23b340cad4f..78d3677ef66 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -224,7 +224,7 @@ void testDefaultRoute() throws Exception { "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}"); @@ -380,7 +380,7 @@ void testMergeTableRoute() throws Exception { "DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, null, Fluorite], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, null, null], op=INSERT, meta=()}"); } @@ -538,7 +538,7 @@ void testPartialRoute() throws Exception { "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -719,7 +719,7 @@ void testMultipleRoute() throws Exception { "AddColumnEvent{tableId=NEW_%s.BETAGAMM, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=VERSION}]}", "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10002, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10003, null, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); @@ -914,7 +914,7 @@ void testOneToManyRoute() throws Exception { "DataChangeEvent{tableId=NEW_%s.TABLEC, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -1081,7 +1081,7 @@ void testMergeTableRouteWithTransform() throws Exception { "DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, extras, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, extras, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, extras, null, Fluorite], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, extras, null, null], op=INSERT, meta=()}"); } @@ -1244,7 +1244,7 @@ void testReplacementSymbol() throws Exception { "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 8fc16d23ea9..eefd933da0c 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -96,7 +96,7 @@ void testSchemaEvolve() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", @@ -116,7 +116,7 @@ void testSchemaEvolveWithIncompatibleChanges() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=AFTER, existedColumnName=gender}]}", "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=AFTER, existedColumnName=precise_age}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", @@ -151,7 +151,7 @@ void testSchemaTryEvolveWithException() throws Exception { "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null], op=INSERT, meta=()}"), Arrays.asList( "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", - "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", @@ -197,7 +197,7 @@ void testLenientSchemaEvolution() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", @@ -220,7 +220,7 @@ void testFineGrainedSchemaEvolution() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}", @@ -309,7 +309,7 @@ void testLenientWithRoute() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.redirect, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.redirect, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.redirect, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.redirect, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", @@ -439,7 +439,7 @@ void testByDefaultTransform() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 5a671fc87dc..27f004a66de 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -97,7 +97,7 @@ void testSchemaEvolve() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20], op=INSERT, meta=()}", @@ -116,7 +116,7 @@ void testSchemaEvolveWithIncompatibleChanges() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=tag}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=AFTER, existedColumnName=gender}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); @@ -151,7 +151,7 @@ void testSchemaTryEvolveWithException() throws Exception { "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, null, 1028196, age < 20], op=INSERT, meta=()}"), Arrays.asList( "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", - "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change TruncateTableEvent{tableId=%s.members}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=TruncateTableEvent{tableId=%s.members}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", @@ -195,7 +195,7 @@ void testLenientSchemaEvolution() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); @@ -212,7 +212,7 @@ void testFineGrainedSchemaEvolution() throws Exception { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, null, 1026169, age < 20], op=INSERT, meta=()}", "TruncateTableEvent{tableId=%s.members}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 53391183d08..28e22758ea2 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -1086,7 +1086,7 @@ void testTransformWildcardPrefixWithSchemaEvolution() throws Exception { validateEvents( "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`LAST` VARCHAR(17), position=AFTER, existedColumnName=NAMEALPHA}]}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008, 8, 8, 80, 17, Jazz, Last, id -> 3008], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}", + "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009, 9, 9.0, 90, 18, Keka, Finale, id -> 3009], op=INSERT, meta=()}", @@ -1195,7 +1195,7 @@ void testTransformWildcardSuffixWithSchemaEvolution() throws Exception { "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}", "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008 <- id, First, 3008, 8, 8, 80, 17, Jazz], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}", + "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009 <- id, 1st, 3009, 9, 9.0, 90, 18, Keka], op=INSERT, meta=()}", From b92b64f11cc2a225b4885774ad9a45126d04a196 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Tue, 4 Mar 2025 15:06:07 +0800 Subject: [PATCH 3/5] parse ddl and add tests --- .../CustomAlterTableParserListener.java | 24 +++-- .../CustomMySqlAntlrDdlParserListener.java | 2 + .../mysql/source/MySqlPipelineITCase.java | 89 ++++++++++++++++--- 3 files changed, 95 insertions(+), 20 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 6bc9a43297e..65e7976b8e3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -17,14 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.parser; -import org.apache.flink.cdc.common.event.AddColumnEvent; -import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.CreateTableEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.DropTableEvent; -import org.apache.flink.cdc.common.event.RenameColumnEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.event.*; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -36,6 +29,7 @@ import io.debezium.relational.ColumnEditor; import io.debezium.relational.TableEditor; import io.debezium.relational.TableId; +import io.debezium.relational.ddl.AbstractDdlParser; import org.antlr.v4.runtime.tree.ParseTreeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -427,7 +421,8 @@ public void enterTableOptionComment(MySqlParser.TableOptionCommentContext ctx) { () -> { if (ctx.COMMENT() != null) { tableEditor.setComment( - parser.withoutQuotes(ctx.STRING_LITERAL().getText())); + AbstractDdlParser.withoutQuotes( + ctx.STRING_LITERAL().getText())); } }, tableEditor); @@ -435,6 +430,17 @@ public void enterTableOptionComment(MySqlParser.TableOptionCommentContext ctx) { super.enterTableOptionComment(ctx); } + @Override + public void exitTableOptionComment(MySqlParser.TableOptionCommentContext ctx) { + if (!parser.skipComments() && tableEditor.hasComment()) { + changes.add( + new AlterTableCommentEvent( + currentTable, + AbstractDdlParser.withoutQuotes(ctx.STRING_LITERAL().getText()))); + } + super.exitTableOptionComment(ctx); + } + private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java index 7e24e264759..0385c01b50d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -22,6 +22,7 @@ import io.debezium.antlr.AntlrDdlParserListener; import io.debezium.antlr.ProxyParseTreeListenerUtil; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener; import io.debezium.connector.mysql.antlr.listener.AlterViewParserListener; import io.debezium.connector.mysql.antlr.listener.CreateAndAlterDatabaseParserListener; import io.debezium.connector.mysql.antlr.listener.CreateTableParserListener; @@ -88,6 +89,7 @@ public CustomMySqlAntlrDdlParserListener( listeners.add(new TruncateTableParserListener(parser)); listeners.add(new CreateViewParserListener(parser, listeners)); listeners.add(new AlterViewParserListener(parser, listeners)); + listeners.add(new AlterTableParserListener(parser, listeners)); listeners.add(new DropViewParserListener(parser)); listeners.add(new CreateUniqueIndexParserListener(parser)); listeners.add(new SetStatementParserListener(parser)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 9415fe42ae0..8d08ec2d44f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -21,17 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.data.binary.BinaryStringData; -import org.apache.flink.cdc.common.event.AddColumnEvent; -import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.CreateTableEvent; -import org.apache.flink.cdc.common.event.DataChangeEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.DropTableEvent; -import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.RenameColumnEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.event.*; import org.apache.flink.cdc.common.factories.Factory; import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.schema.Column; @@ -1093,6 +1083,63 @@ public void testIncludeComments() throws Exception { actual.stream().map(Object::toString).collect(Collectors.toList())); } + @Test + public void testAlterTableComment() throws Exception { + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "tbl_with_comments"); + + String createTableSql = + String.format( + "CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n" + + " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n" + + " weight FLOAT(6) COMMENT 'column comment of weight'\n" + + ")\n" + + "COMMENT 'table comment of products';", + inventoryDatabase.getDatabaseName(), "tbl_with_comments"); + executeSql(inventoryDatabase, createTableSql); + + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(SERVER_TIME_ZONE.key(), "UTC"); + options.put(INCLUDE_COMMENTS_ENABLED.key(), "true"); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".tbl_with_comments"); + Factory.Context context = + new FactoryHelper.DefaultContext( + Configuration.fromMap(options), null, this.getClass().getClassLoader()); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) dataSource.getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + // alter table comment + String addColumnSql = + String.format( + "ALTER TABLE `%s`.`tbl_with_comments` COMMENT = 'new table comment';", + inventoryDatabase.getDatabaseName()); + executeSql(inventoryDatabase, addColumnSql); + + List expectedEvents = getEventsWithTableComments(tableId); + List actual = fetchResults(events, expectedEvents.size()); + assertEqualsInAnyOrder( + expectedEvents.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + } + @Test public void testIncludeCommentsForScanBinlogNewlyAddedTableEnabled() throws Exception { env.setParallelism(1); @@ -1169,6 +1216,26 @@ private void executeSql(UniqueDatabase database, String sql) throws SQLException } } + private List getEventsWithTableComments(TableId tableId) { + return Arrays.asList( + new CreateTableEvent( + tableId, + Schema.newBuilder() + .physicalColumn( + "id", DataTypes.INT().notNull(), "column comment of id") + .physicalColumn( + "name", + DataTypes.VARCHAR(255).notNull(), + "column comment of name", + "flink") + .physicalColumn( + "weight", DataTypes.FLOAT(), "column comment of weight") + .primaryKey(Collections.singletonList("id")) + .comment("table comment of products") + .build()), + new AlterTableCommentEvent(tableId, "new table comment")); + } + private List getEventsWithComments(TableId tableId) { return Arrays.asList( new CreateTableEvent( From 15ce2dc057eba99c0b022c42e1bc568d2d3991a4 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Tue, 4 Mar 2025 16:11:28 +0800 Subject: [PATCH 4/5] checkstyle --- .../parser/CustomAlterTableParserListener.java | 10 +++++++++- .../mysql/source/MySqlPipelineITCase.java | 13 ++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 65e7976b8e3..3a154374235 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -17,7 +17,15 @@ package org.apache.flink.cdc.connectors.mysql.source.parser; -import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 8d08ec2d44f..0e3bf593b35 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -21,7 +21,18 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.data.binary.BinaryStringData; -import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.AlterTableCommentEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.factories.Factory; import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.schema.Column; From 9b92048fe059f8bf096f2e35ec82b8b781876413 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 19 Dec 2025 18:42:18 +0800 Subject: [PATCH 5/5] fix IcebergMetadataApplier --- .../cdc/connectors/iceberg/sink/IcebergMetadataApplier.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index 7427960f9a6..ec18cfd3076 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -124,6 +124,9 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) }, truncateTableEvent -> { throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + }, + alterTableEvent -> { + throw new UnsupportedSchemaChangeEventException(alterTableEvent); }); }