diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index caf316d1b4a..3337a8ee384 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -300,4 +300,9 @@ public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedCh public MySqlSource build() { return new MySqlSource<>(configFactory, checkNotNull(deserializer)); } + + public MySqlSourceBuilder setDebeziumSkippedOperations(String skippedOperation) { + this.configFactory.skippedOperations(skippedOperation); + return this; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..009e70f87dd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -51,6 +51,7 @@ public class MySqlSourceConfigFactory implements Serializable { private List tableList; private String excludeTableList; private String serverTimeZone = ZoneId.systemDefault().getId(); + private String debeziumSkippedOperations; private StartupOptions startupOptions = StartupOptions.initial(); private int splitSize = MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(); private int splitMetaGroupSize = MySqlSourceOptions.CHUNK_META_GROUP_SIZE.defaultValue(); @@ -112,6 +113,11 @@ public MySqlSourceConfigFactory excludeTableList(String tableInclusions) { return this; } + public MySqlSourceConfigFactory skippedOperations(String debeziumSkippedOperations) { + this.debeziumSkippedOperations = debeziumSkippedOperations; + return this; + } + /** Name of the MySQL database to use when connecting to the MySQL database server. */ public MySqlSourceConfigFactory username(String username) { this.username = username; @@ -382,6 +388,9 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { if (serverTimeZone != null) { props.setProperty("database.serverTimezone", serverTimeZone); } + if (debeziumSkippedOperations != null) { + props.put("skipped.operations", debeziumSkippedOperations); + } // override the user-defined debezium properties if (dbzProperties != null) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a8e143f5fc5..37d8e96c09f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -107,6 +107,15 @@ public class MySqlSourceOptions { .withDescription( "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); + public static final ConfigOption DEBEZIUM_SKIPPED_OPERATIONS = + ConfigOptions.key("debezium.skipped.operations") + .stringType() + .noDefaultValue() + .withDescription( + "The comma-separated list of operations to skip during streaming, " + + "defined as: 'c' for inserts/create; 'u' for updates; 'd' for deletes, " + + "'t' for truncates, By default, no operations will be skipped."); + public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = ConfigOptions.key("scan.snapshot.fetch.size") .intType()