diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index b775cebb05e..2c35ed61f26 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -83,6 +83,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; @@ -168,6 +169,8 @@ public DataSource createDataSource(Context context) { boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); boolean isAssignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + boolean scanReadChangelogAsAppendOnlyEnabled = + config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); 
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -221,7 +224,8 @@ public DataSource createDataSource(Context context) { .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) - .skipSnapshotBackfill(skipSnapshotBackfill); + .skipSnapshotBackfill(skipSnapshotBackfill) + .scanReadChangelogAsAppendOnly(scanReadChangelogAsAppendOnlyEnabled); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -359,6 +363,7 @@ public Set> optionalOptions() { options.add(PARSE_ONLINE_SCHEMA_CHANGES); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index 62a4a4ced45..4bbb3e655cb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -74,7 +74,8 @@ public EventSourceProvider getEventSourceProvider() { readableMetadataList, includeComments, sourceConfig.isTreatTinyInt1AsBoolean(), - MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig)); + MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig), + sourceConfig.isScanReadChangelogAsAppendOnly()); MySqlSource source = new MySqlSource<>( @@ -99,7 
+100,9 @@ public MySqlSourceConfig getSourceConfig() { @Override public SupportedMetadataColumn[] supportedMetadataColumns() { - return new SupportedMetadataColumn[] {new OpTsMetadataColumn()}; + return new SupportedMetadataColumn[] { + new OpTsMetadataColumn(), new RowKindMetadataColumn() + }; } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..7adb98b6285 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -330,4 +330,12 @@ public class MySqlDataSourceOptions { .defaultValue(false) .withDescription( "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. 
These replayed change log events should be handled specially."); + + @Experimental + public static final ConfigOption SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED = + ConfigOptions.key("scan.read-changelog-as-append-only.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to convert the changelog data stream to an append-only data stream"); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 80b44dc29c9..c8979dab59c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -18,13 +18,18 @@ package org.apache.flink.cdc.connectors.mysql.source; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParser; import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; +import org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import 
org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter; import org.apache.flink.table.data.TimestampData; import com.esri.core.geometry.ogc.OGCGeometry; @@ -38,6 +43,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -56,6 +63,7 @@ @Internal public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { + private static final Logger LOG = LoggerFactory.getLogger(MySqlEventDeserializer.class); private static final long serialVersionUID = 1L; public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = @@ -72,6 +80,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private List readableMetadataList; private boolean isTableIdCaseInsensitive; + private final boolean appendOnly; public MySqlEventDeserializer( DebeziumChangelogMode changelogMode, @@ -84,7 +93,8 @@ public MySqlEventDeserializer( new ArrayList<>(), includeSchemaChanges, tinyInt1isBit, - isTableIdCaseInsensitive); + isTableIdCaseInsensitive, + false); this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; } @@ -95,12 +105,31 @@ public MySqlEventDeserializer( boolean includeComments, boolean tinyInt1isBit, boolean isTableIdCaseInsensitive) { + this( + changelogMode, + includeSchemaChanges, + readableMetadataList, + includeComments, + tinyInt1isBit, + isTableIdCaseInsensitive, + false); + } + + public MySqlEventDeserializer( + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + List readableMetadataList, + boolean includeComments, + boolean tinyInt1isBit, + boolean isTableIdCaseInsensitive, + boolean appendOnly) { super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; this.readableMetadataList = readableMetadataList; this.includeComments = includeComments; 
this.tinyInt1isBit = tinyInt1isBit; this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; + this.appendOnly = appendOnly; } @Override @@ -109,7 +138,10 @@ protected List deserializeSchemaChangeRecord(SourceRecord rec if (customParser == null) { customParser = new CustomMySqlAntlrDdlParser( - includeComments, tinyInt1isBit, isTableIdCaseInsensitive); + includeComments, + tinyInt1isBit, + isTableIdCaseInsensitive, + appendOnly); tables = new Tables(); } @@ -157,21 +189,109 @@ protected TableId getTableId(SourceRecord record) { @Override protected Map getMetadata(SourceRecord record) { + return getMetadata(record, null); + } + + protected Map getMetadata(SourceRecord record, String opType) { Map metadataMap = new HashMap<>(); readableMetadataList.forEach( (mySqlReadableMetadata -> { - Object metadata = mySqlReadableMetadata.getConverter().read(record); - if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) { + if (mySqlReadableMetadata.equals(MySqlReadableMetadata.ROW_KIND)) { + if (appendOnly) { + // In append-only mode, map row_kind to the original operation type + metadataMap.put(mySqlReadableMetadata.getKey(), opType); + } + // Skip ROW_KIND in non-append-only mode since it throws + // UnsupportedOperationException + } else if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) { + Object metadata = mySqlReadableMetadata.getConverter().read(record); metadataMap.put( mySqlReadableMetadata.getKey(), String.valueOf(((TimestampData) metadata).getMillisecond())); } else { + Object metadata = mySqlReadableMetadata.getConverter().read(record); metadataMap.put(mySqlReadableMetadata.getKey(), String.valueOf(metadata)); } })); return metadataMap; } + @Override + public List deserializeDataChangeRecord(SourceRecord record) throws Exception { + if (!appendOnly) { + // Use default behavior when append-only is not enabled + return super.deserializeDataChangeRecord(record); + } + + // Append-only mode: convert all operations to INSERT events, but preserve 
original + // operation type + Envelope.Operation op = Envelope.operationFor(record); + TableId tableId = getTableId(record); + + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + Map meta = getMetadata(record, "+I"); + RecordData after = extractAfterData(value, valueSchema); + return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta)); + } else if (op == Envelope.Operation.DELETE) { + Map meta = getMetadata(record, "-D"); + // For DELETE: convert to INSERT with before data + RecordData before = extractBeforeData(value, valueSchema); + return Collections.singletonList(DataChangeEvent.insertEvent(tableId, before, meta)); + } else if (op == Envelope.Operation.UPDATE) { + // For UPDATE: convert to INSERT with after data (and optionally before data if ALL + // mode) + RecordData after = extractAfterData(value, valueSchema); + List events = new ArrayList<>(); + + if (changelogMode == DebeziumChangelogMode.ALL) { + Map beforeMeta = getMetadata(record, "-U"); + // Generate INSERT event for before data (UPDATE_BEFORE -> INSERT) + RecordData before = extractBeforeData(value, valueSchema); + events.add(DataChangeEvent.insertEvent(tableId, before, beforeMeta)); + } + + Map afterMeta = getMetadata(record, "+U"); + // Generate INSERT event for after data (UPDATE_AFTER -> INSERT) + events.add(DataChangeEvent.insertEvent(tableId, after, afterMeta)); + return events; + } else { + LOG.trace("Received {} operation, skip", op); + return Collections.emptyList(); + } + } + + private RecordData extractBeforeData(Struct value, Schema valueSchema) throws Exception { + Schema beforeSchema = + SourceRecordEventDeserializer.fieldSchema(valueSchema, Envelope.FieldName.BEFORE); + Struct beforeValue = + SourceRecordEventDeserializer.fieldStruct(value, Envelope.FieldName.BEFORE); + return extractData(beforeValue, beforeSchema); + } + + private RecordData 
extractAfterData(Struct value, Schema valueSchema) throws Exception { + Schema afterSchema = + SourceRecordEventDeserializer.fieldSchema(valueSchema, Envelope.FieldName.AFTER); + Struct afterValue = + SourceRecordEventDeserializer.fieldStruct(value, Envelope.FieldName.AFTER); + return extractData(afterValue, afterSchema); + } + + private RecordData extractData(Struct value, Schema valueSchema) throws Exception { + if (value == null) { + return null; + } + DataType dataType = schemaDataTypeInference.infer(value, valueSchema); + DeserializationRuntimeConverter converter = createNotNullConverter(dataType); + // Null-safe wrapper + if (value == null) { + return null; + } + return (RecordData) converter.convert(value, valueSchema); + } + @Override protected Object convertToString(Object dbzObj, Schema schema) { // the Geometry datatype in MySQL will be converted to diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/RowKindMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/RowKindMetadataColumn.java new file mode 100644 index 00000000000..9ffdbe06889 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/RowKindMetadataColumn.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; + +import java.util.Map; + +/** A {@link SupportedMetadataColumn} for row_kind. */ +public class RowKindMetadataColumn implements SupportedMetadataColumn { + + @Override + public String getName() { + return "row_kind"; + } + + @Override + public DataType getType() { + return DataTypes.STRING().notNull(); + } + + @Override + public Class getJavaClass() { + return String.class; + } + + @Override + public Object read(Map metadata) { + if (metadata.containsKey("row_kind")) { + return metadata.get("row_kind"); + } + throw new IllegalArgumentException("row_kind doesn't exist in the metadata: " + metadata); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 82f5be7d976..43529ff86f4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -68,6 +68,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private CustomColumnDefinitionParserListener columnDefinitionListener; private TableEditor tableEditor; private boolean isTableIdCaseInsensitive; + private final boolean appendOnly; private int parsingColumnIndex = STARTING_INDEX; public CustomAlterTableParserListener( @@ -76,11 +77,22 @@ public CustomAlterTableParserListener( LinkedList changes, boolean tinyInt1isBit, boolean isTableIdCaseInsensitive) { + this(parser, listeners, changes, tinyInt1isBit, isTableIdCaseInsensitive, false); + } + + public CustomAlterTableParserListener( + MySqlAntlrDdlParser parser, + List listeners, + LinkedList changes, + boolean tinyInt1isBit, + boolean isTableIdCaseInsensitive, + boolean appendOnly) { this.parser = parser; this.listeners = listeners; this.changes = changes; this.tinyInt1isBit = tinyInt1isBit; this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; + this.appendOnly = appendOnly; } @Override @@ -93,12 +105,12 @@ public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) { .overwriteTable( tableId, original.columns(), - original.primaryKeyColumnNames(), + appendOnly ? 
Collections.emptyList() : original.primaryKeyColumnNames(), original.defaultCharsetName()); parser.signalCreateTable(tableId, ctx); Schema.Builder builder = Schema.newBuilder(); original.columns().forEach(column -> builder.column(toCdcColumn(column))); - if (!original.primaryKeyColumnNames().isEmpty()) { + if (!appendOnly && !original.primaryKeyColumnNames().isEmpty()) { builder.primaryKey(original.primaryKeyColumnNames()); } changes.add(new CreateTableEvent(toCdcTableId(tableId), builder.build())); @@ -152,7 +164,7 @@ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) { Schema.Builder builder = Schema.newBuilder(); tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column))); - if (tableEditor.hasPrimaryKey()) { + if (!appendOnly && tableEditor.hasPrimaryKey()) { builder.primaryKey(tableEditor.primaryKeyColumnNames()); } builder.comment(tableEditor.create().comment()); @@ -173,7 +185,7 @@ public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { if (columnDefinitionListener == null) { columnDefinitionListener = new CustomColumnDefinitionParserListener( - tableEditor, columnEditor, parser, listeners); + tableEditor, columnEditor, parser, listeners, appendOnly); listeners.add(columnDefinitionListener); } else { columnDefinitionListener.setColumnEditor(columnEditor); @@ -198,7 +210,9 @@ public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) { parser.runIfNotNull( () -> { - parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); + if (!appendOnly) { + parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); + } }, tableEditor); super.enterPrimaryKeyTableConstraint(ctx); @@ -208,7 +222,7 @@ public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraint public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) 
{ parser.runIfNotNull( () -> { - if (!tableEditor.hasPrimaryKey()) { + if (!appendOnly && !tableEditor.hasPrimaryKey()) { parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); } }, @@ -235,7 +249,7 @@ public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { ColumnEditor columnEditor = Column.editor().name(columnName); columnDefinitionListener = new CustomColumnDefinitionParserListener( - tableEditor, columnEditor, parser, listeners); + tableEditor, columnEditor, parser, listeners, appendOnly); listeners.add(columnDefinitionListener); super.exitAlterByAddColumn(ctx); } @@ -291,7 +305,7 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) { } columnDefinitionListener = new CustomColumnDefinitionParserListener( - tableEditor, columnEditors.get(0), parser, listeners); + tableEditor, columnEditors.get(0), parser, listeners, appendOnly); listeners.add(columnDefinitionListener); super.enterAlterByAddColumns(ctx); } @@ -342,7 +356,7 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) columnDefinitionListener = new CustomColumnDefinitionParserListener( - tableEditor, columnEditor, parser, listeners); + tableEditor, columnEditor, parser, listeners, appendOnly); listeners.add(columnDefinitionListener); super.enterAlterByChangeColumn(ctx); } @@ -393,7 +407,7 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) ColumnEditor columnEditor = Column.editor().name(oldColumnName); columnDefinitionListener = new CustomColumnDefinitionParserListener( - tableEditor, columnEditor, parser, listeners); + tableEditor, columnEditor, parser, listeners, appendOnly); listeners.add(columnDefinitionListener); super.enterAlterByRenameColumn(ctx); } @@ -406,7 +420,7 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) columnDefinitionListener = new CustomColumnDefinitionParserListener( - tableEditor, columnEditor, parser, listeners); + 
tableEditor, columnEditor, parser, listeners, appendOnly); listeners.add(columnDefinitionListener); super.enterAlterByModifyColumn(ctx); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java index e886580b315..54498679202 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java @@ -54,17 +54,28 @@ public class CustomColumnDefinitionParserListener extends MySqlParserBaseListene private final TableEditor tableEditor; private final List listeners; + private final boolean appendOnly; public CustomColumnDefinitionParserListener( TableEditor tableEditor, ColumnEditor columnEditor, MySqlAntlrDdlParser parser, List listeners) { + this(tableEditor, columnEditor, parser, listeners, false); + } + + public CustomColumnDefinitionParserListener( + TableEditor tableEditor, + ColumnEditor columnEditor, + MySqlAntlrDdlParser parser, + List listeners, + boolean appendOnly) { this.tableEditor = tableEditor; this.columnEditor = columnEditor; this.parser = parser; this.dataTypeResolver = parser.dataTypeResolver(); this.listeners = listeners; + this.appendOnly = appendOnly; } public void setColumnEditor(ColumnEditor columnEditor) { @@ -111,7 +122,9 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai // otherwise the statement can't be executed due to multiple primary key error 
optionalColumn.set(Boolean.FALSE); tableEditor.addColumn(columnEditor.create()); - tableEditor.setPrimaryKeyNames(columnEditor.name()); + if (!appendOnly) { + tableEditor.setPrimaryKeyNames(columnEditor.name()); + } super.enterPrimaryKeyColumnConstraint(ctx); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java index d92329094be..549b15dc3a5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -37,13 +37,23 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { private final LinkedList parsedEvents; private final boolean tinyInt1isBit; private boolean isTableIdCaseInsensitive; + private final boolean appendOnly; public CustomMySqlAntlrDdlParser( boolean includeComments, boolean tinyInt1isBit, boolean isTableIdCaseInsensitive) { + this(includeComments, tinyInt1isBit, isTableIdCaseInsensitive, false); + } + + public CustomMySqlAntlrDdlParser( + boolean includeComments, + boolean tinyInt1isBit, + boolean isTableIdCaseInsensitive, + boolean appendOnly) { super(true, false, includeComments, null, Tables.TableFilter.includeAll()); this.parsedEvents = new LinkedList<>(); this.tinyInt1isBit = tinyInt1isBit; this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; + this.appendOnly = appendOnly; } // Overriding this method because the BIT type requires default length dimension of 1. 
@@ -284,7 +294,7 @@ protected DataTypeResolver initializeDataTypeResolver() { @Override protected AntlrDdlParserListener createParseTreeWalkerListener() { return new CustomMySqlAntlrDdlParserListener( - this, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive); + this, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive, appendOnly); } public List getAndClearParsedEvents() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java index e7351ac2b07..cc438c12240 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -78,13 +78,27 @@ public CustomMySqlAntlrDdlParserListener( LinkedList parsedEvents, boolean tinyInt1isBit, boolean isTableIdCaseInsensitive) { + this(parser, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive, false); + } + + public CustomMySqlAntlrDdlParserListener( + MySqlAntlrDdlParser parser, + LinkedList parsedEvents, + boolean tinyInt1isBit, + boolean isTableIdCaseInsensitive, + boolean appendOnly) { // initialize listeners listeners.add(new CreateAndAlterDatabaseParserListener(parser)); listeners.add(new DropDatabaseParserListener(parser)); listeners.add(new CreateTableParserListener(parser, listeners)); listeners.add( new CustomAlterTableParserListener( - parser, listeners, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive)); + parser, + listeners, + parsedEvents, + 
tinyInt1isBit, + isTableIdCaseInsensitive, + appendOnly)); listeners.add(new DropTableParserListener(parser)); listeners.add(new RenameTableParserListener(parser)); listeners.add(new TruncateTableParserListener(parser)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index c946b9e29ce..0dd646f5e32 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -236,7 +236,7 @@ private String describeTable(JdbcConnection jdbc, TableId tableId) { meta.setUnique("UNI".equalsIgnoreCase(rs.getString("Key"))); meta.setDefaultValue(rs.getString("Default")); meta.setExtra(rs.getString("Extra")); - if (meta.isKey()) { + if (meta.isKey() && !sourceConfig.isScanReadChangelogAsAppendOnly()) { primaryKeys.add(meta.getColumnName()); } fieldMetas.add(meta); @@ -274,7 +274,9 @@ private Schema buildSchemaFromTable(Table table) { tableBuilder.comment(table.comment()); List primaryKey = table.primaryKeyColumnNames(); - if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) { + if (Objects.nonNull(primaryKey) + && !primaryKey.isEmpty() + && !sourceConfig.isScanReadChangelogAsAppendOnly()) { tableBuilder.primaryKey(primaryKey); } return tableBuilder.build();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 74f3ef52a3c..966cfcd522f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -43,6 +43,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; @@ -269,7 +270,8 @@ public void testOptionalOption() { .contains( TREAT_TINYINT1_AS_BOOLEAN_ENABLED, PARSE_ONLINE_SCHEMA_CHANGES, - SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED, +
SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse(); @@ -277,6 +279,59 @@ public void testOptionalOption() { assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue(); } + @Test + void testScanReadChangelogAsAppendOnlyEnabledDefault() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + + assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isFalse(); + } + + @Test + void testScanReadChangelogAsAppendOnlyEnabledTrue() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + options.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "true"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + + assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isTrue(); + } + + @Test + void 
testScanReadChangelogAsAppendOnlyEnabledFalse() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + options.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "false"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + + assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isFalse(); + } + @Test void testPrefixRequireOption() { inventoryDatabase.createAndInitialize(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlDdlParserAppendOnlyTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlDdlParserAppendOnlyTest.java new file mode 100644 index 00000000000..273bca43c50 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlDdlParserAppendOnlyTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.parser; + +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; + +import io.debezium.relational.Tables; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link CustomMySqlAntlrDdlParser} with append-only mode. */ +class CustomMySqlDdlParserAppendOnlyTest { + + @Test + void testCreateTableWithPrimaryKeyInNormalMode() { + CustomMySqlAntlrDdlParser parser = + new CustomMySqlAntlrDdlParser(false, false, false, false); + Tables tables = new Tables(); + parser.setCurrentDatabase("test_db"); + + String ddl = + "CREATE TABLE test_table (" + + "id INT NOT NULL PRIMARY KEY, " + + "name VARCHAR(255), " + + "age INT" + + ");"; + + parser.parse(ddl, tables); + List events = parser.getAndClearParsedEvents(); + + assertThat(events).hasSize(1); + assertThat(events.get(0)).isInstanceOf(CreateTableEvent.class); + + CreateTableEvent createEvent = (CreateTableEvent) events.get(0); + assertThat(createEvent.getSchema().primaryKeys()).containsExactly("id"); + } + + @Test + void testCreateTableWithPrimaryKeyInAppendOnlyMode() { + CustomMySqlAntlrDdlParser parser = new CustomMySqlAntlrDdlParser(false, false, false, true); + Tables tables = new Tables(); + parser.setCurrentDatabase("test_db"); + + String ddl = + "CREATE TABLE test_table (" + + "id INT NOT NULL PRIMARY KEY, " + + "name VARCHAR(255), " + + "age INT" + + ");"; + + parser.parse(ddl, tables); + 
List events = parser.getAndClearParsedEvents(); + + assertThat(events).hasSize(1); + assertThat(events.get(0)).isInstanceOf(CreateTableEvent.class); + + CreateTableEvent createEvent = (CreateTableEvent) events.get(0); + // In append-only mode, primary keys should be removed + assertThat(createEvent.getSchema().primaryKeys()).isEmpty(); + } + + @Test + void testCreateTableWithCompositePrimaryKeyInAppendOnlyMode() { + CustomMySqlAntlrDdlParser parser = new CustomMySqlAntlrDdlParser(false, false, false, true); + Tables tables = new Tables(); + parser.setCurrentDatabase("test_db"); + + String ddl = + "CREATE TABLE test_table (" + + "id INT NOT NULL, " + + "tenant_id INT NOT NULL, " + + "name VARCHAR(255), " + + "PRIMARY KEY (id, tenant_id)" + + ");"; + + parser.parse(ddl, tables); + List events = parser.getAndClearParsedEvents(); + + assertThat(events).hasSize(1); + assertThat(events.get(0)).isInstanceOf(CreateTableEvent.class); + + CreateTableEvent createEvent = (CreateTableEvent) events.get(0); + // In append-only mode, composite primary keys should also be removed + assertThat(createEvent.getSchema().primaryKeys()).isEmpty(); + } + + @Test + void testCreateTableLikeInAppendOnlyMode() { + CustomMySqlAntlrDdlParser parser = new CustomMySqlAntlrDdlParser(false, false, false, true); + Tables tables = new Tables(); + parser.setCurrentDatabase("test_db"); + + // First create the source table with primary key + String createSourceTable = + "CREATE TABLE source_table (" + + "id INT NOT NULL PRIMARY KEY, " + + "name VARCHAR(255)" + + ");"; + parser.parse(createSourceTable, tables); + parser.getAndClearParsedEvents(); // Clear the first event + + // Then create table like the source table + String createLikeTable = "CREATE TABLE target_table LIKE source_table;"; + parser.parse(createLikeTable, tables); + List events = parser.getAndClearParsedEvents(); + + assertThat(events).hasSize(1); + assertThat(events.get(0)).isInstanceOf(CreateTableEvent.class); + + CreateTableEvent 
createEvent = (CreateTableEvent) events.get(0); + // In append-only mode, primary keys should be removed even in LIKE tables + assertThat(createEvent.getSchema().primaryKeys()).isEmpty(); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 260a7cd2b5d..bfa07528b80 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; private final boolean assignUnboundedChunkFirst; + private final boolean scanReadChangelogAsAppendOnly; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + boolean scanReadChangelogAsAppendOnly) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable { this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.scanReadChangelogAsAppendOnly = 
scanReadChangelogAsAppendOnly; } public String getHostname() { @@ -243,6 +246,10 @@ public boolean isAssignUnboundedChunkFirst() { return assignUnboundedChunkFirst; } + public boolean isScanReadChangelogAsAppendOnly() { + return scanReadChangelogAsAppendOnly; + } + public Properties getDbzProperties() { return dbzProperties; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..4f78ddf82c7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -74,6 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; private boolean assignUnboundedChunkFirst = false; + private boolean scanReadChangelogAsAppendOnly = false; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -324,6 +325,16 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde return this; } + /** + * Whether to convert the changelog data stream to an append-only data stream. Defaults to + * false. + */ + public MySqlSourceConfigFactory scanReadChangelogAsAppendOnly( + boolean scanReadChangelogAsAppendOnly) { + this.scanReadChangelogAsAppendOnly = scanReadChangelogAsAppendOnly; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. 
*/ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -421,6 +432,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + scanReadChangelogAsAppendOnly); } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java index d263dfd53e8..7707fc197e4 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java @@ -186,6 +186,151 @@ void testSyncWholeDatabase() throws Exception { validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase); } + @Test + public void testReadChangelogAsAppendOnlyWithPaimon() throws Exception { + String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID(); + String database = inventoryDatabase.getDatabaseName(); + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.readChangelogAsAppendOnly\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + " scan.read-changelog-as-append-only.enabled: true\n" + + " use.legacy.json.format: false\n" + + " metadata.list: row_kind\n" + + "\n" + + "transform:\n" + + " - source-table: '^\\.+.\\.+$'\n" + + " projection: \\*, row_kind AS __internal__op_type\n" + + "\n" + + "sink:\n" + + " type: paimon\n" + + " catalog.properties.warehouse: %s\n" + + " catalog.properties.metastore: 
filesystem\n" + + " catalog.properties.cache-enabled: false\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: 4", + MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, warehouse); + + Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar"); + Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar"); + + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), MYSQL.getDatabasePort(), database); + + // Create source table and insert initial data + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + stat.execute( + "CREATE TABLE readChangelogAsAppendOnly (\n" + + " id INTEGER NOT NULL PRIMARY KEY,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512),\n" + + " weight FLOAT,\n" + + " enum_c enum('red', 'white') default 'red',\n" + + " json_c JSON,\n" + + " point_c POINT)"); + + stat.execute( + "INSERT INTO readChangelogAsAppendOnly \n" + + "VALUES (1,\"One\", \"Alice\", 3.202, 'red', '{\"key1\": \"value1\"}', null),\n" + + " (2,\"Two\", \"Bob\", 1.703, 'white', '{\"key2\": \"value2\"}', null),\n" + + " (3,\"Three\", \"Cecily\", 4.105, 'red', '{\"key3\": \"value3\"}', null),\n" + + " (4,\"Four\", \"Derrida\", 1.857, 'white', '{\"key4\": \"value4\"}', null),\n" + + " (5,\"Five\", \"Evelyn\", 5.211, 'red', '{\"K\": \"V\", \"k\": \"v\"}', null)"); + } catch (SQLException e) { + LOG.error("Create table for CDC failed.", e); + throw e; + } + + submitPipelineJob(pipelineJob, paimonCdcConnector, hadoopJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + // Validate initial snapshot data + validateSinkResult( + warehouse, + database, + "readChangelogAsAppendOnly", + Arrays.asList( + "1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null, +I", + "2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, 
null, +I", + "3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, +I", + "4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, +I", + "5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, +I")); + + LOG.info("Begin incremental reading stage with append-only enabled."); + + // Perform CDC operations: INSERT, UPDATE, DELETE + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + // INSERT operations - these should appear as new records + stat.execute( + "INSERT INTO readChangelogAsAppendOnly VALUES (6,'Six','Ferris',9.813, null, null, null);"); + stat.execute( + "INSERT INTO readChangelogAsAppendOnly VALUES (7,'Seven','Grace',2.117, null, null, null);"); + + // UPDATE operations - with append-only, both old and new versions should be preserved + stat.execute( + "UPDATE readChangelogAsAppendOnly SET description='Alice Updated' WHERE id=1;"); + stat.execute("UPDATE readChangelogAsAppendOnly SET weight=2.0 WHERE id=2;"); + + // DELETE operations - with append-only, delete records should be preserved with + // row_kind metadata + stat.execute("DELETE FROM readChangelogAsAppendOnly WHERE id=3;"); + + Thread.sleep(5000); // Wait for changes to be processed + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + // For append-only mode, we expect to see all operations preserved as separate records + // This includes the original records, plus insert records, plus update records (before and + // after), plus delete records + List expectedAppendOnlyRecords = + Arrays.asList( + "1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null, +I", // Original + "2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, +I", // Original + "3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, +I", // Original + "4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, +I", // Original + "5, Five, Evelyn, 
5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, +I", // Original + "6, Six, Ferris, 9.813, null, null, null, +I", // Insert + "7, Seven, Grace, 2.117, null, null, null, +I", // Insert + "1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null, -U", // Update + // id=1 + // (before) + "1, One, Alice Updated, 3.202, red, {\"key1\": \"value1\"}, null, +U", // Update + // id=1 (after) + "2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, -U", // Update + // id=2 + // (before) + "2, Two, Bob, 2.0, white, {\"key2\": \"value2\"}, null, +U", // Update id=2 + // (after) + "3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, -D" // Delete + // id=3 + ); + + validateSinkResult( + warehouse, database, "readChangelogAsAppendOnly", expectedAppendOnlyRecords); + } + @Test public void testSinkToAppendOnlyTable() throws Exception { String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID();