diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index fa501e7849a..b2b0418fa3f 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -447,6 +447,13 @@ Flink SQL> SELECT * FROM orders;
If 'use.legacy.json.format' = 'false' is set, this record is converted to {"key1": "value1", "key2": "value2"}, that is, the spaces before keys and values are preserved.
+
+| ignore-no-primary-key-table | optional | false | Boolean | Whether to skip tables without primary keys. If set to true, the connector will skip tables that don't have a primary key. |
+
@@ -644,8 +651,8 @@ Flink performs checkpoints for the source periodically; in case of failover, the job
When performing incremental snapshot reading, the MySQL CDC source needs a criterion for splitting the table.
The MySQL CDC source uses a splitting column to divide the table into multiple splits (chunks). By default, it identifies the primary key columns of the table and uses the first column of the primary key as the splitting column.
-If the table has no primary key, users must specify `scan.incremental.snapshot.chunk.key-column`,
-otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
+If the table has no primary key, users must specify `scan.incremental.snapshot.chunk.key-column` as the chunk key, or
+set the `ignore-no-primary-key-table` option to true to skip tables without primary keys. Otherwise, incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Please note that using a column that is not part of the primary key as the chunk key may degrade the query performance of the table.
For numeric and auto-increment splitting columns, the MySQL CDC source splits chunks efficiently with a fixed step size.
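
As a minimal sketch of the no-primary-key case described above (the table `orders_no_pk` and the connection settings are hypothetical placeholders), a regular column can be declared as the chunk key:

```sql
-- Hypothetical source table without a primary key; `seq_no` serves as the chunk key.
CREATE TABLE orders_no_pk (
    seq_no BIGINT,
    order_desc STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders_no_pk',
    'scan.incremental.snapshot.chunk.key-column' = 'seq_no'
);
```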
@@ -868,6 +875,8 @@ $ ./bin/flink run \
Since the **processing order** cannot be guaranteed, the final `pid` for `id=0` may end up being either `2` or `4`, leading to data inconsistencies.
+Starting from version 3.5.0, MySQL CDC provides an option to ignore tables without primary keys.
+When `ignore-no-primary-key-table` is set to `true`, the connector will skip tables that don't have a primary key.
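
A hedged example of enabling this behavior (the option key follows the option table above; host, credentials, and table names are illustrative placeholders):

```sql
CREATE TABLE products_src (
    id INT,
    name STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    -- Pattern capturing several tables, some of which may lack a primary key.
    'table-name' = 'products.*',
    'ignore-no-primary-key-table' = 'true'
);
```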
### Available Metrics
The metrics system helps to understand the progress of split distribution. The supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/) are listed below:
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 0eb5419853d..e0e368f603a 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -461,6 +461,13 @@ During a snapshot operation, the connector will query each included table to pro
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
+
+| ignore-no-primary-key-table | optional | false | Boolean | Whether to skip tables without primary keys. If set to true, the connector will skip tables that don't have a primary key. |
+
@@ -666,7 +673,7 @@ Flink performs checkpoints for the source periodically, in case of failover, the
When performing incremental snapshot reading, the MySQL CDC source needs a criterion to split the table.
The MySQL CDC source uses a splitting column to split the table into multiple splits (chunks). By default, the MySQL CDC source will identify the primary key columns of the table and use the first column of the primary key as the splitting column.
-If there is no primary key in the table, user must specify `scan.incremental.snapshot.chunk.key-column`,
+If there is no primary key in the table, users must specify `scan.incremental.snapshot.chunk.key-column` as the chunk key, or set `ignore-no-primary-key-table` to `true` to skip tables without primary keys;
otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Please note that using a column that is not part of the primary key as the chunk key can result in slower table query performance.
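
For illustration only (the table schema and connection details below are hypothetical), the chunk key can be supplied for a table that lacks a primary key like this:

```sql
-- Assumed table without a primary key; `user_id` is picked as the chunk key.
CREATE TABLE clicks_src (
    user_id BIGINT,
    url STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'clicks',
    'scan.incremental.snapshot.chunk.key-column' = 'user_id'
);
```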
@@ -892,6 +899,9 @@ Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-c
Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies.
+Starting from version 3.5.0, MySQL CDC provides an option to ignore tables without primary keys.
+When `ignore-no-primary-key-table` is set to `true`, the connector will skip tables that don't have a primary key.
+
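A brief, non-authoritative sketch of enabling this behavior in SQL (the option key matches the option table above; all connection values are placeholders):

```sql
CREATE TABLE inventory_src (
    id INT,
    name STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    -- Tables matched by this pattern that have no primary key are skipped.
    'table-name' = 'inventory.*',
    'ignore-no-primary-key-table' = 'true'
);
```
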
### About converting binary type data to base64 encoded data
```sql
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index f522adfdb1a..3c1445cd32c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -71,6 +71,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.IGNORE_NO_PRIMARY_KEY_TABLE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
@@ -168,6 +169,7 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+ boolean ignoreNoPrimaryKeyTable = config.get(IGNORE_NO_PRIMARY_KEY_TABLE);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -221,6 +223,7 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
+ .ignoreNoPrimaryKeyTable(ignoreNoPrimaryKeyTable)
.skipSnapshotBackfill(skipSnapshotBackfill);
        List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -359,6 +362,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+ options.add(IGNORE_NO_PRIMARY_KEY_TABLE);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 6aff556e7fa..f521b799652 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
+
+ @Experimental
+    public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
+ ConfigOptions.key("ignore-no-primary-key-table")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+                            "Whether to ignore tables without a primary key in MySQL. When enabled, the connector will skip tables that don't have a primary key.");
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 007333b0745..2d4371da841 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -132,6 +132,13 @@ protected void processElement(
private void sendCreateTableEvent(
            JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) {
Schema schema = getSchema(jdbc, tableId);
+ // Check if table has primary key and ignore-no-primary-key-table is enabled
+ if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
+ LOG.warn(
+ "Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.",
+ tableId);
+ return;
+ }
output.collect(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
@@ -263,6 +270,13 @@ private Map generateCreateTableEvent(
jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
+ // Skip tables without primary keys if ignore-no-primary-key-table is enabled
+ if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
+ LOG.warn(
+ "Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.",
+ tableId);
+ continue;
+ }
createTableEventCache.put(
tableId,
new CreateTableEvent(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 87a435ff62b..7ff67b878fd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -249,6 +249,18 @@ private Optional parseOnLineSchemaChangeEvent(SourceRecord sourceR
private boolean shouldEmit(SourceRecord sourceRecord) {
if (RecordUtils.isDataChangeRecord(sourceRecord)) {
TableId tableId = RecordUtils.getTableId(sourceRecord);
+ // Skip events for tables without primary keys if ignore-no-primary-key-table is enabled
+ if (statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable()
+ && statefulTaskContext
+ .getDatabaseSchema()
+ .tableFor(tableId)
+ .primaryKeyColumns()
+ .isEmpty()) {
+ LOG.warn(
+ "Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping binlog event.",
+ tableId);
+ return false;
+ }
if (pureBinlogPhaseTables.contains(tableId)) {
return true;
}
@@ -267,7 +279,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
statefulTaskContext.getSourceConfig().getChunkKeyColumns(),
- statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean());
+ statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(),
+ statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable());
Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
index 4821eaba2ea..8bc4bd10490 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
@@ -114,6 +114,13 @@ public List splitChunks(MySqlPartition partition, TableId ta
throws Exception {
if (!hasNextChunk()) {
analyzeTable(partition, tableId);
+ // Skip tables without primary key
+ if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
+ LOG.warn(
+ "Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
+ tableId);
+ return Collections.emptyList();
+ }
            Optional<List<MySqlSnapshotSplit>> evenlySplitChunks =
trySplitAllEvenlySizedChunks(partition, tableId);
if (evenlySplitChunks.isPresent()) {
@@ -133,6 +140,13 @@ public List splitChunks(MySqlPartition partition, TableId ta
"Can not split a new table before the previous table splitting finish.");
if (currentSplittingTable == null) {
analyzeTable(partition, currentSplittingTableId);
+ // Skip tables without primary key
+ if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
+ LOG.warn(
+ "Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
+ currentSplittingTableId);
+ return Collections.emptyList();
+ }
}
synchronized (lock) {
return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
@@ -145,12 +159,22 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) {
try {
currentSplittingTable =
mySqlSchema.getTableSchema(partition, jdbcConnection, tableId).getTable();
- splitColumn =
- ChunkUtils.getChunkKeyColumn(
- currentSplittingTable, sourceConfig.getChunkKeyColumns());
+ splitColumn = getChunkKeyColumn(currentSplittingTable);
+ if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) {
+ LOG.warn(
+ "Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.",
+ tableId);
+ currentSplittingTableId = null;
+ nextChunkStart = null;
+ nextChunkId = null;
+ return;
+ }
splitType =
ChunkUtils.getChunkKeyColumnType(
- splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
+ currentSplittingTable,
+ sourceConfig.getChunkKeyColumns(),
+ sourceConfig.isTreatTinyInt1AsBoolean(),
+ sourceConfig.isIgnoreNoPrimaryKeyTable());
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
@@ -479,4 +503,9 @@ public void close() throws Exception {
}
mySqlSchema.close();
}
+
+ private Column getChunkKeyColumn(Table table) {
+ return ChunkUtils.getChunkKeyColumn(
+ table, sourceConfig.getChunkKeyColumns(), sourceConfig.isIgnoreNoPrimaryKeyTable());
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 260a7cd2b5d..305628b66de 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
+ private final boolean ignoreNoPrimaryKeyTable;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
- boolean assignUnboundedChunkFirst) {
+ boolean assignUnboundedChunkFirst,
+ boolean ignoreNoPrimaryKeyTable) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+ this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable;
}
public String getHostname() {
@@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}
+
+ public boolean isIgnoreNoPrimaryKeyTable() {
+ return ignoreNoPrimaryKeyTable;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 427115edea7..dc7db6d97b7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -74,6 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
+ private boolean ignoreNoPrimaryKeyTable = false;
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -280,6 +281,15 @@ public MySqlSourceConfigFactory skipSnapshotBackfill(boolean skipSnapshotBackfil
return this;
}
+ /**
+     * Whether to ignore tables without a primary key. When enabled, the connector will skip tables
+ * that don't have a primary key.
+ */
+ public MySqlSourceConfigFactory ignoreNoPrimaryKeyTable(boolean ignoreNoPrimaryKeyTable) {
+ this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable;
+ return this;
+ }
+
/**
* Whether to use legacy json format. The default value is true, which means there is no
* whitespace before value and after comma in json format.
@@ -421,6 +431,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ ignoreNoPrimaryKeyTable);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index a00d6d564ad..73ae1d887fe 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -293,4 +293,12 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+
+ @Experimental
+    public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
+ ConfigOptions.key("ignore-no-primary-key-table")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+                            "Whether to ignore tables without a primary key in MySQL. When enabled, the connector will skip tables that don't have a primary key.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
index 794abd2b5f9..43a13f2e6a8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
@@ -28,6 +28,8 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -41,12 +43,17 @@
/** Utilities to split chunks of table. */
public class ChunkUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ChunkUtils.class);
private ChunkUtils() {}
public static RowType getChunkKeyColumnType(
-            Table table, Map<ObjectPath, String> chunkKeyColumns, boolean tinyInt1isBit) {
- return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit);
+ Table table,
+            Map<ObjectPath, String> chunkKeyColumns,
+ boolean tinyInt1isBit,
+ boolean ignoreNoPrimaryKeyTable) {
+ return getChunkKeyColumnType(
+ getChunkKeyColumn(table, chunkKeyColumns, ignoreNoPrimaryKeyTable), tinyInt1isBit);
}
public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) {
@@ -62,12 +69,25 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyI
* have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not
* set and the table has primary keys, return the first column of primary keys.
*/
-    public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chunkKeyColumns) {
+ public static Column getChunkKeyColumn(
+            Table table, Map<ObjectPath, String> chunkKeyColumns, boolean ignoreNoPrimaryKeyTable) {
        List<Column> primaryKeys = table.primaryKeyColumns();
String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns);
if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
- throw new ValidationException(
- "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
+ if (ignoreNoPrimaryKeyTable) {
+ LOG.warn(
+ "Table {} has no primary key and no chunk key column specified. This table will be skipped.",
+ table.id());
+ return null;
+ } else {
+ throw new ValidationException(
+ String.format(
+ "Table %s has no primary key and no chunk key column specified. "
+ + "To use incremental snapshot, either: "
+ + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or "
+ + "2. Set 'scan.incremental.snapshot.ignore-no-primary-key-table' to true to skip tables without primary keys.",
+ table.id()));
+ }
}
        List<Column> searchColumns = table.columns();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 33f50333af8..2320fec2817 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -17,11 +17,18 @@
package org.apache.flink.cdc.connectors.mysql.source.assigners;
+import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import io.debezium.connector.mysql.MySqlPartition;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -85,4 +92,49 @@ void testSplitEvenlySizedChunksNormal() {
ChunkRange.of(2147483637, 2147483647),
ChunkRange.of(2147483647, null));
}
+
+ @Test
+ void testIgnoreNoPrimaryKeyTable() throws Exception {
+ MySqlSourceConfig sourceConfig =
+ new MySqlSourceConfigFactory()
+ .startupOptions(StartupOptions.initial())
+ .databaseList("test_db")
+ .tableList("test_db.test_table")
+ .hostname("localhost")
+ .username("test")
+ .password("test")
+ .serverTimeZone(ZoneId.of("UTC").toString())
+ .ignoreNoPrimaryKeyTable(true)
+ .createConfig(0);
+
+ MySqlSchema schema =
+ new MySqlSchema(sourceConfig, true) {
+ @Override
+ public TableChanges.TableChange getTableSchema(
+ MySqlPartition partition, JdbcConnection jdbc, TableId tableId) {
+                        // Create a table without a primary key
+ Table noPkTable =
+ Table.editor()
+ .tableId(tableId)
+ .addColumn(
+ Column.editor()
+ .name("id")
+ .type("BIGINT")
+ .jdbcType(-5)
+ .optional(false)
+ .create())
+ .create();
+ return new TableChanges.TableChange(
+ TableChanges.TableChangeType.CREATE, noPkTable);
+ }
+ };
+
+ MySqlChunkSplitter splitter = new MySqlChunkSplitter(schema, sourceConfig);
+ MySqlPartition partition = new MySqlPartition("mysql_binlog_source");
+
+        List<MySqlSnapshotSplit> splits =
+ splitter.splitChunks(partition, new TableId("test_db", null, "test_table"));
+
+ Assertions.assertThat(splits).isEmpty();
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index d9755559357..bd166900dff 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -404,7 +404,14 @@ void testTableWithoutPrimaryKey() {
new String[] {tableWithoutPrimaryKey});
})
.hasStackTraceContaining(
- "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
+ "Table "
+ + customerDatabase.getDatabaseName()
+ + "."
+ + tableWithoutPrimaryKey
+ + " has no primary key and no chunk key column specified. "
+ + "To use incremental snapshot, either: "
+ + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or "
+ + "2. Set 'scan.incremental.snapshot.ignore-no-primary-key-table' to true to skip tables without primary keys.");
}
@Test
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 7cc5fe5861e..472b355a84c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -2445,4 +2445,13 @@ public void testReadChangelogAppendOnly(boolean incrementalSnapshot) throws Exce
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
result.getJobClient().get().cancel().get();
}
+
+ @ParameterizedTest(name = "incrementalSnapshot = {0}")
+ @ValueSource(booleans = {true, false})
+ void testNoPKTableWithIgnoreNoPrimaryKeyTable(boolean incrementalSnapshot) throws Exception {
+ setup(incrementalSnapshot);
+ runConsumingForNoPKTableTest(
+ ", 'scan.incremental.snapshot.ignore-no-primary-key-table'='true'",
+ incrementalSnapshot);
+ }
}