Skip to content

Commit 82cac0e

Browse files
authored
[cdc] support remained postgres cdc source options excluding chunkKeyColumns (#6888)
1 parent 64481a1 commit 82cac0e

File tree

1 file changed

+20
-0
lines changed

1 file changed

+20
-0
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,26 @@ public static JdbcIncrementalSource<CdcSourceRecord> buildPostgresSource(
142142
postgresConfig
143143
.getOptional(PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
144144
.ifPresent(sourceBuilder::splitSize);
145+
postgresConfig
146+
.getOptional(PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
147+
.ifPresent(sourceBuilder::fetchSize);
148+
postgresConfig
149+
.getOptional(PostgresSourceOptions.CHUNK_META_GROUP_SIZE)
150+
.ifPresent(sourceBuilder::splitMetaGroupSize);
151+
postgresConfig
152+
.getOptional(PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND)
153+
.ifPresent(sourceBuilder::distributionFactorUpper);
154+
postgresConfig
155+
.getOptional(PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND)
156+
.ifPresent(sourceBuilder::distributionFactorLower);
157+
postgresConfig
158+
.getOptional(
159+
PostgresSourceOptions
160+
.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED)
161+
.ifPresent(sourceBuilder::assignUnboundedChunkFirst);
162+
postgresConfig
163+
.getOptional(PostgresSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED)
164+
.ifPresent(sourceBuilder::scanNewlyAddedTableEnabled);
145165
postgresConfig
146166
.getOptional(PostgresSourceOptions.CONNECT_TIMEOUT)
147167
.ifPresent(sourceBuilder::connectTimeout);

0 commit comments

Comments
 (0)