Commit 3488aa2

[FLINK-XXXXX][postgres] Support partition table routing for PostgreSQL CDC
This PR adds partition.tables configuration to support PostgreSQL partition table routing, significantly improving performance for databases with many partitions.

Performance Improvements:
- Fix frequent schema refresh on every table access
- Fix full schema load when requesting single table (now fetches only requested)
- Consolidate child partition events to parent table, eliminating CreateTableEvent storms
- Cache parent table schema to reduce DB queries and memory pressure

Configuration format: parent:child_regex (e.g., public.orders:public\.orders_\d{6})
1 parent 91a7af1 commit 3488aa2
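
The `parent:child_regex` format from the commit message can be illustrated with a minimal sketch. The class name `PartitionRoutingSketch` and its methods are hypothetical and are not taken from the commit; the sketch only assumes that each mapping is split at its first colon and that a child table must match the regex in full to be routed to the parent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch, not the commit's code: routes child partitions to a parent table. */
final class PartitionRoutingSketch {

    /** One parsed mapping: the parent table and the compiled child regex. */
    private static final class Route {
        final String parent;
        final Pattern childPattern;

        Route(String parent, Pattern childPattern) {
            this.parent = parent;
            this.childPattern = childPattern;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Parses mappings such as "public.orders:public\.orders_\d{6}". */
    PartitionRoutingSketch(List<String> mappings) {
        for (String mapping : mappings) {
            String trimmed = mapping.trim();
            // Split at the first colon only, so the regex may itself contain ':' (e.g. "(?:...)").
            int colon = trimmed.indexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected parent:child_regex, got: " + trimmed);
            }
            String parent = trimmed.substring(0, colon).trim();
            String childRegex = trimmed.substring(colon + 1).trim();
            routes.add(new Route(parent, Pattern.compile(childRegex)));
        }
    }

    /** Returns the parent of a matching child partition, or the table itself if nothing matches. */
    String route(String table) {
        for (Route route : routes) {
            if (route.childPattern.matcher(table).matches()) {
                return route.parent;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        PartitionRoutingSketch router = new PartitionRoutingSketch(
                Collections.singletonList("public.orders:public\\.orders_\\d{6}"));
        System.out.println(router.route("public.orders_202401"));
        System.out.println(router.route("public.customers"));
    }
}
```

In this sketch, `public.orders_202401` is routed to `public.orders`, while `public.customers` matches no mapping and is returned unchanged.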

45 files changed

+4491
-219
lines changed

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 82 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -103,12 +103,33 @@ pipeline:
103103
<td>String</td>
104104
<td>Password to use when connecting to the Postgres database server.</td>
105105
</tr>
106+
<tr>
107+
<td>database</td>
108+
<td>optional</td>
109+
<td style="word-wrap: break-word;">(none)</td>
110+
<td>String</td>
111+
<td>
112+
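Name of the Postgres database to connect to. When set, table patterns in <code>tables</code> and <code>partition.tables</code> may omit the database prefix (e.g., use <code>public.my_table</code> instead of <code>db.public.my_table</code>).<br>
113+
If a database name is provided in both <code>database</code> and <code>tables</code>, the two must be identical.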
114+
</td>
115+
</tr>
116+
<tr>
117+
<td>schema</td>
118+
<td>optional</td>
119+
<td style="word-wrap: break-word;">(none)</td>
120+
<td>String</td>
121+
<td>
122+
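Default schema name. When set, entries in <code>tables</code> and <code>partition.tables</code> without an explicit schema are auto-qualified as <code>schema.table</code> (e.g., <code>orders</code> becomes <code>public.orders</code>).<br>
123+
If a different schema is written explicitly in <code>tables</code>, matching uses the explicit value and this option has no effect.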
124+
</td>
125+
</tr>
106126
<tr>
107127
<td>tables</td>
108-
<td>required</td>
128+
<td>optional</td>
109129
<td style="word-wrap: break-word;">(none)</td>
110130
<td>String</td>
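111131
<td>Table name of the Postgres database to monitor. The table name supports regular expressions to monitor multiple tables that match the expression.<br>
132+
<b>Note: at least one of 'tables' and 'partition.tables' must be configured.</b><br>
112133
All tables must come from the same database.<br>
113134
Note that the dot (.) is treated as the delimiter for database, schema, and table names. To use a dot (.) in a regular expression to match any character, escape it with a backslash.<br>
114135
For example: bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*</td>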
@@ -282,10 +303,69 @@ pipeline:
282303
Defaults to false.
283304
</td>
284305
</tr>
306+
<tr>
307+
<td>scan.include-partitioned-tables.enabled</td>
308+
<td>optional</td>
309+
<td style="word-wrap: break-word;">false</td>
310+
<td>Boolean</td>
311+
<td>
312+
Enables PostgreSQL partitioned table support.<br>
313+
<b>PostgreSQL 11+:</b> Use this option alone, together with <code>publish_via_partition_root=true</code> in the PUBLICATION; <code>partition.tables</code> is not needed.<br>
314+
<b>PostgreSQL 10:</b> Must be used together with <code>partition.tables</code> to define the parent-child table mapping.
315+
</td>
316+
</tr>
317+
<tr>
318+
<td>partition.tables</td>
319+
<td>optional</td>
320+
<td style="word-wrap: break-word;">(none)</td>
321+
<td>String</td>
322+
<td>
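323+
<b>Required for PostgreSQL 10 partitioned tables.</b> Defines the parent-child table mapping in colon format: <code>parent:child_regex</code>.<br>
324+
Format: <code>schema.parent:schema.child_regex</code> (e.g., <code>public.orders:public.orders_\d{6}</code>).<br>
325+
When this option is configured, <code>scan.include-partitioned-tables.enabled</code> is enabled automatically.<br>
326+
Events from child partitions are routed automatically to the parent table, reducing the number of CreateTableEvent requests.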
327+
</td>
328+
</tr>
285329
</tbody>
286330
</table>
287331
</div>
288332
333+
## Partitioned Table Support
334+
335+
### PostgreSQL 11+ (Recommended)
336+
337+
With `publish_via_partition_root=true` set in the PUBLICATION, only `scan.include-partitioned-tables.enabled` needs to be configured:
338+
339+
```yaml
340+
source:
341+
type: postgres
342+
tables: db.public.orders
343+
scan.include-partitioned-tables.enabled: true
344+
```
345+
346+
### PostgreSQL 10
347+
348+
`partition.tables` must be configured to define the parent-child table mapping:
349+
350+
```yaml
351+
source:
352+
type: postgres
353+
tables: db.public.orders
354+
partition.tables: public.orders:public.orders_\d{6}
355+
```
356+
357+
### partition.tables Format
358+
359+
Use the colon format: `parent:child_regex`
360+
361+
- Two segments: `schema.parent:schema.child_regex`
362+
- Table name only: `parent:child_regex` (the child table's schema is inherited at runtime)
363+
364+
```text
365+
public.orders:public.orders_\d{6} # parent=public.orders, children match orders_YYYYMM
366+
orders:orders_\d{6} # table name only, schema inherited at runtime
367+
```
368+
289369
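Note:
290370
1. The configuration option `tables` specifies the tables that Postgres CDC captures, in the format `db.schema1.table1,db.schema2.table2`. All db values must be the same database, because the Postgres connection URL requires a dbname and CDC currently supports connecting to only one database.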
291371

@@ -728,4 +808,4 @@ PostgreSQL supports spatial data types through the PostGIS extension:
728808
</table>
729809
</div>
730810
731-
{{< top >}}
811+
{{< top >}}

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 82 additions & 2 deletions
@@ -104,12 +104,33 @@ pipeline:
104104
<td>String</td>
105105
<td>Password to use when connecting to the Postgres database server.</td>
106106
</tr>
107+
<tr>
108+
<td>database</td>
109+
<td>optional</td>
110+
<td style="word-wrap: break-word;">(none)</td>
111+
<td>String</td>
112+
<td>
113+
Name of the PostgreSQL database to connect to. When set, you may omit the database prefix in <code>tables</code> and <code>partition.tables</code> (e.g., use <code>public.my_table</code> instead of <code>db.public.my_table</code>).<br>
114+
If both this option and a database prefix in <code>tables</code> are provided, they must be consistent.
115+
</td>
116+
</tr>
117+
<tr>
118+
<td>schema</td>
119+
<td>optional</td>
120+
<td style="word-wrap: break-word;">(none)</td>
121+
<td>String</td>
122+
<td>
123+
Default schema name. When set, entries in <code>tables</code> and <code>partition.tables</code> that omit schema will be auto-qualified as <code>schema.table</code> (e.g., <code>orders</code> becomes <code>public.orders</code>).<br>
124+
Explicit schema in patterns, if present, takes precedence and is left unchanged.
125+
</td>
126+
</tr>
107127
<tr>
108128
<td>tables</td>
109-
<td>required</td>
129+
<td>optional</td>
110130
<td style="word-wrap: break-word;">(none)</td>
111131
<td>String</td>
112132
<td>Table name of the Postgres database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br>
133+
<b>Note: At least one of 'tables' or 'partition.tables' must be configured.</b><br>
113134
All tables are required to share the same database. <br>
114135
It is important to note that the dot (.) is treated as a delimiter for database, schema and table names.
115136
If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
@@ -274,10 +295,69 @@ pipeline:
274295
Defaults to false.
275296
</td>
276297
</tr>
298+
<tr>
299+
<td>scan.include-partitioned-tables.enabled</td>
300+
<td>optional</td>
301+
<td style="word-wrap: break-word;">false</td>
302+
<td>Boolean</td>
303+
<td>
304+
Enable partition table support for PostgreSQL.<br>
305+
<b>For PostgreSQL 11+:</b> Use this option alone with <code>publish_via_partition_root=true</code> in PUBLICATION. No need to configure <code>partition.tables</code>.<br>
306+
<b>For PostgreSQL 10:</b> Must be used together with <code>partition.tables</code> to define parent-child mappings.
307+
</td>
308+
</tr>
309+
<tr>
310+
<td>partition.tables</td>
311+
<td>optional</td>
312+
<td style="word-wrap: break-word;">(none)</td>
313+
<td>String</td>
314+
<td>
315+
<b>Required for PostgreSQL 10 partition tables.</b> Defines parent-child table mappings using colon format: <code>parent:child_regex</code>.<br>
316+
Format: <code>schema.parent:schema.child_regex</code> (e.g., <code>public.orders:public.orders_\d{6}</code>).<br>
317+
When configured, <code>scan.include-partitioned-tables.enabled</code> is automatically enabled.<br>
318+
Child partition events will be routed to their parent table, reducing CreateTableEvent requests.
319+
</td>
320+
</tr>
277321
</tbody>
278322
</table>
279323
</div>
280324

325+
## Partition Table Support
326+
327+
### PostgreSQL 11+ (Recommended)
328+
329+
Set `publish_via_partition_root=true` on the PUBLICATION; only `scan.include-partitioned-tables.enabled` then needs to be configured:
330+
331+
```yaml
332+
source:
333+
type: postgres
334+
tables: db.public.orders
335+
scan.include-partitioned-tables.enabled: true
336+
```
337+
338+
### PostgreSQL 10
339+
340+
Requires `partition.tables` to define parent-child mappings:
341+
342+
```yaml
343+
source:
344+
type: postgres
345+
tables: db.public.orders
346+
partition.tables: public.orders:public.orders_\d{6}
347+
```
348+
349+
### partition.tables Format
350+
351+
Use colon format: `parent:child_regex`
352+
353+
- Two segments: `schema.parent:schema.child_regex`
354+
- Table-only: `parent:child_regex` (schema inherited at runtime)
355+
356+
```text
357+
public.orders:public.orders_\d{6} # parent=public.orders, child matches orders_YYYYMM
358+
orders:orders_\d{6} # table-only, schema inherited at runtime
359+
```
360+
281361

282362
Note:
283363
1. The configuration option tables specifies the tables to be captured by Postgres CDC, in the format db.schema1.table1,db.schema2.table2. All db values must be the same, as the PostgreSQL connection URL requires a single database name. Currently, CDC only supports connecting to one database.
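
The `database` and `schema` defaults documented in this hunk can be sketched as a small qualification step. This is a hypothetical helper (`TableNameQualifierSketch` is not a name from the commit), and it assumes that segments are separated by dots not preceded by a backslash and that the result is always expanded to `db.schema.table`; the commit's actual behavior may differ.

```java
/** Hypothetical helper, not part of the commit: applies database/schema defaults to a pattern. */
final class TableNameQualifierSketch {

    /**
     * Matches a dot that is not preceded by a backslash. Escaped dots belong to the regex.
     * Simplification: an escaped backslash directly before a dot is not handled.
     */
    private static final String UNESCAPED_DOT = "(?<!\\\\)\\.";

    /** Expands "table" or "schema.table" to "db.schema.table" using the given defaults. */
    static String qualify(String entry, String database, String schema) {
        String trimmed = entry.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("Empty table pattern");
        }
        String[] parts = trimmed.split(UNESCAPED_DOT);
        switch (parts.length) {
            case 3:
                // An explicit database prefix must agree with the 'database' option, if set.
                if (database != null && !database.equals(parts[0])) {
                    throw new IllegalArgumentException(
                            "Database '" + parts[0] + "' conflicts with '" + database + "'");
                }
                return trimmed;
            case 2:
                // An explicit schema takes precedence; only the database is added.
                return require(database, "database", trimmed) + "." + trimmed;
            case 1:
                return require(database, "database", trimmed)
                        + "." + require(schema, "schema", trimmed)
                        + "." + trimmed;
            default:
                throw new IllegalArgumentException("Too many segments in: " + trimmed);
        }
    }

    private static String require(String value, String option, String entry) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(
                    "Option '" + option + "' is required to qualify: " + entry);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(qualify("orders", "db", "public"));
        System.out.println(qualify("public.orders_\\d{6}", "db", null));
    }
}
```

Under these assumptions, `orders` with `database=db` and `schema=public` becomes `db.public.orders`, and a pattern that already names a different database is rejected instead of being rewritten.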
@@ -723,4 +803,4 @@ The former is used for small-area planar data, while the latter is used for larg
723803
</table>
724804
</div>
725805

726-
{{< top >}}
806+
{{< top >}}
Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.common.utils;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * LRU cache for compiled {@link Pattern} instances.
 *
 * <p>It avoids repeated {@link Pattern#compile(String)} calls for the same regex.
 *
 * <p>Example:
 *
 * <pre>{@code
 * Pattern pattern = PatternCache.getPattern("aia_t_icc_jjdb_\\d{6}");
 * Matcher matcher = pattern.matcher(tableName);
 * }</pre>
 */
public class PatternCache {

    /** Maximum number of cached patterns. */
    private static final int MAX_CACHE_SIZE = 100;

    /** Pattern cache with LRU eviction. */
    private static final Map<String, Pattern> CACHE =
            new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Pattern> eldest) {
                    return size() > MAX_CACHE_SIZE;
                }
            };

    /**
     * Returns a cached {@link Pattern} or compiles and caches it.
     *
     * @param regex regular expression
     * @return compiled {@link Pattern}
     */
    public static synchronized Pattern getPattern(String regex) {
        return CACHE.computeIfAbsent(regex, Pattern::compile);
    }

    /** Clears the cache (mainly for tests). */
    public static synchronized void clear() {
        CACHE.clear();
    }

    /** Returns the current cache size. */
    public static synchronized int size() {
        return CACHE.size();
    }
}
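
The eviction behavior that `PatternCache` relies on (an access-ordered `LinkedHashMap` with `removeEldestEntry`) may be easier to see with a tiny capacity. The sketch below is a standalone illustration, not the commit's class: `LruPatternCacheSketch`, its instance-based design, and the `contains` method are assumptions made only so the behavior can be observed with a capacity of 2 instead of 100.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Illustrative LRU cache with a configurable capacity; mirrors the technique, not the API. */
final class LruPatternCacheSketch {

    private final Map<String, Pattern> cache;

    LruPatternCacheSketch(final int capacity) {
        // accessOrder=true moves an entry to the end of the iteration order on each access,
        // so the first entry is always the least recently used one.
        this.cache = new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Pattern> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns the cached pattern, compiling and caching it on first use. */
    synchronized Pattern get(String regex) {
        return cache.computeIfAbsent(regex, Pattern::compile);
    }

    /** Checks presence without touching the access order. */
    synchronized boolean contains(String regex) {
        return cache.containsKey(regex);
    }

    synchronized int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        LruPatternCacheSketch cache = new LruPatternCacheSketch(2);
        cache.get("orders_\\d{6}");
        cache.get("users_\\d{6}");
        cache.get("orders_\\d{6}");   // refreshes "orders", so "users" is now the eldest
        cache.get("events_\\d{6}");   // exceeds the capacity and evicts "users"
        System.out.println(cache.contains("orders_\\d{6}"));
        System.out.println(cache.contains("users_\\d{6}"));
    }
}
```

With a capacity of 2, the repeated lookup keeps the `orders` pattern alive and the `users` pattern is evicted when the third regex arrives. The same mechanism bounds `PatternCache` at its `MAX_CACHE_SIZE` entries.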
