76 changes: 74 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -253,10 +253,10 @@ pipeline:
<tr>
<td>metadata.list</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts。
源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts、table_name、database_name、schema_name。详见<a href="#支持的元数据列">支持的元数据列</a>。
</td>
</tr>
<tr>
@@ -304,6 +304,78 @@ pipeline:
注意:
1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。

## 支持的元数据列

PostgreSQL CDC 连接器支持从源记录中读取元数据列。这些元数据列可以在转换操作中使用或传递给下游 Sink。

**注意:** 部分元数据信息也可以通过 Transform 表达式获取(例如 `__namespace_name__`、`__schema_name__`、`__table_name__`)。主要区别如下:
- **`op_ts`**:仅可通过 `metadata.list` 获取,提供数据库中实际的操作时间戳。
- **`table_name`、`database_name`、`schema_name`**:可通过 `metadata.list` 或 Transform 表达式获取(参见下方示例)。使用 `metadata.list` 可以直接将这些值传递给下游 Sink,无需编写转换规则,对于基本用例更加简单。
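
作为参考,下面是使用 Transform 表达式这一替代方式的简要示例(示例中的表名与列名仅用于说明):

```yaml
transform:
  - source-table: mydb.public.orders
    # 通过内置 Transform 表达式获取同样的信息,而无需配置 metadata.list。
    projection: order_id, __namespace_name__ AS database_name, __schema_name__ AS schema_name, __table_name__ AS table_name
    description: 通过 Transform 表达式获取元数据
```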

要启用元数据列,请使用逗号分隔的元数据列名称列表配置 `metadata.list` 选项:

```yaml
source:
type: postgres
# ... 其他配置
metadata.list: op_ts,table_name,database_name,schema_name
```

支持以下元数据列:

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 20%">元数据列</th>
<th class="text-left" style="width: 15%">数据类型</th>
<th class="text-left" style="width: 65%">描述</th>
</tr>
</thead>
<tbody>
<tr>
<td>op_ts</td>
<td>BIGINT NOT NULL</td>
<td>数据变更事件在数据库中发生的时间戳(自纪元以来的毫秒数)。对于快照记录,此值为 0。</td>
</tr>
<tr>
<td>table_name</td>
<td>STRING NOT NULL</td>
<td>包含变更行的表名称。替代方案:在 Transform 表达式中使用 <code>__table_name__</code>。</td>
</tr>
<tr>
<td>database_name</td>
<td>STRING NOT NULL</td>
<td>包含变更行的数据库名称。替代方案:在 Transform 表达式中使用 <code>__namespace_name__</code>。</td>
</tr>
<tr>
<td>schema_name</td>
<td>STRING NOT NULL</td>
<td>包含变更行的 Schema 名称。这是 PostgreSQL 特有的。替代方案:在 Transform 表达式中使用 <code>__schema_name__</code>。</td>
</tr>
</tbody>
</table>
</div>

**使用示例:**

```yaml
source:
type: postgres
hostname: localhost
port: 5432
username: postgres
password: postgres
tables: mydb.public.orders
slot.name: flink_slot
metadata.list: op_ts,table_name,schema_name

transform:
- source-table: mydb.public.orders
projection: order_id, customer_id, op_ts, table_name, schema_name
description: 在输出中包含元数据列
```
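
在上述示例的基础上,还可以利用 `op_ts` 过滤掉快照记录(快照记录的 `op_ts` 为 0)。下面是一个简要示例,假设使用 Transform 的标准 `filter` 配置项以及同一张示例表:

```yaml
transform:
  - source-table: mydb.public.orders
    projection: order_id, customer_id, op_ts
    # 快照记录的 op_ts 为 0,此处仅保留增量变更。
    filter: op_ts > 0
    description: 仅保留增量变更
```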

## 数据类型映射

<div class="wy-table-responsive">
76 changes: 74 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -245,10 +245,10 @@ pipeline:
<tr>
<td>metadata.list</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts, table_name, database_name, schema_name. See <a href="#supported-metadata-columns">Supported Metadata Columns</a> for more details.
</td>
</tr>
<tr>
@@ -299,6 +299,78 @@ Metrics can help understand the progress of assignments, and the following are t
Notice:
1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.

## Supported Metadata Columns

The PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.

**Note:** Some metadata information is also available through Transform expressions (e.g., `__namespace_name__`, `__schema_name__`, `__table_name__`). The key differences are:
- **`op_ts`**: Only available via `metadata.list`; it provides the actual operation timestamp from the database.
- **`table_name`, `database_name`, `schema_name`**: Available via either `metadata.list` or Transform expressions (see the sketch below). Using `metadata.list` lets you pass these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases.
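
For reference, this is a minimal sketch of the Transform-expression alternative; the table and column names are illustrative only:

```yaml
transform:
  - source-table: mydb.public.orders
    # Expose the same information through built-in Transform expressions instead of metadata.list.
    projection: order_id, __namespace_name__ AS database_name, __schema_name__ AS schema_name, __table_name__ AS table_name
    description: metadata via Transform expressions
```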

To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names:

```yaml
source:
type: postgres
# ... other configurations
metadata.list: op_ts,table_name,database_name,schema_name
```

The following metadata columns are supported:

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 20%">Metadata Column</th>
<th class="text-left" style="width: 15%">Data Type</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>op_ts</td>
<td>BIGINT NOT NULL</td>
<td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
</tr>
<tr>
<td>table_name</td>
<td>STRING NOT NULL</td>
<td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
</tr>
<tr>
<td>database_name</td>
<td>STRING NOT NULL</td>
<td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
</tr>
<tr>
<td>schema_name</td>
<td>STRING NOT NULL</td>
<td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
</tr>
</tbody>
</table>
</div>

**Example Usage:**

```yaml
source:
type: postgres
hostname: localhost
port: 5432
username: postgres
password: postgres
tables: mydb.public.orders
slot.name: flink_slot
metadata.list: op_ts,table_name,schema_name

transform:
- source-table: mydb.public.orders
projection: order_id, customer_id, op_ts, table_name, schema_name
description: Include metadata columns in output
```
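
Building on the example above, `op_ts` can also be used to filter out snapshot records, which carry `op_ts = 0`. The following is a minimal sketch that assumes the standard transform `filter` option and the same illustrative table:

```yaml
transform:
  - source-table: mydb.public.orders
    projection: order_id, customer_id, op_ts
    # Snapshot records have op_ts = 0, so only incremental changes pass this filter.
    filter: op_ts > 0
    description: keep incremental changes only
```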

## Data Type Mapping

<div class="wy-table-responsive">
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.source;

import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Map;

/** A {@link SupportedMetadataColumn} for database_name. */
public class DatabaseNameMetadataColumn implements SupportedMetadataColumn {

@Override
public String getName() {
return "database_name";
}

@Override
public DataType getType() {
return DataTypes.STRING().notNull();
}

@Override
public Class<?> getJavaClass() {
return String.class;
}

@Override
public Object read(Map<String, String> metadata) {
if (metadata.containsKey(getName())) {
return metadata.get(getName());
}
throw new IllegalArgumentException(
"database_name doesn't exist in the metadata: " + metadata);
}
}
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.source;

import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Map;

/** A {@link SupportedMetadataColumn} for op_ts. */
public class OpTsMetadataColumn implements SupportedMetadataColumn {

@Override
public String getName() {
return "op_ts";
}

@Override
public DataType getType() {
return DataTypes.BIGINT().notNull();
}

@Override
public Class<?> getJavaClass() {
return Long.class;
}

@Override
public Object read(Map<String, String> metadata) {
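// Metadata values are carried as strings; convert op_ts back to an epoch-millisecond long.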
if (metadata.containsKey(getName())) {
return Long.parseLong(metadata.get(getName()));
}
throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
}
}
@@ -24,6 +24,7 @@
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.source.MetadataAccessor;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
@@ -92,6 +93,23 @@ public PostgresSourceConfig getPostgresSourceConfig() {
return postgresSourceConfig;
}

@Override
public SupportedMetadataColumn[] supportedMetadataColumns() {
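// These are the metadata columns that can be requested through the `metadata.list` source option.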
return new SupportedMetadataColumn[] {
new OpTsMetadataColumn(),
new TableNameMetadataColumn(),
new DatabaseNameMetadataColumn(),
new SchemaNameMetadataColumn()
};
}

@Override
public boolean isParallelMetadataSource() {
// During incremental stage, PostgreSQL never emits schema change events on different
// partitions (since it has one WAL stream only.)
return false;
}

/** The {@link JdbcIncrementalSource} implementation for Postgres. */
public static class PostgresPipelineSource<T>
extends PostgresSourceBuilder.PostgresIncrementalSource<T> {
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.source;

import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Map;

/** A {@link SupportedMetadataColumn} for schema_name. */
public class SchemaNameMetadataColumn implements SupportedMetadataColumn {

@Override
public String getName() {
return "schema_name";
}

@Override
public DataType getType() {
return DataTypes.STRING().notNull();
}

@Override
public Class<?> getJavaClass() {
return String.class;
}

@Override
public Object read(Map<String, String> metadata) {
if (metadata.containsKey(getName())) {
return metadata.get(getName());
}
throw new IllegalArgumentException(
"schema_name doesn't exist in the metadata: " + metadata);
}
}