25 changes: 15 additions & 10 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -31,7 +31,7 @@ The Postgres CDC Pipeline connector allows reading snapshot data from a Postgres database and

## Example

A pipeline that reads data from Postgres and syncs it to Doris can be defined as follows:
A pipeline that reads data from Postgres and syncs it to Fluss can be defined as follows:

```yaml
source:
@@ -41,19 +41,23 @@ source:
port: 5432
username: admin
password: pass
tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
# make sure all the tables come from the same database
tables: adb.\.*.\.*
decoding.plugin.name: pgoutput
slot.name: pgtest

sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
type: fluss
name: Fluss Sink
bootstrap.servers: localhost:9123
# Security-related properties for the Fluss client
properties.client.security.protocol: sasl
properties.client.security.sasl.mechanism: PLAIN
properties.client.security.sasl.username: developer
properties.client.security.sasl.password: developer-pass

pipeline:
name: Postgres to Doris Pipeline
name: Postgres to Fluss Pipeline
parallelism: 4
```

@@ -105,8 +109,9 @@ pipeline:
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the Postgres database to monitor. The table name also supports regular expressions to monitor multiple tables that match the expression.<br>
Note that the dot (.) is treated as the delimiter between database and table names. If you need to use a dot (.) in a regular expression to match any character, you must escape it with a backslash.<br>
For example: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
All the tables are required to come from the same database.<br>
Note that the dot (.) is treated as the delimiter between database, schema, and table names. If you need to use a dot (.) in a regular expression to match any character, you must escape it with a backslash.<br>
For example: bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*</td>
</tr>
<tr>
<td>slot.name</td>
27 changes: 16 additions & 11 deletions docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -32,29 +32,33 @@ Note: Since the Postgres WAL log cannot parse table structure change records, Po

## Example

An example of a pipeline that reads data from Postgres and sinks to Doris can be defined as follows:
An example of a pipeline that reads data from Postgres and sinks to Fluss can be defined as follows:

```yaml
source:
type: posgtres
type: postgres
name: Postgres Source
hostname: 127.0.0.1
port: 5432
username: admin
password: pass
tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
# make sure all the tables share the same database.
tables: adb.\.*.\.*
decoding.plugin.name: pgoutput
slot.name: pgtest

sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
type: fluss
name: Fluss Sink
bootstrap.servers: localhost:9123
# Security-related properties for the Fluss client
properties.client.security.protocol: sasl
properties.client.security.sasl.mechanism: PLAIN
properties.client.security.sasl.username: developer
properties.client.security.sasl.password: developer-pass

pipeline:
name: Postgres to Doris Pipeline
name: Postgres to Fluss Pipeline
parallelism: 4
```
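
As a further illustration only (hypothetical schema and table names), a `tables` value that matches several schemas while still staying inside the single database `adb`, as the comment above requires, could look like this:

```yaml
source:
  type: postgres
  # ... connection options as above ...
  # All patterns must resolve within the same database (here `adb`).
  # Literal dots separate database, schema and table; dots meant as regex
  # wildcards are escaped with a backslash (\.).
  tables: adb.user_schema_[0-9].user_table_[0-9]+, adb.schema_\.*.order_\.*
```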

@@ -106,9 +110,10 @@ pipeline:
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the Postgres database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br>
It is important to note that the dot (.) is treated as a delimiter for database and table names.
All the tables are required to share the same database. <br>
It is important to note that the dot (.) is treated as a delimiter for database, schema and table names.
If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
for example: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
for example: bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*</td>
</tr>
<tr>
<td>slot.name</td>
@@ -19,13 +19,15 @@

import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.IncrementalSource;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.config.Configuration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalTableFilters;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
@@ -46,7 +48,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
protected final Duration connectTimeout;
protected final int connectMaxRetries;
protected final int connectionPoolSize;
protected final String chunkKeyColumn;
protected final Map<ObjectPath, String> chunkKeyColumns;

public JdbcSourceConfig(
StartupOptions startupOptions,
@@ -71,7 +73,7 @@ public JdbcSourceConfig(
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
String chunkKeyColumn,
Map<ObjectPath, String> chunkKeyColumns,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
boolean assignUnboundedChunkFirst) {
@@ -101,7 +103,7 @@ public JdbcSourceConfig(
this.connectTimeout = connectTimeout;
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
this.chunkKeyColumn = chunkKeyColumn;
this.chunkKeyColumns = chunkKeyColumns;
}

public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
@@ -154,8 +156,8 @@ public int getConnectionPoolSize() {
return connectionPoolSize;
}

public String getChunkKeyColumn() {
return chunkKeyColumn;
public Map<ObjectPath, String> getChunkKeyColumns() {
return chunkKeyColumns;
}

@Override
@@ -22,10 +22,13 @@
import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.table.catalog.ObjectPath;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** A {@link Factory} to provide {@link SourceConfig} of JDBC data source. */
@@ -55,7 +58,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
protected Properties dbzProperties;
protected String chunkKeyColumn;
protected Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
protected boolean skipSnapshotBackfill =
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
protected boolean scanNewlyAddedTableEnabled =
@@ -198,8 +201,17 @@ public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
* The chunk key of a table snapshot. Captured tables are split into multiple chunks by the chunk
* key column when reading the snapshot of the table.
*/
public JdbcSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) {
this.chunkKeyColumn = chunkKeyColumn;
public JdbcSourceConfigFactory chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
this.chunkKeyColumns.put(objectPath, chunkKeyColumn);
return this;
}

/**
* The chunk key of a table snapshot. Captured tables are split into multiple chunks by the chunk
* key column when reading the snapshot of the table.
*/
public JdbcSourceConfigFactory chunkKeyColumn(Map<ObjectPath, String> chunkKeyColumns) {
this.chunkKeyColumns.putAll(chunkKeyColumns);
return this;
}
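
As a usage sketch of the two builder methods above (not part of this change; the import path for `JdbcSourceConfigFactory` and the exact database/schema meaning of the `ObjectPath` components are assumptions here), a concrete connector factory could be wired roughly like this:

```java
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.HashMap;
import java.util.Map;

/** Sketch: registering per-table chunk key columns on any concrete factory subclass. */
public final class ChunkKeyColumnsExample {

    static void configureChunkKeys(JdbcSourceConfigFactory factory) {
        // Single table: split the snapshot of user_schema_0.user_table_1 by "user_id".
        // How the first ObjectPath component (database vs. schema) is interpreted is
        // left to the concrete connector.
        factory.chunkKeyColumn(new ObjectPath("user_schema_0", "user_table_1"), "user_id");

        // Several tables at once via the Map-based overload.
        Map<ObjectPath, String> chunkKeys = new HashMap<>();
        chunkKeys.put(new ObjectPath("user_schema_0", "user_table_2"), "seq_no");
        chunkKeys.put(new ObjectPath("order_schema", "orders"), "order_id");
        factory.chunkKeyColumn(chunkKeys);
    }
}
```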

@@ -24,6 +24,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
@@ -253,11 +254,13 @@ protected double calculateDistributionFactor(
* Get the column which is seen as chunk key.
*
* @param table table identity.
* @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use
* primary key instead. @Column the column which is seen as chunk key.
* @param chunkKeyColumns column name which is seen as chunk key, if chunkKeyColumns is null,
* use primary key instead.
* @return the column which is seen as chunk key.
*/
protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
protected Column getSplitColumn(
Table table, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumns);
}

/** ChunkEnd less than or equal to max. */
@@ -360,7 +363,7 @@ private void analyzeTable(TableId tableId) {
try {
currentSchema = dialect.queryTableSchema(jdbcConnection, tableId);
currentSplittingTable = Objects.requireNonNull(currentSchema).getTable();
splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumn());
splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumns());
splitType = getSplitType(splitColumn);
minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn);
approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);
@@ -37,6 +37,8 @@
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Collection;
@@ -47,7 +49,7 @@
/** The context for fetch task that fetching data of snapshot split from JDBC data source. */
@Internal
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);
protected final JdbcSourceConfig sourceConfig;
protected final JdbcDataSourceDialect dataSourceDialect;
protected CommonConnectorConfig dbzConnectorConfig;
@@ -18,15 +18,21 @@
package org.apache.flink.cdc.connectors.base.source.utils;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@@ -35,6 +41,8 @@
/** Utilities to split chunks of table. */
public class JdbcChunkUtils {

private static final Logger LOG = LoggerFactory.getLogger(JdbcChunkUtils.class);

/**
* Query the maximum and minimum value of the column in the table. e.g. query string <code>
* SELECT MIN(%s) FROM %s WHERE %s > ?</code>
@@ -100,15 +108,55 @@ public static Object queryMin(
});
}

// Define createTableFilter here to avoid a dependency on DebeziumUtils
private static Tables.TableFilter createTableFilter(String schemaName, String tableName) {
return new Tables.TableFilter() {
@Override
public boolean isIncluded(TableId tableId) {
final String catalog = tableId.catalog();
final String schema = tableId.schema();
final String table = tableId.table();

if (schemaName != null && !schemaName.equalsIgnoreCase(schema)) {
return false;
}

if (tableName != null && !tableName.equalsIgnoreCase(table)) {
return false;
}

return true;
}
};
}

@Nullable
private static String findChunkKeyColumn(
TableId tableId, Map<ObjectPath, String> chunkKeyColumns) {
String schemaName = tableId.schema();

for (ObjectPath table : chunkKeyColumns.keySet()) {
Tables.TableFilter filter = createTableFilter(schemaName, table.getObjectName());
if (filter.isIncluded(tableId)) {
String chunkKeyColumn = chunkKeyColumns.get(table);
return chunkKeyColumn;
}
}

return null;
}

/**
* Get the column which is seen as chunk key.
*
* @param table table identity.
* @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use
* @param chunkKeyColumns the chunk key column configured per table; if no entry matches the table, use the
*     primary key instead.
* @return the column which is seen as chunk key.
*/
public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
public static Column getSplitColumn(
Table table, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
List<Column> primaryKeys = table.primaryKeyColumns();
String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns);
if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
throw new ValidationException(
"To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
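
To summarize the resolution order implemented by `findChunkKeyColumn` and `getSplitColumn` above: a configured per-table chunk key wins (matched by table name, case-insensitively), otherwise the primary key is used, and a table with neither is rejected. Below is a simplified restatement using only JDK types, not the actual implementation, which works on Debezium `Table`/`TableId` objects and throws `ValidationException`:

```java
import java.util.List;
import java.util.Map;

/** Simplified sketch of the chunk-key resolution order above. */
final class ChunkKeyResolutionSketch {

    /** Returns the column name to split by, mirroring the fallback logic in getSplitColumn. */
    static String resolveChunkKey(
            String tableName,
            Map<String, String> chunkKeyColumns,   // configured table name -> chunk key column
            List<String> primaryKeyColumns) {      // primary key columns of the table
        // 1. A per-table configuration wins; table names are compared case-insensitively,
        //    mirroring the TableFilter used above.
        for (Map.Entry<String, String> entry : chunkKeyColumns.entrySet()) {
            if (entry.getKey().equalsIgnoreCase(tableName)) {
                return entry.getValue();
            }
        }
        // 2. Otherwise fall back to the primary key (assumed here to be the first PK column);
        //    with neither present, the real code throws a ValidationException instead.
        if (primaryKeyColumns.isEmpty()) {
            throw new IllegalStateException(
                    "Set 'scan.incremental.snapshot.chunk.key-column' for tables without a primary key.");
        }
        return primaryKeyColumns.get(0);
    }
}
```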