Skip to content

Commit 64481a1

Browse files
authored
[cdc] Support more mysql cdc source options (#6886)
1 parent a420f9f commit 64481a1

File tree

1 file changed

+21
-0
lines changed

1 file changed

+21
-0
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,27 @@ public static MySqlSource<CdcSourceRecord> buildMySqlSource(
183183
mySqlConfig
184184
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP)
185185
.ifPresent(sourceBuilder::skipSnapshotBackfill);
186+
mySqlConfig
187+
.getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
188+
.ifPresent(sourceBuilder::fetchSize);
189+
mySqlConfig
190+
.getOptional(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND)
191+
.ifPresent(sourceBuilder::distributionFactorLower);
192+
mySqlConfig
193+
.getOptional(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND)
194+
.ifPresent(sourceBuilder::distributionFactorUpper);
195+
mySqlConfig
196+
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST)
197+
.ifPresent(sourceBuilder::assignUnboundedChunkFirst);
198+
mySqlConfig
199+
.getOptional(MySqlSourceOptions.CHUNK_META_GROUP_SIZE)
200+
.ifPresent(sourceBuilder::splitMetaGroupSize);
201+
mySqlConfig
202+
.getOptional(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES)
203+
.ifPresent(sourceBuilder::parseOnLineSchemaChanges);
204+
mySqlConfig
205+
.getOptional(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT)
206+
.ifPresent(sourceBuilder::useLegacyJsonFormat);
186207

187208
String startupMode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
188209
// see

0 commit comments

Comments
 (0)