diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 50206761a19..ee36d449f28 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -253,10 +253,10 @@ pipeline:
     <tr>
       <td>metadata.list</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts。</td>
+      <td>源记录中可读取的元数据列表,将传递给下游并可在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts、table_name、database_name、schema_name。详见支持的元数据列。</td>
     </tr>
@@ -304,6 +304,78 @@ pipeline:
 注意:
 1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。
 
+## 支持的元数据列
+
+PostgreSQL CDC 连接器支持从源记录中读取元数据列。这些元数据列可以在转换操作中使用,或传递给下游 Sink。
+
+**注意:** 部分元数据信息也可以通过 Transform 表达式获取(例如 `__namespace_name__`、`__schema_name__`、`__table_name__`)。主要区别如下:
+- **`op_ts`**:仅可通过 `metadata.list` 获取,提供数据库中实际的操作时间戳。
+- **`table_name`、`database_name`、`schema_name`**:可通过 `metadata.list` 或 Transform 表达式获取。使用 `metadata.list` 可以直接将这些值传递给下游 Sink,无需编写转换规则,对于基本用例更加简单(两种方式的对比示例见下表之后)。
+
+要启用元数据列,请使用逗号分隔的元数据列名称列表配置 `metadata.list` 选项:
+
+```yaml
+source:
+  type: postgres
+  # ... 其他配置
+  metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+支持以下元数据列:
+
+<div class="wideTable">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">元数据列</th>
+        <th class="text-left">数据类型</th>
+        <th class="text-left">描述</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>op_ts</td>
+        <td>BIGINT NOT NULL</td>
+        <td>数据变更事件在数据库中发生的时间戳(自纪元以来的毫秒数)。对于快照记录,此值为 0。</td>
+      </tr>
+      <tr>
+        <td>table_name</td>
+        <td>STRING NOT NULL</td>
+        <td>包含变更行的表名称。替代方案:在 Transform 表达式中使用 <code>__table_name__</code>。</td>
+      </tr>
+      <tr>
+        <td>database_name</td>
+        <td>STRING NOT NULL</td>
+        <td>包含变更行的数据库名称。替代方案:在 Transform 表达式中使用 <code>__namespace_name__</code>。</td>
+      </tr>
+      <tr>
+        <td>schema_name</td>
+        <td>STRING NOT NULL</td>
+        <td>包含变更行的 Schema 名称。这是 PostgreSQL 特有的。替代方案:在 Transform 表达式中使用 <code>__schema_name__</code>。</td>
+      </tr>
+    </tbody>
+</table>
+</div>
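+作为对比,下面给出一个改用 Transform 表达式获取相同名称信息的简要示例(仅为示意,假设源表为 `mydb.public.orders`):
+
+```yaml
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, __table_name__ AS table_name, __schema_name__ AS schema_name
+    description: 通过 Transform 表达式读取元数据
+```
+
+注意:`op_ts` 没有对应的 Transform 表达式,只能通过 `metadata.list` 获取。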
+
+**使用示例:**
+
+```yaml
+source:
+  type: postgres
+  hostname: localhost
+  port: 5432
+  username: postgres
+  password: postgres
+  tables: mydb.public.orders
+  slot.name: flink_slot
+  metadata.list: op_ts,table_name,schema_name
+
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, customer_id, op_ts, table_name, schema_name
+    description: 在输出中包含元数据列
+```
+
 ## 数据类型映射
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index 03dd4e5a314..508f9e7e84d 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -245,10 +245,10 @@ pipeline:
     <tr>
       <td>metadata.list</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.</td>
+      <td>List of readable metadata from SourceRecord, passed downstream and usable in the transform module, separated by `,`. Available readable metadata: op_ts, table_name, database_name, schema_name. See Supported Metadata Columns for more details.</td>
     </tr>
@@ -299,6 +299,78 @@ Metrics can help understand the progress of assignments, and the following are t
 Notice:
 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
 
+## Supported Metadata Columns
+
+The PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
+
+**Note:** Some metadata information is also available through Transform expressions (e.g., `__namespace_name__`, `__schema_name__`, `__table_name__`). The key differences are:
+- **`op_ts`**: only available via `metadata.list`; it provides the actual operation timestamp from the database.
+- **`table_name`, `database_name`, `schema_name`**: available via either `metadata.list` or Transform expressions. Using `metadata.list` lets you pass these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases (see the comparison sketch after the table below).
+
+To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names:
+
+```yaml
+source:
+  type: postgres
+  # ... other configurations
+  metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+The following metadata columns are supported:
+
+<div class="wideTable">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">Metadata Column</th>
+        <th class="text-left">Data Type</th>
+        <th class="text-left">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>op_ts</td>
+        <td>BIGINT NOT NULL</td>
+        <td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
+      </tr>
+      <tr>
+        <td>table_name</td>
+        <td>STRING NOT NULL</td>
+        <td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
+      </tr>
+      <tr>
+        <td>database_name</td>
+        <td>STRING NOT NULL</td>
+        <td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
+      </tr>
+      <tr>
+        <td>schema_name</td>
+        <td>STRING NOT NULL</td>
+        <td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
+      </tr>
+    </tbody>
+</table>
+</div>
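+For comparison, here is a minimal sketch (illustrative only, assuming a source table `mydb.public.orders`) that reads the same name information through Transform expressions instead of `metadata.list`:
+
+```yaml
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, __table_name__ AS table_name, __schema_name__ AS schema_name
+    description: Read metadata via Transform expressions
+```
+
+Note that `op_ts` has no Transform expression counterpart; it is only available through `metadata.list`.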
+
+**Example Usage:**
+
+```yaml
+source:
+  type: postgres
+  hostname: localhost
+  port: 5432
+  username: postgres
+  password: postgres
+  tables: mydb.public.orders
+  slot.name: flink_slot
+  metadata.list: op_ts,table_name,schema_name
+
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, customer_id, op_ts, table_name, schema_name
+    description: Include metadata columns in output
+```
+
 ## Data Type Mapping
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java
new file mode 100644
index 00000000000..25f7141ba53
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/DatabaseNameMetadataColumn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for database_name. */
+public class DatabaseNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "database_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException(
+                "database_name doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java
new file mode 100644
index 00000000000..9a9217969f8
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/OpTsMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for op_ts. */
+public class OpTsMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "op_ts";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.BIGINT().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return Long.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return Long.parseLong(metadata.get(getName()));
+        }
+        throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
index 767fb5fc33a..afb67e75fd9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
@@ -24,6 +24,7 @@
 import org.apache.flink.cdc.common.source.EventSourceProvider;
 import org.apache.flink.cdc.common.source.FlinkSourceProvider;
 import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
@@ -92,6 +93,23 @@ public PostgresSourceConfig getPostgresSourceConfig() {
         return postgresSourceConfig;
     }
 
+    @Override
+    public SupportedMetadataColumn[] supportedMetadataColumns() {
+        return new SupportedMetadataColumn[] {
+            new OpTsMetadataColumn(),
+            new TableNameMetadataColumn(),
+            new DatabaseNameMetadataColumn(),
+            new SchemaNameMetadataColumn()
+        };
+    }
+
+    @Override
+    public boolean isParallelMetadataSource() {
+        // During the incremental stage, PostgreSQL never emits schema change events on
+        // different partitions (since it has only one WAL stream).
+        return false;
+    }
+
     /** The {@link JdbcIncrementalSource} implementation for Postgres. */
     public static class PostgresPipelineSource
             extends PostgresSourceBuilder.PostgresIncrementalSource<Event> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java
new file mode 100644
index 00000000000..ac61a090a97
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/SchemaNameMetadataColumn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for schema_name. */
+public class SchemaNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "schema_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException(
+                "schema_name doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java
new file mode 100644
index 00000000000..f2ad0fb8aa1
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/TableNameMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for table_name. */
+public class TableNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "table_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException("table_name doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
index 473c2d61ced..338ef664292 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java
@@ -21,8 +21,13 @@
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.DatabaseNameMetadataColumn;
+import org.apache.flink.cdc.connectors.postgres.source.OpTsMetadataColumn;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource;
+import org.apache.flink.cdc.connectors.postgres.source.SchemaNameMetadataColumn;
+import org.apache.flink.cdc.connectors.postgres.source.TableNameMetadataColumn;
 import org.apache.flink.table.api.ValidationException;
 
 import org.junit.jupiter.api.AfterEach;
@@ -305,6 +310,34 @@ public void testTableValidationWithOriginalBugScenario() {
                 .hasMessageContaining("Cannot find any table by the option 'tables'");
     }
 
+    @Test
+    public void testSupportedMetadataColumns() {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
+        options.put(
+                PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*");
+        options.put(SLOT_NAME.key(), slotName);
+        Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+        PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
+        PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context);
+
+        SupportedMetadataColumn[] metadataColumns = dataSource.supportedMetadataColumns();
+        assertThat(metadataColumns).hasSize(4);
+        assertThat(metadataColumns[0]).isInstanceOf(OpTsMetadataColumn.class);
+        assertThat(metadataColumns[0].getName()).isEqualTo("op_ts");
+        assertThat(metadataColumns[1]).isInstanceOf(TableNameMetadataColumn.class);
+        assertThat(metadataColumns[1].getName()).isEqualTo("table_name");
+        assertThat(metadataColumns[2]).isInstanceOf(DatabaseNameMetadataColumn.class);
+        assertThat(metadataColumns[2].getName()).isEqualTo("database_name");
+        assertThat(metadataColumns[3]).isInstanceOf(SchemaNameMetadataColumn.class);
+        assertThat(metadataColumns[3].getName()).isEqualTo("schema_name");
+    }
+
     class MockContext implements Factory.Context {
 
         Configuration factoryConfiguration;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 0dac3c153e8..de487e7e5d5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -40,6 +40,7 @@
 import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
 import org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
 import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -820,6 +821,111 @@ public void testJsonTypes() throws Exception {
         Assertions.assertThat(recordFields(snapshotRecord, JSON_TYPES)).isEqualTo(expectedSnapshot);
     }
 
+    @Test
+    public void testAllMetadataColumns() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+        List<PostgreSQLReadableMetadata> metadataList = new ArrayList<>();
+        metadataList.add(PostgreSQLReadableMetadata.OP_TS);
+        metadataList.add(PostgreSQLReadableMetadata.DATABASE_NAME);
+        metadataList.add(PostgreSQLReadableMetadata.SCHEMA_NAME);
+        metadataList.add(PostgreSQLReadableMetadata.TABLE_NAME);
+        // Note: ROW_KIND is not included because it requires RowData and cannot be read from
+        // SourceRecord.
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.full_types")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory, metadataList)
+                                .getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        DataChangeEvent snapshotEvent = (DataChangeEvent) snapshotResults.get(0);
+
+        // Verify that all metadata columns exist in the snapshot phase.
+        Map<String, String> metadata = snapshotEvent.meta();
+
+        // Verify op_ts metadata.
+        // According to the PostgreSQLReadableMetadata.OP_TS documentation:
+        // "If the record is read from snapshot of the table instead of the change stream,
+        // the value is always 0."
+        Assertions.assertThat(metadata).containsKey("op_ts");
+        Long opTs = Long.parseLong(metadata.get("op_ts"));
+        Assertions.assertThat(opTs).isEqualTo(0L);
+
+        // Verify database_name metadata.
+        Assertions.assertThat(metadata).containsKey("database_name");
+        Assertions.assertThat(metadata.get("database_name"))
+                .isEqualTo(POSTGRES_CONTAINER.getDatabaseName());
+
+        // Verify schema_name metadata.
+        Assertions.assertThat(metadata).containsKey("schema_name");
+        Assertions.assertThat(metadata.get("schema_name")).isEqualTo("inventory");
+
+        // Verify table_name metadata.
+        Assertions.assertThat(metadata).containsKey("table_name");
+        Assertions.assertThat(metadata.get("table_name")).isEqualTo("full_types");
+
+        // Insert a new row to test the incremental phase.
+        try (Connection connection =
+                        PostgresTestBase.getJdbcConnection(POSTGIS_CONTAINER, "postgres");
+                Statement statement = connection.createStatement()) {
+            // Use a simple INSERT statement that only includes the basic required columns.
+            statement.execute(
+                    "INSERT INTO inventory.full_types (id, small_c, int_c, big_c, "
+                            + "real_c, double_precision, boolean_c, text_c, username, status) VALUES ("
+                            + "2, 100, 200, 300, 1.1, 2.2, true, 'test', 'testuser', 'pending')");
+        }
+
+        // Fetch the incremental event.
+        List<Event> incrementalResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        DataChangeEvent incrementalEvent = (DataChangeEvent) incrementalResults.get(0);
+
+        // Verify all metadata columns in the incremental phase.
+        Map<String, String> incrementalMetadata = incrementalEvent.meta();
+
+        // In the incremental phase (change stream), op_ts should be a valid timestamp > 0.
+        Assertions.assertThat(incrementalMetadata).containsKey("op_ts");
+        Long incrementalOpTs = Long.parseLong(incrementalMetadata.get("op_ts"));
+        Assertions.assertThat(incrementalOpTs)
+                .as("op_ts in incremental phase should be greater than 0")
+                .isGreaterThan(0L);
+
+        // Verify database_name metadata in the incremental phase.
+        Assertions.assertThat(incrementalMetadata).containsKey("database_name");
+        Assertions.assertThat(incrementalMetadata.get("database_name"))
+                .isEqualTo(POSTGRES_CONTAINER.getDatabaseName());
+
+        // Verify schema_name metadata in the incremental phase.
+        Assertions.assertThat(incrementalMetadata).containsKey("schema_name");
+        Assertions.assertThat(incrementalMetadata.get("schema_name")).isEqualTo("inventory");
+
+        // Verify table_name metadata in the incremental phase.
+        Assertions.assertThat(incrementalMetadata).containsKey("table_name");
+        Assertions.assertThat(incrementalMetadata.get("table_name")).isEqualTo("full_types");
+    }
+
     @Test
     public void testArrayTypes() throws Exception {
initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");