Commit 99c00f3

[FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID (#4209)
Added a new option `table-id.include-database` to emit (database, schema, table) style table IDs.
1 parent 42580dc · commit 99c00f3

14 files changed: +188 −58 lines
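
For orientation, here is a minimal pipeline sketch using the new option. It is illustrative only: host, credentials, slot, and table names are placeholders, and the exact `tables` pattern should be checked against the Postgres pipeline connector docs.

    source:
      type: postgres
      hostname: localhost                  # placeholder
      port: 5432
      username: postgres                   # placeholder
      password: postgres                   # placeholder
      tables: app_db.inventory.products    # hypothetical database/schema/table
      slot.name: flink_cdc_slot
      # New in this commit: emit (database, schema, table) style table IDs.
      table-id.include-database: true

    sink:
      type: values

    pipeline:
      parallelism: 1

With the flag enabled, emitted events identify the table as app_db.inventory.products; with the default false they keep the historical inventory.products shape.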

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
12 additions, 0 deletions

@@ -270,6 +270,18 @@ pipeline:
       此为实验性选项,默认值为 false。
     </td>
   </tr>
+  <tr>
+    <td>table-id.include-database</td>
+    <td>optional</td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>
+      是否在生成的 Table ID 中包含数据库名称。<br>
+      如果设置为 true,Table ID 的格式为 (数据库, 模式, 表)。<br>
+      如果设置为 false,Table ID 的格式为 (模式, 表)。<br>
+      默认值为 false。
+    </td>
+  </tr>
   </tbody>
 </table>
 </div>

docs/content/docs/connectors/pipeline-connectors/postgres.md
12 additions, 0 deletions

@@ -262,6 +262,18 @@ pipeline:
       Experimental option, defaults to false.
     </td>
   </tr>
+  <tr>
+    <td>table-id.include-database</td>
+    <td>optional</td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>
+      Whether to include the database name in the generated Table ID.<br>
+      If set to true, the Table ID will be in the format (database, schema, table).<br>
+      If set to false, the Table ID will be in the format (schema, table).<br>
+      Defaults to false.
+    </td>
+  </tr>
   </tbody>
 </table>
 </div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
5 additions, 0 deletions

@@ -80,6 +80,7 @@
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;

@@ -129,6 +130,7 @@ public DataSource createDataSource(Context context) {
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
         boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+        boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);

         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);

@@ -169,6 +171,7 @@ public DataSource createDataSource(Context context) {
                 .skipSnapshotBackfill(skipSnapshotBackfill)
                 .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                 .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
+                .includeDatabaseInTableId(tableIdIncludeDatabase)
                 .getConfigFactory();

         List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);

@@ -197,6 +200,7 @@ public DataSource createDataSource(Context context) {
         String metadataList = config.get(METADATA_LIST);
         List<PostgreSQLReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);

+        // Create a custom PostgresDataSource that passes the includeDatabaseInTableId flag
         return new PostgresDataSource(configFactory, readableMetadataList);
     }

@@ -257,6 +261,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         options.add(METADATA_LIST);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+        options.add(TABLE_ID_INCLUDE_DATABASE);
         return options;
     }
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
7 additions, 1 deletion

@@ -65,8 +65,14 @@ public PostgresDataSource(

     @Override
     public EventSourceProvider getEventSourceProvider() {
+        String databaseName = postgresSourceConfig.getDatabaseList().get(0);
+        boolean includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId();
         DebeziumEventDeserializationSchema deserializer =
-                new PostgresEventDeserializer(DebeziumChangelogMode.ALL, readableMetadataList);
+                new PostgresEventDeserializer(
+                        DebeziumChangelogMode.ALL,
+                        readableMetadataList,
+                        includeDatabaseInTableId,
+                        databaseName);

         PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory();
         PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
9 additions, 0 deletions

@@ -264,4 +264,13 @@ public class PostgresDataSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+
+    public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+            ConfigOptions.key("table-id.include-database")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include the database name in the generated Table ID. "
+                                    + "If set to true, the Table ID will be in the format (database, schema, table). "
+                                    + "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java
25 additions, 2 deletions

@@ -50,18 +50,37 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {

     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
+    private final boolean includeDatabaseInTableId;
+    private final String databaseName;

     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

     public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
-        super(new PostgresSchemaDataTypeInference(), changelogMode);
+        this(changelogMode, new ArrayList<>(), false, null);
     }

     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList) {
+        this(changelogMode, readableMetadataList, false, null);
+    }
+
+    public PostgresEventDeserializer(
+            DebeziumChangelogMode changelogMode,
+            List<PostgreSQLReadableMetadata> readableMetadataList,
+            boolean includeDatabaseInTableId) {
+        this(changelogMode, readableMetadataList, includeDatabaseInTableId, null);
+    }
+
+    public PostgresEventDeserializer(
+            DebeziumChangelogMode changelogMode,
+            List<PostgreSQLReadableMetadata> readableMetadataList,
+            boolean includeDatabaseInTableId,
+            String databaseName) {
         super(new PostgresSchemaDataTypeInference(), changelogMode);
         this.readableMetadataList = readableMetadataList;
+        this.includeDatabaseInTableId = includeDatabaseInTableId;
+        this.databaseName = databaseName;
     }

     @Override

@@ -87,7 +106,11 @@ protected boolean isSchemaChangeRecord(SourceRecord record) {
     @Override
     protected TableId getTableId(SourceRecord record) {
         String[] parts = record.topic().split("\\.");
-        return TableId.tableId(parts[1], parts[2]);
+        if (includeDatabaseInTableId && databaseName != null) {
+            return TableId.tableId(databaseName, parts[1], parts[2]);
+        } else {
+            return TableId.tableId(parts[1], parts[2]);
+        }
     }

     @Override
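
For context: the Debezium Postgres connector names change-event topics `<topic.prefix>.<schema>.<table>`, which is why `parts[1]` and `parts[2]` above are the schema and table. A minimal sketch of the resulting IDs, using a hypothetical topic prefix and database name:

    import org.apache.flink.cdc.common.event.TableId;

    public class TableIdFromTopicSketch {
        public static void main(String[] args) {
            // Debezium Postgres topic layout: <topic.prefix>.<schema>.<table>
            String[] parts = "postgres_cdc_source.inventory.products".split("\\.");
            // Default (schema, table) shape:
            System.out.println(TableId.tableId(parts[1], parts[2]));            // inventory.products
            // With table-id.include-database = true and database "app_db" (hypothetical):
            System.out.println(TableId.tableId("app_db", parts[1], parts[2]));  // app_db.inventory.products
        }
    }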

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
24 additions, 25 deletions

@@ -61,6 +61,7 @@
 import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;

 /** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
 public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {

@@ -73,6 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
     // Used when startup mode is not initial
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;
+    private boolean includeDatabaseInTableId = false;

     private final Map<TableId, CreateTableEvent> createTableEventCache;

@@ -88,6 +90,7 @@ public PostgresPipelineRecordEmitter(
                 sourceConfig.isIncludeSchemaChanges(),
                 offsetFactory);
         this.sourceConfig = sourceConfig;
+        this.includeDatabaseInTableId = sourceConfig.isIncludeDatabaseInTableId();
         this.postgresDialect = postgresDialect;
         this.alreadySendCreateTableTables = new HashSet<>();
         this.createTableEventCache =

@@ -103,10 +106,17 @@ public void applySplit(SourceSplitBase split) {
             // TableSchemas in SnapshotSplit only contains one table.
             createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
         } else {
-            for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+            for (Map.Entry<TableId, TableChanges.TableChange> entry :
+                    split.getTableSchemas().entrySet()) {
+                TableId tableId =
+                        entry.getKey(); // Use the TableId from the map key which contains full info
+                TableChanges.TableChange tableChange = entry.getValue();
                 CreateTableEvent createTableEvent =
                         new CreateTableEvent(
-                                toCdcTableId(tableChange.getId()),
+                                toCdcTableId(
+                                        tableId,
+                                        sourceConfig.getDatabaseList().get(0),
+                                        includeDatabaseInTableId),
                                 buildSchemaFromTable(tableChange.getTable()));
                 ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
                         .applyChangeEvent(createTableEvent);

@@ -128,10 +138,8 @@ protected void processElement(
         } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
             TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
             if (!alreadySendCreateTableTables.contains(tableId)) {
-                try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
-                    sendCreateTableEvent(jdbc, tableId, (SourceOutput<Event>) output);
-                    alreadySendCreateTableTables.add(tableId);
-                }
+                sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
+                alreadySendCreateTableTables.add(tableId);
             }
         } else {
             boolean isDataChangeRecord = isDataChangeRecord(element);

@@ -189,30 +197,19 @@ private Schema buildSchemaFromTable(Table table) {
         return tableBuilder.build();
     }

-    private void sendCreateTableEvent(
-            PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
-        Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
-        output.collect(
-                new CreateTableEvent(
-                        org.apache.flink.cdc.common.event.TableId.tableId(
-                                tableId.schema(), tableId.table()),
-                        schema));
-    }
-
-    private org.apache.flink.cdc.common.event.TableId toCdcTableId(
-            io.debezium.relational.TableId dbzTableId) {
-        String schemaName =
-                dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
-        return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
+    private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {
+        output.collect(getCreateTableEvent(sourceConfig, tableId));
     }

     private CreateTableEvent getCreateTableEvent(
             PostgresSourceConfig sourceConfig, TableId tableId) {
         try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
             Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
             return new CreateTableEvent(
-                    org.apache.flink.cdc.common.event.TableId.tableId(
-                            tableId.schema(), tableId.table()),
+                    toCdcTableId(
+                            tableId,
+                            sourceConfig.getDatabaseList().get(0),
+                            includeDatabaseInTableId),
                     schema);
         }
     }

@@ -244,8 +241,10 @@ private Map<TableId, CreateTableEvent> generateCreateTableEvent(
             createTableEventCache.put(
                     tableId,
                     new CreateTableEvent(
-                            org.apache.flink.cdc.common.event.TableId.tableId(
-                                    tableId.schema(), tableId.table()),
+                            toCdcTableId(
+                                    tableId,
+                                    this.sourceConfig.getDatabaseList().get(0),
+                                    includeDatabaseInTableId),
                             schema));
         }
         return createTableEventCache;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
17 additions, 3 deletions

@@ -208,12 +208,26 @@ public static Column toColumn(

     public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
         return new io.debezium.relational.TableId(
-                tableId.getSchemaName(), null, tableId.getTableName());
+                tableId.getNamespace(), tableId.getSchemaName(), tableId.getTableName());
     }

     public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
             io.debezium.relational.TableId dbzTableId) {
-        return org.apache.flink.cdc.common.event.TableId.tableId(
-                dbzTableId.schema(), dbzTableId.table());
+        return toCdcTableId(dbzTableId, null, false);
+    }
+
+    public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
+            io.debezium.relational.TableId dbzTableId,
+            String databaseName,
+            boolean includeDatabaseInTableId) {
+        String schema = dbzTableId.schema();
+        String table = dbzTableId.table();
+        if (includeDatabaseInTableId && databaseName != null) {
+            return org.apache.flink.cdc.common.event.TableId.tableId(databaseName, schema, table);
+        } else if (schema != null) {
+            return org.apache.flink.cdc.common.event.TableId.tableId(schema, table);
+        } else {
+            return org.apache.flink.cdc.common.event.TableId.tableId(table);
+        }
     }
 }
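
To make the branches of the new overload concrete, a small sketch of the expected mappings (all names hypothetical):

    import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;

    public class ToCdcTableIdSketch {
        public static void main(String[] args) {
            io.debezium.relational.TableId dbz =
                    new io.debezium.relational.TableId("app_db", "inventory", "products");
            // Flag on and database name known: (database, schema, table)
            System.out.println(PostgresSchemaUtils.toCdcTableId(dbz, "app_db", true));  // app_db.inventory.products
            // Flag off (or database unknown): historical (schema, table) shape
            System.out.println(PostgresSchemaUtils.toCdcTableId(dbz, "app_db", false)); // inventory.products
        }
    }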

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
23 additions, 15 deletions

@@ -57,7 +57,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -74,6 +75,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;

@@ -103,6 +105,14 @@ public void after() throws SQLException {
         inventoryDatabase.removeSlot(slotName);
     }

+    static Stream<Arguments> provideParameters() {
+        return Stream.of(
+                Arguments.of(true, true),
+                Arguments.of(false, false),
+                Arguments.of(true, false),
+                Arguments.of(false, true));
+    }
+
     @Test
     public void testInitialStartupMode() throws Exception {
         inventoryDatabase.createAndInitialize();

@@ -341,9 +351,10 @@ private <T> CollectResultIterator<T> addCollector(
         return iterator;
     }

-    @ParameterizedTest(name = "unboundedChunkFirst: {0}")
-    @ValueSource(booleans = {true, false})
-    public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
+    @ParameterizedTest
+    @MethodSource("provideParameters")
+    public void testInitialStartupModeWithOpts(
+            boolean unboundedChunkFirst, boolean isTableIdIncludeDatabase) throws Exception {
         inventoryDatabase.createAndInitialize();
         org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
                 new org.apache.flink.cdc.common.configuration.Configuration();

@@ -365,6 +376,8 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
         sourceConfiguration.set(
                 PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
                 unboundedChunkFirst);
+        sourceConfiguration.set(
+                PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, isTableIdIncludeDatabase);

         Factory.Context context =
                 new FactoryHelper.DefaultContext(

@@ -384,7 +397,12 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
                                 new EventTypeInfo())
                         .executeAndCollect();

-        TableId tableId = TableId.tableId("inventory", "products");
+        TableId tableId;
+        if (isTableIdIncludeDatabase) {
+            tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products");
+        } else {
+            tableId = TableId.tableId("inventory", "products");
+        }
         CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);

         // generate snapshot data

@@ -582,16 +600,6 @@
         return result;
     }

-    // Helper method to create a temporary directory for savepoint
-    private Path createTempSavepointDir() throws Exception {
-        return Files.createTempDirectory("postgres-savepoint");
-    }
-
-    // Helper method to execute the job and create a savepoint
-    private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
-        return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
-    }
-
     private List<Event> getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
6 additions, 0 deletions

@@ -305,6 +305,12 @@
         return this;
     }

+    /** Whether to include the database name in the generated Table ID. */
+    public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabaseInTableId) {
+        this.configFactory.setIncludeDatabaseInTableId(includeDatabaseInTableId);
+        return this;
+    }
+
     /**
      * Build the {@link PostgresIncrementalSource}.
     *
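
For DataStream API users of flink-connector-postgres-cdc, a hedged usage sketch of the new builder method; the connection settings and the JSON deserializer below are placeholders, not part of this diff:

    import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class PostgresSourceSketch {
        public static void main(String[] args) {
            PostgresIncrementalSource<String> source =
                    PostgresIncrementalSource.<String>builder()
                            .hostname("localhost")      // placeholder connection settings
                            .port(5432)
                            .database("app_db")
                            .schemaList("inventory")
                            .tableList("inventory.products")
                            .username("postgres")
                            .password("postgres")
                            .slotName("flink_cdc_slot")
                            .decodingPluginName("pgoutput")
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            // New in this commit: prepend the database name to emitted table IDs.
                            .includeDatabaseInTableId(true)
                            .build();
        }
    }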
