From ea4a9762286b992366be47ddd8a66f1647d5702c Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Mon, 10 Nov 2025 10:07:01 -0500 Subject: [PATCH 1/2] Warn on generated columns generated columns are not supported in logical replication for PostgreSQL versions < 18 --- etl/src/replication/client.rs | 36 +++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 7c8f76887..3d9fa505f 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -803,7 +803,8 @@ impl PgReplicationClient { /// Retrieves schema information for all columns in a table. /// /// If a publication is specified, only columns included in that publication - /// will be returned. + /// will be returned. Generated columns are always excluded since they are not + /// supported in PostgreSQL logical replication. async fn get_column_schemas( &self, table_id: TableId, @@ -831,6 +832,19 @@ impl PgReplicationClient { join direct_parent dp on con.conrelid = dp.parent_oid where con.contype = 'p' group by con.conname + ), + -- Identify generated columns for warning purposes + generated_cols as ( + select + c.relname as table_name, + string_agg(a.attname, ', ' order by a.attnum) as generated_column_names + from pg_class c + join pg_attribute a on a.attrelid = c.oid + where c.oid = {table_id} + and a.attnum > 0 + and not a.attisdropped + and a.attgenerated != '' + group by c.relname ) select a.attname, @@ -847,7 +861,9 @@ impl PgReplicationClient { where a.attname = any(pk.pk_column_names) ) then true else false - end as primary + end as primary, + (select table_name from generated_cols) as gen_table_name, + (select generated_column_names from generated_cols) as gen_columns from pg_attribute a left join pg_index i on a.attrelid = i.indrelid @@ -865,6 +881,8 @@ impl PgReplicationClient { ); let mut column_schemas = vec![]; + let mut warned_about_generated = false; + for message in self.client.simple_query(&column_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let name = Self::get_row_value::(&row, "attname", "pg_attribute").await?; @@ -876,6 +894,20 @@ impl PgReplicationClient { let primary = Self::get_row_value::(&row, "primary", "pg_index").await? == "t"; + // Check for generated columns and warn once per table + if !warned_about_generated + && let Some(gen_columns) = row.try_get("gen_columns")? + && let Some(table_name) = row.try_get("gen_table_name")? + { + warn!( + "Table '{}' contains generated columns ({}) that will NOT be replicated. \ + Generated columns are not supported in PostgreSQL logical replication and will \ + be excluded from the ETL schema. These columns will NOT appear in the destination.", + table_name, gen_columns + ); + warned_about_generated = true; + } + let typ = convert_type_oid_to_type(type_oid); column_schemas.push(ColumnSchema { From a63e1f3933fe47b2bdad1fcb737d189d8ec6ce21 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 20 Nov 2025 10:25:59 -0500 Subject: [PATCH 2/2] Simplify checking for generated columns. --- etl/src/replication/client.rs | 67 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 3d9fa505f..76e2ee525 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -832,19 +832,6 @@ impl PgReplicationClient { join direct_parent dp on con.conrelid = dp.parent_oid where con.contype = 'p' group by con.conname - ), - -- Identify generated columns for warning purposes - generated_cols as ( - select - c.relname as table_name, - string_agg(a.attname, ', ' order by a.attnum) as generated_column_names - from pg_class c - join pg_attribute a on a.attrelid = c.oid - where c.oid = {table_id} - and a.attnum > 0 - and not a.attisdropped - and a.attgenerated != '' - group by c.relname ) select a.attname, @@ -861,9 +848,7 @@ impl PgReplicationClient { where a.attname = any(pk.pk_column_names) ) then true else false - end as primary, - (select table_name from generated_cols) as gen_table_name, - (select generated_column_names from generated_cols) as gen_columns + end as primary from pg_attribute a left join pg_index i on a.attrelid = i.indrelid @@ -880,9 +865,41 @@ impl PgReplicationClient { publication_predicate = publication_filter.predicate, ); - let mut column_schemas = vec![]; - let mut warned_about_generated = false; + // Check for generated columns so we can warn if there are any. + let generated_columns_check_query = format!( + r#"select exists ( + select 1 + from pg_attribute + where attrelid = {table_id} + and attnum > 0 + and not attisdropped + and attgenerated != '' + ) as has_generated;"# + ); + + for message in self + .client + .simple_query(&generated_columns_check_query) + .await? + { + if let SimpleQueryMessage::Row(row) = message { + let has_generated_columns = + Self::get_row_value::(&row, "has_generated", "pg_attribute").await? + == "t"; + if has_generated_columns { + warn!( + "Table {} contains generated columns that will NOT be replicated. \ + Generated columns are not supported in PostgreSQL logical replication and will \ + be excluded from the ETL schema. These columns will NOT appear in the destination.", + table_id + ); + } + // Explicity break for clarity; this query returns a single SimpleQueryMessage::Row. + break; + } + } + let mut column_schemas = vec![]; for message in self.client.simple_query(&column_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let name = Self::get_row_value::(&row, "attname", "pg_attribute").await?; @@ -894,20 +911,6 @@ impl PgReplicationClient { let primary = Self::get_row_value::(&row, "primary", "pg_index").await? == "t"; - // Check for generated columns and warn once per table - if !warned_about_generated - && let Some(gen_columns) = row.try_get("gen_columns")? - && let Some(table_name) = row.try_get("gen_table_name")? - { - warn!( - "Table '{}' contains generated columns ({}) that will NOT be replicated. \ - Generated columns are not supported in PostgreSQL logical replication and will \ - be excluded from the ETL schema. These columns will NOT appear in the destination.", - table_name, gen_columns - ); - warned_about_generated = true; - } - let typ = convert_type_oid_to_type(type_oid); column_schemas.push(ColumnSchema {