84 changes: 82 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -103,12 +103,33 @@ pipeline:
<td>String</td>
<td>Password to use when connecting to the Postgres database server.</td>
</tr>
<tr>
<td>database</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
Name of the Postgres database to connect to. When set, table patterns in <code>tables</code> and <code>partition.tables</code> may omit the database prefix (for example, <code>public.my_table</code> instead of <code>db.public.my_table</code>).<br>
If a database name is given in both <code>database</code> and <code>tables</code>, the two must match.
</td>
</tr>
<tr>
<td>schema</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
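Default schema name. When set, entries in <code>tables</code> and <code>partition.tables</code> without an explicit schema are completed to <code>schema.table</code> (for example, <code>orders</code> becomes <code>public.orders</code>).<br>
If an entry in <code>tables</code> names a different schema explicitly, the explicit value is used for matching and this option does not affect it.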
</td>
</tr>
<tr>
<td>tables</td>
<td>required</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
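<td>Names of the Postgres tables to monitor. Table names support regular expressions, so one pattern can match multiple tables.<br>
<b>Note: at least one of 'tables' and 'partition.tables' must be configured.</b><br>
All tables must belong to the same database.<br>
The dot (.) is treated as the delimiter between database, schema, and table names. To use a dot (.) in a regular expression to match any character, escape it with a backslash.<br>
For example: bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*</td>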
@@ -282,10 +303,69 @@ pipeline:
Defaults to false.
</td>
</tr>
<tr>
<td>scan.include-partitioned-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
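Enables support for PostgreSQL partitioned tables.<br>
<b>PostgreSQL 11+:</b> Use this option alone, together with <code>publish_via_partition_root=true</code> in the PUBLICATION; <code>partition.tables</code> is not needed.<br>
<b>PostgreSQL 10:</b> Must be used together with <code>partition.tables</code> to define the parent-child table mappings.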
</td>
</tr>
<tr>
<td>partition.tables</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
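<b>Required for PostgreSQL 10 partitioned tables.</b> Defines parent-child table mappings in colon format: <code>parent:child_regex</code>.<br>
Format: <code>schema.parent:schema.child_regex</code> (for example, <code>public.orders:public.orders_\d{6}</code>).<br>
When this option is configured, <code>scan.include-partitioned-tables.enabled</code> is enabled automatically.<br>
Events from child partitions are routed to the parent table automatically, which reduces the number of CreateTableEvent requests.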
</td>
</tr>
</tbody>
</table>
</div>

## Partition Table Support

### PostgreSQL 11+ (Recommended)

With `publish_via_partition_root=true` set in the PUBLICATION, only `scan.include-partitioned-tables.enabled` needs to be configured:

```yaml
source:
  type: postgres
  tables: db.public.orders
  scan.include-partitioned-tables.enabled: true
```

### PostgreSQL 10

`partition.tables` must be configured to define the parent-child table mappings:

```yaml
source:
  type: postgres
  tables: db.public.orders
  partition.tables: public.orders:public.orders_\d{6}
```

### partition.tables Format

Use colon format: `parent:child_regex`

- Two segments: `schema.parent:schema.child_regex`
- Table-only: `parent:child_regex` (the schema is inherited from the child table at runtime)

```text
public.orders:public.orders_\d{6} # parent=public.orders, child matches orders_YYYYMM
orders:orders_\d{6} # table-only, schema inherited at runtime
```

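Note:
1. The `tables` option specifies the tables that Postgres CDC captures, in the format `db.schema1.table1,db.schema2.table2`. All entries must use the same database, because the Postgres connection URL requires a dbname and CDC currently supports connecting to only one database.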

@@ -728,4 +808,4 @@ PostgreSQL supports spatial data types through the PostGIS extension:
</table>
</div>

{{< top >}}
{{< top >}}
84 changes: 82 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -104,12 +104,33 @@ pipeline:
<td>String</td>
<td>Password to use when connecting to the Postgres database server.</td>
</tr>
<tr>
<td>database</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
Name of the PostgreSQL database to connect to. When set, you may omit the database prefix in <code>tables</code> and <code>partition.tables</code> (e.g., use <code>public.my_table</code> instead of <code>db.public.my_table</code>).<br>
If both this option and a database prefix in <code>tables</code> are provided, they must be consistent.
</td>
</tr>
<tr>
<td>schema</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
Default schema name. When set, entries in <code>tables</code> and <code>partition.tables</code> that omit schema will be auto-qualified as <code>schema.table</code> (e.g., <code>orders</code> becomes <code>public.orders</code>).<br>
Explicit schema in patterns, if present, takes precedence and is left unchanged.
</td>
</tr>
<tr>
<td>tables</td>
<td>required</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Names of the Postgres tables to monitor. Table names support regular expressions, so a single pattern can match multiple tables.<br>
<b>Note: At least one of 'tables' or 'partition.tables' must be configured.</b><br>
All tables must belong to the same database.<br>
The dot (.) is treated as the delimiter between database, schema, and table names.
To use a dot (.) in a regular expression to match any character, escape it with a backslash.<br>
@@ -274,10 +295,69 @@ pipeline:
Defaults to false.
</td>
</tr>
<tr>
<td>scan.include-partitioned-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Enables support for PostgreSQL partitioned tables.<br>
<b>For PostgreSQL 11+:</b> Use this option alone, together with <code>publish_via_partition_root=true</code> in the PUBLICATION. <code>partition.tables</code> is not needed.<br>
<b>For PostgreSQL 10:</b> Must be used together with <code>partition.tables</code> to define parent-child mappings.
</td>
</tr>
<tr>
<td>partition.tables</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
<b>Required for PostgreSQL 10 partition tables.</b> Defines parent-child table mappings using colon format: <code>parent:child_regex</code>.<br>
Format: <code>schema.parent:schema.child_regex</code> (e.g., <code>public.orders:public.orders_\d{6}</code>).<br>
When configured, <code>scan.include-partitioned-tables.enabled</code> is automatically enabled.<br>
Child partition events will be routed to their parent table, reducing CreateTableEvent requests.
</td>
</tr>
</tbody>
</table>
</div>

## Partition Table Support

### PostgreSQL 11+ (Recommended)

Set `publish_via_partition_root=true` in the PUBLICATION. Only `scan.include-partitioned-tables.enabled` then needs to be configured:

```yaml
source:
  type: postgres
  tables: db.public.orders
  scan.include-partitioned-tables.enabled: true
```

### PostgreSQL 10

Requires `partition.tables` to define parent-child mappings:

```yaml
source:
  type: postgres
  tables: db.public.orders
  partition.tables: public.orders:public.orders_\d{6}
```

### partition.tables Format

Use colon format: `parent:child_regex`

- Two segments: `schema.parent:schema.child_regex`
- Table-only: `parent:child_regex` (the schema is inherited from the child table at runtime)

```text
public.orders:public.orders_\d{6} # parent=public.orders, child matches orders_YYYYMM
orders:orders_\d{6} # table-only, schema inherited at runtime
```
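
To make the colon format concrete, here is a minimal Java sketch of how one `parent:child_regex` entry could be split and used to route a child partition name to its parent. The class `PartitionMappingSketch` and its methods are hypothetical names made up for illustration; this is not the connector's actual parser, and it assumes the whole `schema.table` name is matched against the child regex as a single string.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch; not the connector's actual parsing code. */
public class PartitionMappingSketch {

    /** A parent table name plus the compiled regex for its child partitions. */
    static final class Mapping {
        final String parent;
        final Pattern childPattern;

        Mapping(String parent, Pattern childPattern) {
            this.parent = parent;
            this.childPattern = childPattern;
        }
    }

    /** Splits one "parent:child_regex" entry at the first colon. */
    static Mapping parse(String entry) {
        int colon = entry.indexOf(':');
        if (colon <= 0 || colon == entry.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected parent:child_regex but got: " + entry);
        }
        String parent = entry.substring(0, colon).trim();
        String childRegex = entry.substring(colon + 1).trim();
        return new Mapping(parent, Pattern.compile(childRegex));
    }

    /** Returns the parent name for a matching child, otherwise the name unchanged. */
    static String route(Mapping mapping, String table) {
        return mapping.childPattern.matcher(table).matches() ? mapping.parent : table;
    }

    public static void main(String[] args) {
        Mapping mapping = parse("public.orders:public.orders_\\d{6}");
        System.out.println(route(mapping, "public.orders_202401")); // public.orders
        System.out.println(route(mapping, "public.customers"));     // public.customers
    }
}
```

In this sketch the unescaped dot in the child regex matches any character, which is looser than treating it as a schema delimiter; a real implementation would likely split schema and table before matching.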


Note:
1. The `tables` option specifies the tables that Postgres CDC captures, in the format `db.schema1.table1,db.schema2.table2`. All entries must use the same database, because the PostgreSQL connection URL requires a single database name and CDC currently supports connecting to only one database.
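
The qualification rules described for the `database` and `schema` options can be sketched as follows. This is an illustrative assumption about the behavior, not the connector's code: `TableNameQualifierSketch` and `qualify` are hypothetical names, both options are assumed to be set, and entries are split only on dots that are not escaped with a backslash.

```java
/** Hypothetical sketch of completing table entries to db.schema.table. */
public class TableNameQualifierSketch {

    /**
     * Completes a table entry using the configured database and default schema.
     * An explicit schema is kept as written; an explicit database must match.
     */
    static String qualify(String entry, String database, String schema) {
        String trimmed = entry.trim();
        // Split on '.' only when it is not preceded by a backslash (an escaped regex dot).
        String[] parts = trimmed.split("(?<!\\\\)\\.");
        switch (parts.length) {
            case 1: // table only: add default schema and database
                return database + "." + schema + "." + parts[0];
            case 2: // schema.table: explicit schema takes precedence
                return database + "." + parts[0] + "." + parts[1];
            case 3: // db.schema.table: database prefix must be consistent
                if (!parts[0].equals(database)) {
                    throw new IllegalArgumentException(
                            "Database '" + parts[0] + "' does not match '" + database + "'");
                }
                return trimmed;
            default:
                throw new IllegalArgumentException("Unexpected table entry: " + entry);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualify("orders", "db", "public"));           // db.public.orders
        System.out.println(qualify("sales.orders", "db", "public"));     // db.sales.orders
        System.out.println(qualify("db.public.orders", "db", "public")); // db.public.orders
    }
}
```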
@@ -723,4 +803,4 @@ The former is used for small-area planar data, while the latter is used for larg
</table>
</div>

{{< top >}}
{{< top >}}
@@ -0,0 +1,69 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.common.utils;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * LRU cache for compiled {@link Pattern} instances.
 *
 * <p>It avoids repeated {@link Pattern#compile(String)} calls for the same regex.
 *
 * <p>Example:
 *
 * <pre>{@code
 * Pattern pattern = PatternCache.getPattern("aia_t_icc_jjdb_\\d{6}");
 * Matcher matcher = pattern.matcher(tableName);
 * }</pre>
 */
public class PatternCache {

    /** Maximum number of cached patterns. */
    private static final int MAX_CACHE_SIZE = 100;

    /** Pattern cache with LRU eviction. */
    private static final Map<String, Pattern> CACHE =
            new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Pattern> eldest) {
                    return size() > MAX_CACHE_SIZE;
                }
            };

    /**
     * Returns a cached {@link Pattern} or compiles and caches it.
     *
     * @param regex regular expression
     * @return compiled {@link Pattern}
     */
    public static synchronized Pattern getPattern(String regex) {
        return CACHE.computeIfAbsent(regex, Pattern::compile);
    }

    /** Clears the cache (mainly for tests). */
    public static synchronized void clear() {
        CACHE.clear();
    }

    /** Returns the current cache size. */
    public static synchronized int size() {
        return CACHE.size();
    }
}
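
The LRU behavior of `PatternCache` comes from a `LinkedHashMap` in access order combined with `removeEldestEntry`. The standalone sketch below shows the same technique with a capacity of 2 so that eviction is easy to observe. `LruPatternCacheSketch` is a hypothetical class written for illustration, with an instance-level cache and a configurable size instead of the static cache and fixed limit of 100 in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical, instance-based variant used only to demonstrate LRU eviction. */
public class LruPatternCacheSketch {

    private final Map<String, Pattern> cache;

    LruPatternCacheSketch(final int maxSize) {
        // accessOrder=true moves an entry to the end of the iteration order on each access.
        this.cache =
                new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, Pattern> eldest) {
                        return size() > maxSize;
                    }
                };
    }

    /** Returns the cached pattern, compiling and caching it on first use. */
    synchronized Pattern get(String regex) {
        return cache.computeIfAbsent(regex, Pattern::compile);
    }

    /** Checks for presence without changing the access order. */
    synchronized boolean contains(String regex) {
        return cache.containsKey(regex);
    }

    synchronized int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        LruPatternCacheSketch cache = new LruPatternCacheSketch(2);
        Pattern first = cache.get("a+");
        cache.get("b+");
        System.out.println(cache.get("a+") == first); // true: same instance, and "a+" is now most recent
        cache.get("c+");                              // exceeds capacity, evicts least recently used "b+"
        System.out.println(cache.contains("a+"));     // true
        System.out.println(cache.contains("b+"));     // false
    }
}
```

All methods are synchronized because an access-ordered `LinkedHashMap` is structurally modified even by reads, so unsynchronized concurrent lookups would not be safe.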