Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -44,26 +46,46 @@ public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, Sch

private final Map<String, DataType> oldTypeMapping;

public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
/** key => column name, value => column comment after changing. */
private final Map<String, String> comments;

public AlterColumnTypeEvent(
TableId tableId,
Map<String, DataType> typeMapping,
Map<String, DataType> oldTypeMapping,
Map<String, String> comments) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = new HashMap<>();
this.oldTypeMapping = oldTypeMapping;
this.comments = comments;
}

public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
this(tableId, typeMapping, new HashMap<>());
}

public AlterColumnTypeEvent(
TableId tableId,
Map<String, DataType> typeMapping,
Map<String, DataType> oldTypeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = oldTypeMapping;
this(tableId, typeMapping, oldTypeMapping, new HashMap<>());
}

public void addColumnComment(String columnName, @Nullable String comment) {
if (comment != null) {
this.comments.put(columnName, comment);
}
}

/** Returns the type mapping. */
public Map<String, DataType> getTypeMapping() {
return typeMapping;
}

public Map<String, String> getComments() {
return comments;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -75,12 +97,13 @@ public boolean equals(Object o) {
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(typeMapping, that.typeMapping)
&& Objects.equals(oldTypeMapping, that.oldTypeMapping);
&& Objects.equals(oldTypeMapping, that.oldTypeMapping)
&& Objects.equals(comments, that.comments);
}

@Override
public int hashCode() {
return Objects.hash(tableId, typeMapping, oldTypeMapping);
return Objects.hash(tableId, typeMapping, oldTypeMapping, comments);
}

@Override
Expand All @@ -93,13 +116,17 @@ public String toString() {
+ typeMapping
+ ", oldTypeMapping="
+ oldTypeMapping
+ ", comments="
+ comments
+ '}';
} else {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ ", comments="
+ comments
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.utils.Preconditions;

import java.util.Objects;

/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER TABLE COMMENT} or {@code ALTER TABLE
* SET COMMENT} DDL.
*/
@PublicEvolving
public class AlterTableCommentEvent implements SchemaChangeEvent {

private static final long serialVersionUID = 1L;

private final TableId tableId;
private final String comment;

public AlterTableCommentEvent(TableId tableId, String comment) {
Preconditions.checkArgument(comment != null, "comment should not be empty.");
this.tableId = tableId;
this.comment = comment;
}

public String getComment() {
return comment;
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_TABLE_COMMENT;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new AlterTableCommentEvent(newTableId, comment);
}

@Override
public TableId tableId() {
return tableId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AlterTableCommentEvent)) {
return false;
}
AlterTableCommentEvent that = (AlterTableCommentEvent) o;
return Objects.equals(tableId, that.tableId) && Objects.equals(comment, that.comment);
}

@Override
public int hashCode() {
return Objects.hash(tableId, comment);
}

@Override
public String toString() {
return "AlterTableCommentEvent{"
+ "tableId="
+ tableId
+ ", comment='"
+ comment
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public enum SchemaChangeEventType {
DROP_COLUMN("drop.column"),
DROP_TABLE("drop.table"),
RENAME_COLUMN("rename.column"),
TRUNCATE_TABLE("truncate.table");
TRUNCATE_TABLE("truncate.table"),
ALTER_TABLE_COMMENT("alter.table.comment");

private final String tag;

Expand All @@ -55,6 +56,8 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
return RENAME_COLUMN;
} else if (event instanceof TruncateTableEvent) {
return TRUNCATE_TABLE;
} else if (event instanceof AlterTableCommentEvent) {
return ALTER_TABLE_COMMENT;
} else {
throw new RuntimeException("Unknown schema change event type: " + event.getClass());
}
Expand All @@ -76,6 +79,8 @@ public static SchemaChangeEventType ofTag(String tag) {
return RENAME_COLUMN;
case "truncate.table":
return TRUNCATE_TABLE;
case "alter.table.comment":
return ALTER_TABLE_COMMENT;
default:
throw new RuntimeException("Unknown schema change event type: " + tag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_TABLE_COMMENT;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
Expand All @@ -44,7 +45,9 @@ public class SchemaChangeEventTypeFamily {

public static final SchemaChangeEventType[] RENAME = {RENAME_COLUMN};

public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE};
public static final SchemaChangeEventType[] TABLE = {
CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE, ALTER_TABLE_COMMENT
};

public static final SchemaChangeEventType[] COLUMN = {
ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN
Expand All @@ -57,7 +60,8 @@ public class SchemaChangeEventTypeFamily {
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE
TRUNCATE_TABLE,
ALTER_TABLE_COMMENT
};

public static final SchemaChangeEventType[] NONE = {};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event.visitor;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AlterTableCommentEvent;

/** Visitor for {@link AlterTableCommentEvent}s. */
@Internal
@FunctionalInterface
public interface AlterTableCommentEventVisitor<T, E extends Throwable> {
T visit(AlterTableCommentEvent event) throws E;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.AlterTableCommentEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
Expand All @@ -38,7 +39,8 @@ public static <T, E extends Throwable> T visit(
DropColumnEventVisitor<T, E> dropColumnEventVisitor,
DropTableEventVisitor<T, E> dropTableEventVisitor,
RenameColumnEventVisitor<T, E> renameColumnEventVisitor,
TruncateTableEventVisitor<T, E> truncateTableEventVisitor)
TruncateTableEventVisitor<T, E> truncateTableEventVisitor,
AlterTableCommentEventVisitor<T, E> alterTableCommentEventVisitor)
throws E {
if (event instanceof AddColumnEvent) {
if (addColumnVisitor == null) {
Expand Down Expand Up @@ -75,6 +77,11 @@ public static <T, E extends Throwable> T visit(
return null;
}
return truncateTableEventVisitor.visit((TruncateTableEvent) event);
} else if (event instanceof AlterTableCommentEvent) {
if (alterTableCommentEventVisitor == null) {
return null;
}
return alterTableCommentEventVisitor.visit((AlterTableCommentEvent) event);
} else {
throw new IllegalArgumentException(
"Unknown schema change event type " + event.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ public Schema copy(List<Column> columns) {
comment);
}

/** Returns a copy of the schema with a replaced comment. */
public Schema copy(String comment) {
return new Schema(
columns,
new ArrayList<>(primaryKeys),
new ArrayList<>(partitionKeys),
new HashMap<>(options),
comment);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,10 @@
package org.apache.flink.cdc.common.utils;

import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -66,26 +57,6 @@ public static DataChangeEvent recreateDataChangeEvent(
}
}

public static SchemaChangeEvent recreateSchemaChangeEvent(
SchemaChangeEvent schemaChangeEvent, TableId tableId) {

return SchemaChangeEventVisitor.visit(
schemaChangeEvent,
addColumnEvent -> new AddColumnEvent(tableId, addColumnEvent.getAddedColumns()),
alterColumnEvent ->
new AlterColumnTypeEvent(
tableId,
alterColumnEvent.getTypeMapping(),
alterColumnEvent.getOldTypeMapping()),
createTableEvent -> new CreateTableEvent(tableId, createTableEvent.getSchema()),
dropColumnEvent ->
new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()),
dropTableEvent -> new DropTableEvent(tableId),
renameColumnEvent ->
new RenameColumnEvent(tableId, renameColumnEvent.getNameMapping()),
truncateTableEvent -> new TruncateTableEvent(tableId));
}

public static Set<SchemaChangeEventType> resolveSchemaEvolutionOptions(
List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) {
List<SchemaChangeEventType> resultTypes = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public static List<SchemaChangeEvent> getSchemaDifference(

Map<String, DataType> oldTypeMapping = new HashMap<>();
Map<String, DataType> newTypeMapping = new HashMap<>();
Map<String, String> comments = new HashMap<>();
List<AddColumnEvent.ColumnWithPosition> appendedColumns = new ArrayList<>();

String afterWhichColumnPosition = null;
Expand Down Expand Up @@ -231,6 +232,9 @@ public static List<SchemaChangeEvent> getSchemaDifference(
}
}
afterWhichColumnPosition = afterColumn.getName();
if (afterColumn.getComment() != null) {
comments.put(columnName, afterColumn.getComment());
}
}

List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
Expand All @@ -240,7 +244,7 @@ public static List<SchemaChangeEvent> getSchemaDifference(

if (!newTypeMapping.isEmpty()) {
schemaChangeEvents.add(
new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping));
new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping, comments));
}

if (!beforeColumns.isEmpty()) {
Expand Down
Loading