Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.IGNORE_NO_PRIMARY_KEY_TABLE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
Expand Down Expand Up @@ -168,6 +169,7 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
boolean ignoreNoPrimaryKeyTable = config.get(IGNORE_NO_PRIMARY_KEY_TABLE);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -221,6 +223,7 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.ignoreNoPrimaryKeyTable(ignoreNoPrimaryKeyTable)
.skipSnapshotBackfill(skipSnapshotBackfill);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
Expand Down Expand Up @@ -359,6 +362,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(IGNORE_NO_PRIMARY_KEY_TABLE);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,11 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

/**
 * Boolean option keyed {@code ignore-no-primary-key-table}; defaults to {@code false}.
 *
 * <p>Per its description, it controls whether MySQL tables without a primary key are ignored.
 * The value is read in {@code createDataSource} and forwarded to the source config factory via
 * {@code ignoreNoPrimaryKeyTable(...)}.
 */
@Experimental
public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
ConfigOptions.key("ignore-no-primary-key-table")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore tables without primary key in MySQL.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
statefulTaskContext.getSourceConfig().getChunkKeyColumns(),
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean());
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(),
statefulTaskContext.getSourceConfig());

Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId ta
throws Exception {
if (!hasNextChunk()) {
analyzeTable(partition, tableId);
// Skip tables without primary key
if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
return Collections.emptyList();
}
Optional<List<MySqlSnapshotSplit>> evenlySplitChunks =
trySplitAllEvenlySizedChunks(partition, tableId);
if (evenlySplitChunks.isPresent()) {
Expand All @@ -133,6 +137,10 @@ public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId ta
"Can not split a new table before the previous table splitting finish.");
if (currentSplittingTable == null) {
analyzeTable(partition, currentSplittingTableId);
// Skip tables without primary key
if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
return Collections.emptyList();
}
}
synchronized (lock) {
return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
Expand All @@ -145,12 +153,22 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) {
try {
currentSplittingTable =
mySqlSchema.getTableSchema(partition, jdbcConnection, tableId).getTable();
splitColumn =
ChunkUtils.getChunkKeyColumn(
currentSplittingTable, sourceConfig.getChunkKeyColumns());
splitColumn = getChunkKeyColumn(currentSplittingTable);
if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
LOG.warn(
"Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
tableId);
currentSplittingTableId = null;
nextChunkStart = null;
nextChunkId = null;
return;
}
splitType =
ChunkUtils.getChunkKeyColumnType(
splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
currentSplittingTable,
sourceConfig.getChunkKeyColumns(),
sourceConfig.isTreatTinyInt1AsBoolean(),
sourceConfig);
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
Expand Down Expand Up @@ -479,4 +497,9 @@ public void close() throws Exception {
}
mySqlSchema.close();
}

/**
 * Resolves the chunk key column of the given table by delegating to {@link
 * ChunkUtils#getChunkKeyColumn}, passing this splitter's configured chunk key columns and its
 * source config.
 *
 * <p>NOTE(review): callers in this class compare the result against {@code null} to detect
 * tables that should be skipped, so the result must be treated as nullable.
 *
 * @param table the table whose chunk key column is looked up
 * @return the chunk key column as returned by {@code ChunkUtils}; may be {@code null}
 */
private Column getChunkKeyColumn(Table table) {
return ChunkUtils.getChunkKeyColumn(
table, sourceConfig.getChunkKeyColumns(), sourceConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
private final boolean ignoreNoPrimaryKeyTable;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand Down Expand Up @@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
boolean ignoreNoPrimaryKeyTable) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
Expand Down Expand Up @@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable;
}

public String getHostname() {
Expand Down Expand Up @@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() {
/**
 * Returns the {@code treatTinyInt1AsBoolean} flag this config was constructed with.
 *
 * @return the stored flag value
 */
public boolean isTreatTinyInt1AsBoolean() {
    return this.treatTinyInt1AsBoolean;
}

/**
 * Returns the {@code ignoreNoPrimaryKeyTable} flag this config was constructed with.
 *
 * @return {@code true} if tables without a primary key should be ignored
 */
public boolean isIgnoreNoPrimaryKeyTable() {
    return this.ignoreNoPrimaryKeyTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
private boolean ignoreNoPrimaryKeyTable = false;

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
Expand Down Expand Up @@ -280,6 +281,15 @@ public MySqlSourceConfigFactory skipSnapshotBackfill(boolean skipSnapshotBackfil
return this;
}

/**
 * Sets whether tables without a primary key are ignored. When enabled, the connector skips
 * tables that do not have a primary key. The factory's initial value is {@code false}.
 *
 * @param ignoreNoPrimaryKeyTable {@code true} to skip tables without a primary key
 * @return this factory, to allow call chaining
 */
public MySqlSourceConfigFactory ignoreNoPrimaryKeyTable(boolean ignoreNoPrimaryKeyTable) {
this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable;
return this;
}

/**
* Whether to use legacy json format. The default value is true, which means there is no
* whitespace before value and after comma in json format.
Expand Down Expand Up @@ -421,6 +431,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
ignoreNoPrimaryKeyTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,15 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");

/**
 * Boolean option keyed {@code ignore-no-primary-key-table}; defaults to {@code false}.
 *
 * <p>Per its description, enabling it makes the connector skip tables that have no primary
 * key, while the default leaves those tables to be processed.
 */
@Experimental
public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
ConfigOptions.key("ignore-no-primary-key-table")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore tables without primary key. When enabled, the connector will skip tables "
+ "that don't have a primary key. By default these tables will be processed, but for some "
+ "scenarios it may be desirable to ignore them since tables without primary keys "
+ "might cause performance issues during incremental snapshot.");
Copy link
Contributor

@lvyanquan lvyanquan May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we encounter records from these tables during the incremental reading phase? Are they skipped or emitted?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will skip them.

Copy link
Contributor

@lvyanquan lvyanquan May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an ITCase test to verify it during snapshot and incremental reading phases?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new logic has been added and verified through testing.

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.relational.Column;
import io.debezium.relational.Table;
Expand All @@ -41,12 +44,13 @@

/** Utilities to split chunks of table. */
public class ChunkUtils {
private static final Logger LOG = LoggerFactory.getLogger(ChunkUtils.class);

private ChunkUtils() {}

public static RowType getChunkKeyColumnType(
Table table, Map<ObjectPath, String> chunkKeyColumns, boolean tinyInt1isBit) {
return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit);
Table table, Map<ObjectPath, String> chunkKeyColumns, boolean tinyInt1isBit, MySqlSourceConfig sourceConfig) {
return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns, sourceConfig), tinyInt1isBit);
}

public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) {
Expand All @@ -62,12 +66,25 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyI
* have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not
* set and the table has primary keys, return the first column of primary keys.
*/
public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chunkKeyColumns) {
public static Column getChunkKeyColumn(
Table table, Map<ObjectPath, String> chunkKeyColumns, MySqlSourceConfig sourceConfig) {
List<Column> primaryKeys = table.primaryKeyColumns();
String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns);
// Neither a primary key nor a configured chunk key column exists for this table.
if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
    if (sourceConfig != null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
        LOG.warn(
                "Table {} has no primary key and no chunk key column specified. This table will be skipped.",
                table.id());
        // A null result signals that no chunk key column could be resolved for this table.
        return null;
    }
    // The option name below must match the declared key 'ignore-no-primary-key-table';
    // the option is not registered under the 'scan.incremental.snapshot.' prefix.
    throw new ValidationException(
            String.format(
                    "Table %s has no primary key and no chunk key column specified. "
                            + "To use incremental snapshot, either: "
                            + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or "
                            + "2. Set 'ignore-no-primary-key-table' to true to skip tables without primary keys.",
                    table.id()));
}

List<Column> searchColumns = table.columns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.flink.cdc.connectors.mysql.source.assigners;

import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -85,4 +93,53 @@ void testSplitEvenlySizedChunksNormal() {
ChunkRange.of(2147483637, 2147483647),
ChunkRange.of(2147483647, null));
}

/**
 * Verifies that the chunk splitter yields no snapshot splits for a table without a primary key
 * when {@code ignoreNoPrimaryKeyTable} is enabled.
 */
@Test
void testIgnoreNoPrimaryKeyTable() throws Exception {
    // Build a source config with ignoreNoPrimaryKeyTable switched on.
    MySqlSourceConfigFactory configFactory =
            new MySqlSourceConfigFactory()
                    .startupOptions(StartupOptions.initial())
                    .databaseList("test_db")
                    .tableList("test_db.test_table")
                    .hostname("localhost")
                    .username("test")
                    .password("test")
                    .serverTimeZone(ZoneId.of("UTC").toString())
                    .ignoreNoPrimaryKeyTable(true);
    MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

    // Single non-nullable BIGINT column (JDBC type code -5); no primary key is declared.
    Column idColumn =
            Column.editor().name("id").type("BIGINT").jdbcType(-5).optional(false).create();

    // Minimal MySqlSchema stub that always reports a table without a primary key.
    MySqlSchema stubSchema =
            new MySqlSchema(sourceConfig, true) {
                @Override
                public TableChanges.TableChange getTableSchema(
                        MySqlPartition partition, JdbcConnection jdbc, TableId tableId) {
                    Table tableWithoutPk =
                            Table.editor().tableId(tableId).addColumn(idColumn).create();
                    return new TableChanges.TableChange(
                            TableChanges.TableChangeType.CREATE, tableWithoutPk);
                }
            };

    MySqlChunkSplitter chunkSplitter = new MySqlChunkSplitter(stubSchema, sourceConfig);
    MySqlPartition mySqlPartition = new MySqlPartition("mysql_binlog_source");
    TableId targetTable = new TableId("test_db", null, "test_table");

    // Split the table that has no primary key.
    List<MySqlSnapshotSplit> snapshotSplits =
            chunkSplitter.splitChunks(mySqlPartition, targetTable);

    // A table without a primary key must produce an empty split list.
    Assertions.assertThat(snapshotSplits).isEmpty();
}
}