MySqlDataSourceFactory.java
@@ -83,6 +83,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@@ -168,6 +169,8 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
boolean scanReadChangelogAsAppendOnlyEnabled =
config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -221,7 +224,8 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.skipSnapshotBackfill(skipSnapshotBackfill);
.skipSnapshotBackfill(skipSnapshotBackfill)
.scanReadChangelogAsAppendOnly(scanReadChangelogAsAppendOnlyEnabled);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -359,6 +363,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
return options;
}

MySqlDataSource.java
@@ -74,7 +74,8 @@ public EventSourceProvider getEventSourceProvider() {
readableMetadataList,
includeComments,
sourceConfig.isTreatTinyInt1AsBoolean(),
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig));
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig),
sourceConfig.isScanReadChangelogAsAppendOnly());

MySqlSource<Event> source =
new MySqlSource<>(
@@ -99,7 +100,9 @@ public MySqlSourceConfig getSourceConfig() {

@Override
public SupportedMetadataColumn[] supportedMetadataColumns() {
return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
return new SupportedMetadataColumn[] {
new OpTsMetadataColumn(), new RowKindMetadataColumn()
};
}

@Override
MySqlDataSourceOptions.java
@@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

@Experimental
public static final ConfigOption<Boolean> SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
ConfigOptions.key("scan.read-changelog-as-append-only.enabled")
[Review thread on this line]
Contributor: I think this change would affect too many tables. Could we set a regular expression (e.g., scan.read-changelog-as-append-only.tables: adb.*) to modify the primary key only for tables that match this pattern?

Contributor (Author): That's a good suggestion, I will try incorporating it.

.booleanType()
.defaultValue(false)
.withDescription(
"Whether to convert the changelog data stream to an append-only data stream");
}
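For context, a minimal sketch of how the new flag would reach the factory. The option key and default come from this PR, and the config.get(...) read mirrors the call added to createDataSource above; Configuration.fromMap is an assumption (flink-cdc-common's Configuration is expected to mirror Flink's API here), so treat this as illustrative wiring rather than the connector's documented usage.

// Hypothetical wiring only: Configuration.fromMap and the surrounding setup are assumed.
Map<String, String> userOptions = new HashMap<>();
userOptions.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "true");
Configuration config = Configuration.fromMap(userOptions);
// Same read the factory performs in createDataSource(Context):
boolean appendOnly = config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); // true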
MySqlEventDeserializer.java
@@ -18,13 +18,18 @@
package org.apache.flink.cdc.connectors.mysql.source;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParser;
import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.cdc.debezium.table.DeserializationRuntimeConverter;
import org.apache.flink.table.data.TimestampData;

import com.esri.core.geometry.ogc.OGCGeometry;
@@ -38,6 +43,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -56,6 +63,7 @@
@Internal
public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {

private static final Logger LOG = LoggerFactory.getLogger(MySqlEventDeserializer.class);
private static final long serialVersionUID = 1L;

public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
@@ -72,6 +80,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {

private List<MySqlReadableMetadata> readableMetadataList;
private boolean isTableIdCaseInsensitive;
private final boolean appendOnly;

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
@@ -84,7 +93,8 @@ public MySqlEventDeserializer(
new ArrayList<>(),
includeSchemaChanges,
tinyInt1isBit,
isTableIdCaseInsensitive);
isTableIdCaseInsensitive,
false);
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
}

@@ -95,12 +105,31 @@ public MySqlEventDeserializer(
boolean includeComments,
boolean tinyInt1isBit,
boolean isTableIdCaseInsensitive) {
this(
changelogMode,
includeSchemaChanges,
readableMetadataList,
includeComments,
tinyInt1isBit,
isTableIdCaseInsensitive,
false);
}

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
List<MySqlReadableMetadata> readableMetadataList,
boolean includeComments,
boolean tinyInt1isBit,
boolean isTableIdCaseInsensitive,
boolean appendOnly) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.readableMetadataList = readableMetadataList;
this.includeComments = includeComments;
this.tinyInt1isBit = tinyInt1isBit;
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
this.appendOnly = appendOnly;
}

@Override
Expand All @@ -109,7 +138,10 @@ protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord rec
if (customParser == null) {
customParser =
new CustomMySqlAntlrDdlParser(
includeComments, tinyInt1isBit, isTableIdCaseInsensitive);
includeComments,
tinyInt1isBit,
isTableIdCaseInsensitive,
appendOnly);
tables = new Tables();
}

@@ -157,21 +189,109 @@ protected TableId getTableId(SourceRecord record) {

@Override
protected Map<String, String> getMetadata(SourceRecord record) {
return getMetadata(record, null);
}

protected Map<String, String> getMetadata(SourceRecord record, String opType) {
Map<String, String> metadataMap = new HashMap<>();
readableMetadataList.forEach(
(mySqlReadableMetadata -> {
Object metadata = mySqlReadableMetadata.getConverter().read(record);
if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) {
if (mySqlReadableMetadata.equals(MySqlReadableMetadata.ROW_KIND)) {
if (appendOnly) {
// In append-only mode, map row_kind to the original operation type
metadataMap.put(mySqlReadableMetadata.getKey(), opType);
}
// Skip ROW_KIND in non-append-only mode since it throws
// UnsupportedOperationException
} else if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) {
Object metadata = mySqlReadableMetadata.getConverter().read(record);
metadataMap.put(
mySqlReadableMetadata.getKey(),
String.valueOf(((TimestampData) metadata).getMillisecond()));
} else {
Object metadata = mySqlReadableMetadata.getConverter().read(record);
metadataMap.put(mySqlReadableMetadata.getKey(), String.valueOf(metadata));
}
}));
return metadataMap;
}
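// Illustrative result of the method above in append-only mode with both metadata
// columns requested, for the after-image of an UPDATE (timestamp value is hypothetical):
//   {op_ts=1717171717000, row_kind=+U}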

@Override
public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) throws Exception {
if (!appendOnly) {
// Use default behavior when append-only is not enabled
return super.deserializeDataChangeRecord(record);
}

// Append-only mode: convert all operations to INSERT events, but preserve original
// operation type
Envelope.Operation op = Envelope.operationFor(record);
TableId tableId = getTableId(record);

Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();

if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
Map<String, String> meta = getMetadata(record, "+I");
RecordData after = extractAfterData(value, valueSchema);
return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta));
} else if (op == Envelope.Operation.DELETE) {
Map<String, String> meta = getMetadata(record, "-D");
// For DELETE: convert to INSERT with before data
RecordData before = extractBeforeData(value, valueSchema);
return Collections.singletonList(DataChangeEvent.insertEvent(tableId, before, meta));
} else if (op == Envelope.Operation.UPDATE) {
// For UPDATE: convert to INSERT with after data (and optionally before data if ALL
// mode)
RecordData after = extractAfterData(value, valueSchema);
List<DataChangeEvent> events = new ArrayList<>();

if (changelogMode == DebeziumChangelogMode.ALL) {
Map<String, String> beforeMeta = getMetadata(record, "-U");
// Generate INSERT event for before data (UPDATE_BEFORE -> INSERT)
RecordData before = extractBeforeData(value, valueSchema);
events.add(DataChangeEvent.insertEvent(tableId, before, beforeMeta));
}

Map<String, String> afterMeta = getMetadata(record, "+U");
// Generate INSERT event for after data (UPDATE_AFTER -> INSERT)
events.add(DataChangeEvent.insertEvent(tableId, after, afterMeta));
return events;
} else {
LOG.trace("Received {} operation, skip", op);
return Collections.emptyList();
}
}
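// Summary of the append-only mapping implemented above (row_kind is carried as metadata):
//   CREATE/READ -> one INSERT (+I) with the after image
//   UPDATE      -> INSERT (-U) with the before image when changelogMode is ALL,
//                  followed by an INSERT (+U) with the after image
//   DELETE      -> one INSERT (-D) with the before image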

private RecordData extractBeforeData(Struct value, Schema valueSchema) throws Exception {
Schema beforeSchema =
SourceRecordEventDeserializer.fieldSchema(valueSchema, Envelope.FieldName.BEFORE);
Struct beforeValue =
SourceRecordEventDeserializer.fieldStruct(value, Envelope.FieldName.BEFORE);
return extractData(beforeValue, beforeSchema);
}

private RecordData extractAfterData(Struct value, Schema valueSchema) throws Exception {
Schema afterSchema =
SourceRecordEventDeserializer.fieldSchema(valueSchema, Envelope.FieldName.AFTER);
Struct afterValue =
SourceRecordEventDeserializer.fieldStruct(value, Envelope.FieldName.AFTER);
return extractData(afterValue, afterSchema);
}

private RecordData extractData(Struct value, Schema valueSchema) throws Exception {
if (value == null) {
return null;
}
DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
DeserializationRuntimeConverter converter = createNotNullConverter(dataType);
return (RecordData) converter.convert(value, valueSchema);
}

@Override
protected Object convertToString(Object dbzObj, Schema schema) {
// the Geometry datatype in MySQL will be converted to
RowKindMetadataColumn.java (new file)
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.source;

import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Map;

/** A {@link SupportedMetadataColumn} for row_kind. */
public class RowKindMetadataColumn implements SupportedMetadataColumn {

@Override
public String getName() {
return "row_kind";
}

@Override
public DataType getType() {
return DataTypes.STRING().notNull();
}

@Override
public Class<?> getJavaClass() {
return String.class;
}

@Override
public Object read(Map<String, String> metadata) {
if (metadata.containsKey("row_kind")) {
return metadata.get("row_kind");
}
throw new IllegalArgumentException("row_kind doesn't exist in the metadata: " + metadata);
}
}
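For illustration, a small usage sketch of the new metadata column using only the methods defined above; the metadata map mirrors the one built by MySqlEventDeserializer#getMetadata in append-only mode, and the concrete "+U" value is just an example.

// Illustrative only: resolving row_kind from an event's metadata map.
RowKindMetadataColumn column = new RowKindMetadataColumn();
Map<String, String> metadata = new HashMap<>();
metadata.put("row_kind", "+U");
String rowKind = (String) column.read(metadata); // "+U"
// A map without the key throws IllegalArgumentException, per read() above.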