From f3855226b2eafce6872582d333bf1c5d66b777c8 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 14:58:46 -0400 Subject: [PATCH 01/18] Create replication for partitioned table issue --- etl/tests/partitioned_table_test.rs | 217 ++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 etl/tests/partitioned_table_test.rs diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs new file mode 100644 index 000000000..cf57529bb --- /dev/null +++ b/etl/tests/partitioned_table_test.rs @@ -0,0 +1,217 @@ +use etl::destination::memory::MemoryDestination; +use etl::state::table::TableReplicationPhaseType; +use etl::test_utils::database::{spawn_source_database, test_table_name}; +use etl::test_utils::notify::NotifyingStore; +use etl::test_utils::pipeline::create_pipeline; +use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; +use etl::types::PipelineId; +use etl_postgres::tokio::test_utils::PgDatabase; +use tokio_postgres::GenericClient; +use etl_postgres::types::{TableId, TableName}; +use etl_telemetry::tracing::init_test_tracing; +use rand::random; + +/// Creates a partitioned table with the given name and partitions. +/// +/// This function creates: +/// 1. A parent partitioned table with a primary key +/// 2. Several child partitions based on the provided partition specifications +/// +/// Returns the table ID of the parent table and a list of partition table IDs. +pub async fn create_partitioned_table( + database: &PgDatabase, + table_name: TableName, + partition_specs: &[(&str, &str)], // (partition_name, partition_constraint) +) -> Result<(TableId, Vec), tokio_postgres::Error> { + // Create the parent partitioned table with a primary key + let create_parent_query = format!( + "CREATE TABLE {} ( + id bigserial, + data text NOT NULL, + partition_key integer NOT NULL, + PRIMARY KEY (id, partition_key) + ) PARTITION BY RANGE (partition_key)", + table_name.as_quoted_identifier() + ); + + database.run_sql(&create_parent_query).await?; + + // Get the OID of the parent table + let parent_row = database + .client + .as_ref() + .unwrap() + .query_one( + "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = $1 AND c.relname = $2", + &[&table_name.schema, &table_name.name], + ) + .await?; + + let parent_table_id: TableId = parent_row.get(0); + let mut partition_table_ids = Vec::new(); + + // Create child partitions + for (partition_name, partition_constraint) in partition_specs { + let partition_table_name = TableName::new( + table_name.schema.clone(), + format!("{}_{}", table_name.name, partition_name), + ); + + let create_partition_query = format!( + "CREATE TABLE {} PARTITION OF {} FOR VALUES {}", + partition_table_name.as_quoted_identifier(), + table_name.as_quoted_identifier(), + partition_constraint + ); + + database.run_sql(&create_partition_query).await?; + + // Get the OID of the partition table + let partition_row = database + .client + .as_ref() + .unwrap() + .query_one( + "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = $1 AND c.relname = $2", + &[&partition_table_name.schema, &partition_table_name.name], + ) + .await?; + + let partition_table_id: TableId = partition_row.get(0); + partition_table_ids.push(partition_table_id); + } + + Ok((parent_table_id, partition_table_ids)) +} + +/// Test that verifies partitioned tables with inherited primary keys work correctly. 
+/// +/// This test validates the fix for GitHub issue #296 where partitioned tables +/// failed during sync because the ETL system didn't recognize that leaf partitions +/// inherit primary key constraints from their parent table. +#[tokio::test(flavor = "multi_thread")] +async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events"); + + // Create a partitioned table with three partitions based on partition_key ranges + let partition_specs = [ + ("p1", "FROM (1) TO (100)"), + ("p2", "FROM (100) TO (200)"), + ("p3", "FROM (200) TO (300)"), + ]; + + let (parent_table_id, partition_table_ids) = create_partitioned_table( + &database, + table_name.clone(), + &partition_specs + ).await.expect("Failed to create partitioned table"); + + // Insert some test data into the partitioned table + database + .run_sql(&format!( + "INSERT INTO {} (data, partition_key) VALUES + ('event1', 50), ('event2', 150), ('event3', 250)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .expect("Failed to create publication"); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + // Wait for all partitions to complete their sync (should succeed now) + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + // Check that all partitions successfully synced without primary key errors + let table_states = state_store.get_table_replication_states().await; + + let mut sync_done_count = 0; + let mut error_count = 0; + + for (table_id, state) in &table_states { + match state.as_type() { + TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready => { + sync_done_count += 1; + println!("✓ Partition {} successfully synced", table_id); + } + TableReplicationPhaseType::Errored => { + error_count += 1; + let reason = match state { + etl::state::table::TableReplicationPhase::Errored { reason, .. } => reason, + _ => unreachable!(), + }; + println!("✗ Partition {} failed with error: {}", table_id, reason); + } + other_state => { + println!("? 
Partition {} in state: {:?}", table_id, other_state); + } + } + } + + // Shutdown the pipeline (should succeed now) + let shutdown_result = pipeline.shutdown_and_wait().await; + + // Verify all 3 partitions successfully synced without errors + assert_eq!( + error_count, 0, + "Expected no partitions to fail, but got {} errors", + error_count + ); + + assert!( + sync_done_count >= 3, + "Expected all 3 partitions to sync successfully, but only {} completed", + sync_done_count + ); + + // The pipeline should succeed (or fail for reasons other than primary key issues) + if let Err(e) = shutdown_result { + // If there's an error, it shouldn't be about missing primary keys + let error_str = format!("{:?}", e); + assert!( + !error_str.contains("Missing primary key"), + "Pipeline failed with primary key error despite fix: {}", + error_str + ); + } + + // Verify that data was synced successfully + let table_rows = destination.get_table_rows().await; + + // We expect data to be synced from all 3 partitions + let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum(); + assert!( + total_rows >= 3, + "Expected at least 3 rows synced (one per partition), but got {}", + total_rows + ); + + println!( + "✓ Successfully verified GitHub issue #296 fix: Partitioned table sync works with inherited primary keys" + ); + println!("✓ Parent table ID: {}", parent_table_id); + println!("✓ Successfully synced partition IDs: {:?}", partition_table_ids); + println!("✓ Total rows synced: {}", total_rows); +} From 9913523477fbf30e92a904347a7a480cb26185c3 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 15:27:55 -0400 Subject: [PATCH 02/18] Fix partitioned tables --- etl/src/replication/client.rs | 83 +++++++++++- etl/src/test_utils/test_schema.rs | 71 ++++++++++ etl/tests/partitioned_table_test.rs | 193 ++++++---------------------- 3 files changed, 184 insertions(+), 163 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 722437505..a687c6f77 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -661,7 +661,8 @@ impl PgReplicationClient { when 0 then true else (a.attnum in (select * from pub_attrs)) end - )", + )" + .to_string(), ) } else { // Postgres 14 or earlier or unknown, fallback to no column-level filtering @@ -676,20 +677,90 @@ impl PgReplicationClient { )", publication = quote_literal(publication), ), - "and (select count(*) from pub_table) > 0", + format!( + "and ((select count(*) from pub_table) > 0 or exists( + -- Also allow if parent table is in publication (for partitioned tables) + select 1 from pg_inherits i + join pg_publication_rel r on r.prrelid = i.inhparent + join pg_publication p on p.oid = r.prpubid + where i.inhrelid = {table_id} and p.pubname = {publication} + ))", + publication = quote_literal(publication), + ), ) } } else { - ("".into(), "") + ("".to_string(), "".to_string()) + }; + + let has_pub_cte = !pub_cte.is_empty(); + + let cte_prefix = if has_pub_cte { + // If there's already a pub_cte WITH clause, add our CTEs to it with a comma + format!("{pub_cte},") + } else { + // If no pub_cte, start our own WITH clause (no need for RECURSIVE) + "with ".to_string() }; let column_info_query = format!( - "{pub_cte} - select a.attname, + "{cte_prefix} + -- Find direct parent of current table (if it's a partition) + direct_parent as ( + select i.inhparent as parent_oid + from pg_inherits i + where i.inhrelid = {table_id}::oid + limit 1 + ), + -- Get parent table's primary key columns + parent_pk_cols 
as ( + select array_agg(a.attname order by x.n) as pk_column_names + from pg_constraint con + join unnest(con.conkey) with ordinality as x(attnum, n) on true + join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum + join direct_parent dp on con.conrelid = dp.parent_oid + where con.contype = 'p' + group by con.conname + ), + -- Check if current table has a unique index on the parent PK columns + partition_has_pk_index as ( + select case + when exists (select 1 from direct_parent) + and exists (select 1 from parent_pk_cols) + and exists ( + -- Check if there's a unique, valid index on the parent PK columns + select 1 + from pg_index ix + cross join parent_pk_cols pk + where ix.indrelid = {table_id}::oid + and ix.indisunique = true + and ix.indisvalid = true + and array( + select a.attname + from unnest(ix.indkey) with ordinality k(attnum, ord) + join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum + where ord <= ix.indnkeyatts -- exclude INCLUDE columns + order by ord + ) = pk.pk_column_names + ) then true + else false + end as has_inherited_pk + ) + SELECT a.attname, a.atttypid, a.atttypmod, a.attnotnull, - coalesce(i.indisprimary, false) as primary + case + -- First check for direct primary key + when coalesce(i.indisprimary, false) = true then true + -- Then check for inherited primary key from partitioned table parent + when (select has_inherited_pk from partition_has_pk_index) = true + and exists ( + select 1 from parent_pk_cols pk + where a.attname = any(pk.pk_column_names) + ) then true + else false + end as primary from pg_attribute a left join pg_index i on a.attrelid = i.indrelid diff --git a/etl/src/test_utils/test_schema.rs b/etl/src/test_utils/test_schema.rs index 8faa6449b..25ca7725e 100644 --- a/etl/src/test_utils/test_schema.rs +++ b/etl/src/test_utils/test_schema.rs @@ -127,6 +127,77 @@ pub async fn setup_test_database_schema( } } +/// Creates a partitioned table with the given name and partitions. +/// +/// This function creates: +/// 1. A parent partitioned table with a primary key +/// 2. Several child partitions based on the provided partition specifications +/// +/// Returns the table ID of the parent table and a list of partition table IDs. 
+pub async fn create_partitioned_table( + database: &PgDatabase, + table_name: TableName, + partition_specs: &[(&str, &str)], // (partition_name, partition_constraint) +) -> Result<(TableId, Vec), tokio_postgres::Error> { + let create_parent_query = format!( + "CREATE TABLE {} ( + id bigserial, + data text NOT NULL, + partition_key integer NOT NULL, + PRIMARY KEY (id, partition_key) + ) PARTITION BY RANGE (partition_key)", + table_name.as_quoted_identifier() + ); + + database.run_sql(&create_parent_query).await?; + + let parent_row = database + .client + .as_ref() + .unwrap() + .query_one( + "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = $1 AND c.relname = $2", + &[&table_name.schema, &table_name.name], + ) + .await?; + + let parent_table_id: TableId = parent_row.get(0); + let mut partition_table_ids = Vec::new(); + + for (partition_name, partition_constraint) in partition_specs { + let partition_table_name = TableName::new( + table_name.schema.clone(), + format!("{}_{}", table_name.name, partition_name), + ); + + let create_partition_query = format!( + "CREATE TABLE {} PARTITION OF {} FOR VALUES {}", + partition_table_name.as_quoted_identifier(), + table_name.as_quoted_identifier(), + partition_constraint + ); + + database.run_sql(&create_partition_query).await?; + + let partition_row = database + .client + .as_ref() + .unwrap() + .query_one( + "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = $1 AND c.relname = $2", + &[&partition_table_name.schema, &partition_table_name.name], + ) + .await?; + + let partition_table_id: TableId = partition_row.get(0); + partition_table_ids.push(partition_table_id); + } + + Ok((parent_table_id, partition_table_ids)) +} + /// Inserts users data into the database for testing purposes. pub async fn insert_users_data( client: &mut PgDatabase, diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index cf57529bb..77e0375ae 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -4,114 +4,29 @@ use etl::test_utils::database::{spawn_source_database, test_table_name}; use etl::test_utils::notify::NotifyingStore; use etl::test_utils::pipeline::create_pipeline; use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; +use etl::test_utils::test_schema::create_partitioned_table; use etl::types::PipelineId; -use etl_postgres::tokio::test_utils::PgDatabase; -use tokio_postgres::GenericClient; -use etl_postgres::types::{TableId, TableName}; use etl_telemetry::tracing::init_test_tracing; use rand::random; -/// Creates a partitioned table with the given name and partitions. -/// -/// This function creates: -/// 1. A parent partitioned table with a primary key -/// 2. Several child partitions based on the provided partition specifications -/// -/// Returns the table ID of the parent table and a list of partition table IDs. 
-pub async fn create_partitioned_table( - database: &PgDatabase, - table_name: TableName, - partition_specs: &[(&str, &str)], // (partition_name, partition_constraint) -) -> Result<(TableId, Vec), tokio_postgres::Error> { - // Create the parent partitioned table with a primary key - let create_parent_query = format!( - "CREATE TABLE {} ( - id bigserial, - data text NOT NULL, - partition_key integer NOT NULL, - PRIMARY KEY (id, partition_key) - ) PARTITION BY RANGE (partition_key)", - table_name.as_quoted_identifier() - ); - - database.run_sql(&create_parent_query).await?; - - // Get the OID of the parent table - let parent_row = database - .client - .as_ref() - .unwrap() - .query_one( - "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = $1 AND c.relname = $2", - &[&table_name.schema, &table_name.name], - ) - .await?; - - let parent_table_id: TableId = parent_row.get(0); - let mut partition_table_ids = Vec::new(); - - // Create child partitions - for (partition_name, partition_constraint) in partition_specs { - let partition_table_name = TableName::new( - table_name.schema.clone(), - format!("{}_{}", table_name.name, partition_name), - ); - - let create_partition_query = format!( - "CREATE TABLE {} PARTITION OF {} FOR VALUES {}", - partition_table_name.as_quoted_identifier(), - table_name.as_quoted_identifier(), - partition_constraint - ); - - database.run_sql(&create_partition_query).await?; - - // Get the OID of the partition table - let partition_row = database - .client - .as_ref() - .unwrap() - .query_one( - "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = $1 AND c.relname = $2", - &[&partition_table_name.schema, &partition_table_name.name], - ) - .await?; - - let partition_table_id: TableId = partition_row.get(0); - partition_table_ids.push(partition_table_id); - } - - Ok((parent_table_id, partition_table_ids)) -} - /// Test that verifies partitioned tables with inherited primary keys work correctly. -/// -/// This test validates the fix for GitHub issue #296 where partitioned tables -/// failed during sync because the ETL system didn't recognize that leaf partitions -/// inherit primary key constraints from their parent table. 
#[tokio::test(flavor = "multi_thread")] async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { init_test_tracing(); let database = spawn_source_database().await; let table_name = test_table_name("partitioned_events"); - - // Create a partitioned table with three partitions based on partition_key ranges let partition_specs = [ ("p1", "FROM (1) TO (100)"), - ("p2", "FROM (100) TO (200)"), + ("p2", "FROM (100) TO (200)"), ("p3", "FROM (200) TO (300)"), ]; - - let (parent_table_id, partition_table_ids) = create_partitioned_table( - &database, - table_name.clone(), - &partition_specs - ).await.expect("Failed to create partitioned table"); - // Insert some test data into the partitioned table + let (_parent_table_id, partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .expect("Failed to create partitioned table"); + database .run_sql(&format!( "INSERT INTO {} (data, partition_key) VALUES @@ -130,6 +45,14 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { let state_store = NotifyingStore::new(); let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + let mut partition_notifications = Vec::new(); + for &partition_id in &partition_table_ids { + let notification = state_store + .notify_on_table_state(partition_id, TableReplicationPhaseType::SyncDone) + .await; + partition_notifications.push(notification); + } + let pipeline_id: PipelineId = random(); let mut pipeline = create_pipeline( &database.config, @@ -141,77 +64,33 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { pipeline.start().await.unwrap(); - // Wait for all partitions to complete their sync (should succeed now) - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; - - // Check that all partitions successfully synced without primary key errors - let table_states = state_store.get_table_replication_states().await; - - let mut sync_done_count = 0; - let mut error_count = 0; - - for (table_id, state) in &table_states { - match state.as_type() { - TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready => { - sync_done_count += 1; - println!("✓ Partition {} successfully synced", table_id); - } - TableReplicationPhaseType::Errored => { - error_count += 1; - let reason = match state { - etl::state::table::TableReplicationPhase::Errored { reason, .. } => reason, - _ => unreachable!(), - }; - println!("✗ Partition {} failed with error: {}", table_id, reason); - } - other_state => { - println!("? 
Partition {} in state: {:?}", table_id, other_state); - } - } + for notification in partition_notifications { + notification.notified().await; } - // Shutdown the pipeline (should succeed now) - let shutdown_result = pipeline.shutdown_and_wait().await; - - // Verify all 3 partitions successfully synced without errors - assert_eq!( - error_count, 0, - "Expected no partitions to fail, but got {} errors", - error_count - ); - - assert!( - sync_done_count >= 3, - "Expected all 3 partitions to sync successfully, but only {} completed", - sync_done_count - ); - - // The pipeline should succeed (or fail for reasons other than primary key issues) - if let Err(e) = shutdown_result { - // If there's an error, it shouldn't be about missing primary keys - let error_str = format!("{:?}", e); - assert!( - !error_str.contains("Missing primary key"), - "Pipeline failed with primary key error despite fix: {}", - error_str - ); - } + let _ = pipeline.shutdown_and_wait().await; - // Verify that data was synced successfully let table_rows = destination.get_table_rows().await; - - // We expect data to be synced from all 3 partitions let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum(); - assert!( - total_rows >= 3, - "Expected at least 3 rows synced (one per partition), but got {}", + + assert_eq!( + total_rows, 3, + "Expected 3 rows synced (one per partition), but got {}", total_rows ); - println!( - "✓ Successfully verified GitHub issue #296 fix: Partitioned table sync works with inherited primary keys" - ); - println!("✓ Parent table ID: {}", parent_table_id); - println!("✓ Successfully synced partition IDs: {:?}", partition_table_ids); - println!("✓ Total rows synced: {}", total_rows); + let table_states = state_store.get_table_replication_states().await; + for (table_id, state) in &table_states { + if partition_table_ids.contains(table_id) { + assert!( + matches!( + state.as_type(), + TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready + ), + "Partition {} should be in SyncDone or Ready state, but was in {:?}", + table_id, + state.as_type() + ); + } + } } From 550956838efefb51248ec19de74a7b64bcbb4b3e Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 16:21:16 -0400 Subject: [PATCH 03/18] Clippy --- etl/tests/partitioned_table_test.rs | 51 ++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index 77e0375ae..aa73b806f 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -80,17 +80,44 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { ); let table_states = state_store.get_table_replication_states().await; - for (table_id, state) in &table_states { - if partition_table_ids.contains(table_id) { - assert!( - matches!( - state.as_type(), - TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready - ), - "Partition {} should be in SyncDone or Ready state, but was in {:?}", - table_id, - state.as_type() - ); - } + + assert_eq!( + table_states.len(), + partition_table_ids.len(), + "Expected {} partition states, but found {}", + partition_table_ids.len(), + table_states.len() + ); + + for &partition_id in &partition_table_ids { + let state = table_states + .get(&partition_id) + .unwrap_or_else(|| panic!("Partition {} should have a state", partition_id)); + assert!( + matches!( + state.as_type(), + TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready + ), 
+ "Partition {} should be in SyncDone or Ready state, but was in {:?}", + partition_id, + state.as_type() + ); } + + assert!( + !table_states.contains_key(&parent_table_id), + "Parent table {} should not be tracked since parent partitioned tables are excluded from processing", + parent_table_id + ); + + let parent_table_rows = table_rows + .iter() + .filter(|(table_id, _)| **table_id == parent_table_id) + .map(|(_, rows)| rows.len()) + .sum::(); + assert_eq!( + parent_table_rows, 0, + "Parent table {} should have no data since it's excluded from processing and all data goes to partitions, but found {} rows", + parent_table_id, parent_table_rows + ); } From b4230495b6018185c1506dc8f8f2f52a934527e2 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 1 Sep 2025 09:30:49 -0400 Subject: [PATCH 04/18] merge conflict --- etl/tests/partitioned_table_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index aa73b806f..758bffefa 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -22,7 +22,7 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { ("p3", "FROM (200) TO (300)"), ]; - let (_parent_table_id, partition_table_ids) = + let (parent_table_id, partition_table_ids) = create_partitioned_table(&database, table_name.clone(), &partition_specs) .await .expect("Failed to create partitioned table"); From c4087b2ddfe86e3ad73c08dc7c9cc6d6d1943664 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 1 Sep 2025 11:56:07 -0400 Subject: [PATCH 05/18] make sql lowercase --- etl/src/test_utils/test_schema.rs | 16 ++++++++-------- etl/tests/partitioned_table_test.rs | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/etl/src/test_utils/test_schema.rs b/etl/src/test_utils/test_schema.rs index 25ca7725e..78b9ed670 100644 --- a/etl/src/test_utils/test_schema.rs +++ b/etl/src/test_utils/test_schema.rs @@ -140,12 +140,12 @@ pub async fn create_partitioned_table( partition_specs: &[(&str, &str)], // (partition_name, partition_constraint) ) -> Result<(TableId, Vec), tokio_postgres::Error> { let create_parent_query = format!( - "CREATE TABLE {} ( + "create table {} ( id bigserial, data text NOT NULL, partition_key integer NOT NULL, - PRIMARY KEY (id, partition_key) - ) PARTITION BY RANGE (partition_key)", + primary key (id, partition_key) + ) partition by range (partition_key)", table_name.as_quoted_identifier() ); @@ -156,8 +156,8 @@ pub async fn create_partitioned_table( .as_ref() .unwrap() .query_one( - "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = $1 AND c.relname = $2", + "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace + where n.nspname = $1 and c.relname = $2", &[&table_name.schema, &table_name.name], ) .await?; @@ -172,7 +172,7 @@ pub async fn create_partitioned_table( ); let create_partition_query = format!( - "CREATE TABLE {} PARTITION OF {} FOR VALUES {}", + "create table {} partition of {} for values {}", partition_table_name.as_quoted_identifier(), table_name.as_quoted_identifier(), partition_constraint @@ -185,8 +185,8 @@ pub async fn create_partitioned_table( .as_ref() .unwrap() .query_one( - "SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = $1 AND c.relname = $2", + "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace + where n.nspname = $1 and c.relname = $2", 
                &[&partition_table_name.schema, &partition_table_name.name],
             )
             .await?;
diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs
index 758bffefa..6a7385036 100644
--- a/etl/tests/partitioned_table_test.rs
+++ b/etl/tests/partitioned_table_test.rs
@@ -17,9 +17,9 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
 
     let table_name = test_table_name("partitioned_events");
     let partition_specs = [
-        ("p1", "FROM (1) TO (100)"),
-        ("p2", "FROM (100) TO (200)"),
-        ("p3", "FROM (200) TO (300)"),
+        ("p1", "from (1) to (100)"),
+        ("p2", "from (100) to (200)"),
+        ("p3", "from (200) to (300)"),
     ];
 
     let (parent_table_id, partition_table_ids) =
@@ -29,7 +29,7 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() {
 
     database
         .run_sql(&format!(
-            "INSERT INTO {} (data, partition_key) VALUES
+            "insert into {} (data, partition_key) values
                 ('event1', 50), ('event2', 150), ('event3', 250)",
             table_name.as_quoted_identifier()
         ))

From ae4f86f96f07ca4ea6cdc3058744e8317c53d0ab Mon Sep 17 00:00:00 2001
From: Abhi Agarwal
Date: Tue, 2 Sep 2025 08:53:11 -0400
Subject: [PATCH 06/18] fit new test style

---
 etl/tests/partitioned_table_test.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs
index 6a7385036..725e745d9 100644
--- a/etl/tests/partitioned_table_test.rs
+++ b/etl/tests/partitioned_table_test.rs
@@ -1,3 +1,5 @@
+#![cfg(feature = "test-utils")]
+
 use etl::destination::memory::MemoryDestination;
 use etl::state::table::TableReplicationPhaseType;
 use etl::test_utils::database::{spawn_source_database, test_table_name};

From ef58a4b5d384a9cea48c9b18d9806683adf09013 Mon Sep 17 00:00:00 2001
From: Abhi Agarwal
Date: Tue, 2 Sep 2025 10:20:55 -0400
Subject: [PATCH 07/18] Switch to `publish_via_partition_root = true`

---
 docs/how-to/configure-postgres.md    |   8 +-
 etl-api/src/db/publications.rs       |   3 +
 etl-postgres/src/tokio/test_utils.rs |   2 +-
 etl/src/replication/client.rs        | 150 ++++++++++++++++++------
 etl/tests/partitioned_table_test.rs  | 162 ++++++++++++++++++++-------
 5 files changed, 240 insertions(+), 85 deletions(-)

diff --git a/docs/how-to/configure-postgres.md b/docs/how-to/configure-postgres.md
index fe57cd07b..a525132db 100644
--- a/docs/how-to/configure-postgres.md
+++ b/docs/how-to/configure-postgres.md
@@ -107,15 +107,17 @@ Publications define which tables and operations to replicate.
 
 ```sql
 -- Create publication for specific tables
-CREATE PUBLICATION my_publication FOR TABLE users, orders;
+CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true);
 
 -- Create publication for all tables (use with caution)
-CREATE PUBLICATION all_tables FOR ALL TABLES;
+CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true);
 
 -- Include only specific operations
-CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert');
+CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert', publish_via_partition_root = true);
 ```
 
+Setting [`publish_via_partition_root`](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT) to `true` makes Postgres publish changes to partitions under the root partitioned table's name, so logical replication sees the whole partition hierarchy as a single table.
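+
+As a quick illustration (the `measurements` table, partition, and publication names below are hypothetical), partition changes are published under the root once the option is set:
+
+```sql
+CREATE TABLE measurements (
+    id bigserial,
+    logdate date NOT NULL,
+    PRIMARY KEY (id, logdate)
+) PARTITION BY RANGE (logdate);
+
+CREATE TABLE measurements_2025 PARTITION OF measurements
+    FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
+
+CREATE PUBLICATION measurements_pub FOR TABLE measurements
+    WITH (publish_via_partition_root = true);
+
+-- Rows written to measurements_2025 now replicate as changes to "measurements"
+INSERT INTO measurements (logdate) VALUES ('2025-06-01');
+```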
+ ### Managing Publications ```sql diff --git a/etl-api/src/db/publications.rs b/etl-api/src/db/publications.rs index 38a4ab575..5c01ac859 100644 --- a/etl-api/src/db/publications.rs +++ b/etl-api/src/db/publications.rs @@ -43,6 +43,9 @@ pub async fn create_publication( } } + // Ensure partitioned tables publish via ancestor/root schema for logical replication + query.push_str(" with (publish_via_partition_root = true)"); + pool.execute(query.as_str()).await?; Ok(()) } diff --git a/etl-postgres/src/tokio/test_utils.rs b/etl-postgres/src/tokio/test_utils.rs index 6692e486b..7aad2b62b 100644 --- a/etl-postgres/src/tokio/test_utils.rs +++ b/etl-postgres/src/tokio/test_utils.rs @@ -61,7 +61,7 @@ impl PgDatabase { .collect::>(); let create_publication_query = format!( - "create publication {} for table {}", + "create publication {} for table {} with (publish_via_partition_root = true)", publication_name, table_names.join(", ") ); diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index a687c6f77..437ab9662 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -410,6 +410,31 @@ impl PgReplicationClient { &self, publication_name: &str, ) -> EtlResult> { + // Prefer pg_publication_rel (explicit tables in the publication, including partition roots) + let rel_query = format!( + r#"select r.prrelid as oid + from pg_publication_rel r + join pg_publication p on p.oid = r.prpubid + where p.pubname = {}"#, + quote_literal(publication_name) + ); + + let mut table_ids = vec![]; + let mut has_rows = false; + for msg in self.client.simple_query(&rel_query).await? { + if let SimpleQueryMessage::Row(row) = msg { + has_rows = true; + let table_id = + Self::get_row_value::(&row, "oid", "pg_publication_rel").await?; + table_ids.push(table_id); + } + } + + if has_rows { + return Ok(table_ids); + } + + // Fallback to pg_publication_tables (expanded view), used for publications like FOR ALL TABLES let publication_query = format!( "select c.oid from pg_publication_tables pt join pg_class c on c.relname = pt.tablename @@ -418,10 +443,8 @@ impl PgReplicationClient { quote_literal(publication_name) ); - let mut table_ids = vec![]; for msg in self.client.simple_query(&publication_query).await? { if let SimpleQueryMessage::Row(row) = msg { - // For the sake of simplicity, we refer to the table oid as table id. 
let table_id = Self::get_row_value::(&row, "oid", "pg_class").await?; table_ids.push(table_id); } @@ -721,44 +744,19 @@ impl PgReplicationClient { join direct_parent dp on con.conrelid = dp.parent_oid where con.contype = 'p' group by con.conname - ), - -- Check if current table has a unique index on the parent PK columns - partition_has_pk_index as ( - select case - when exists (select 1 from direct_parent) - and exists (select 1 from parent_pk_cols) - and exists ( - -- Check if there's a unique, valid index on the parent PK columns - select 1 - from pg_index ix - cross join parent_pk_cols pk - where ix.indrelid = {table_id}::oid - and ix.indisunique = true - and ix.indisvalid = true - and array( - select a.attname - from unnest(ix.indkey) with ordinality k(attnum, ord) - join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum - where ord <= ix.indnkeyatts -- exclude INCLUDE columns - order by ord - ) = pk.pk_column_names - ) then true - else false - end as has_inherited_pk ) SELECT a.attname, a.atttypid, a.atttypmod, a.attnotnull, case - -- First check for direct primary key + -- Direct primary key on this relation when coalesce(i.indisprimary, false) = true then true - -- Then check for inherited primary key from partitioned table parent - when (select has_inherited_pk from partition_has_pk_index) = true - and exists ( - select 1 from parent_pk_cols pk - where a.attname = any(pk.pk_column_names) - ) then true + -- Inherit primary key from parent partitioned table if column name matches + when exists ( + select 1 from parent_pk_cols pk + where a.attname = any(pk.pk_column_names) + ) then true else false end as primary from pg_attribute a @@ -816,17 +814,34 @@ impl PgReplicationClient { .collect::>() .join(", "); - let table_name = self.get_table_name(table_id).await?; + let copy_query = if self.is_partitioned_table(table_id).await? + && let leaf_partitions = self.get_leaf_partition_ids(table_id).await? + && !leaf_partitions.is_empty() + { + let mut selects = Vec::with_capacity(leaf_partitions.len()); + for child_id in leaf_partitions { + let child_name = self.get_table_name(child_id).await?; + let select = format!( + "select {} from {}", + column_list, + child_name.as_quoted_identifier() + ); + selects.push(select); + } - // TODO: allow passing in format binary or text - let copy_query = format!( - r#"copy {} ({}) to stdout with (format text);"#, - table_name.as_quoted_identifier(), - column_list - ); + let union_query = selects.join(" union all "); + format!(r#"copy ({}) to stdout with (format text);"#, union_query) + } else { + let table_name = self.get_table_name(table_id).await?; + format!( + r#"copy {} ({}) to stdout with (format text);"#, + table_name.as_quoted_identifier(), + column_list + ) + }; + // TODO: allow passing in format binary or text let stream = self.client.copy_out_simple(©_query).await?; - Ok(stream) } @@ -861,4 +876,57 @@ impl PgReplicationClient { ) }) } + + /// Returns true if the given table id refers to a partitioned table (relkind = 'p'). + async fn is_partitioned_table(&self, table_id: TableId) -> EtlResult { + let query = format!( + "select c.relkind from pg_class c where c.oid = {}", + table_id + ); + + for msg in self.client.simple_query(&query).await? 
{ + if let SimpleQueryMessage::Row(row) = msg { + let relkind = Self::get_row_value::(&row, "relkind", "pg_class").await?; + return Ok(relkind == "p"); + } + } + + bail!( + ErrorKind::SourceSchemaError, + "Table not found", + format!("Table not found in database (table id: {})", table_id) + ); + } + + /// Returns all leaf partition OIDs for a partitioned table. + async fn get_leaf_partition_ids(&self, parent_id: TableId) -> EtlResult> { + let query = format!( + r#" + with recursive parts(relid) as ( + select i.inhrelid + from pg_inherits i + where i.inhparent = {parent} + union all + select i.inhrelid + from pg_inherits i + join parts p on p.relid = i.inhparent + ) + select p.relid as oid + from parts p + left join pg_inherits i on i.inhparent = p.relid + where i.inhrelid is null + "#, + parent = parent_id + ); + + let mut ids = Vec::new(); + for msg in self.client.simple_query(&query).await? { + if let SimpleQueryMessage::Row(row) = msg { + let id = Self::get_row_value::(&row, "oid", "pg_inherits").await?; + ids.push(id); + } + } + + Ok(ids) + } } diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index 725e745d9..4b6e032cc 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -3,17 +3,19 @@ use etl::destination::memory::MemoryDestination; use etl::state::table::TableReplicationPhaseType; use etl::test_utils::database::{spawn_source_database, test_table_name}; +use etl::test_utils::event::group_events_by_type_and_table_id; use etl::test_utils::notify::NotifyingStore; use etl::test_utils::pipeline::create_pipeline; use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; use etl::test_utils::test_schema::create_partitioned_table; +use etl::types::EventType; use etl::types::PipelineId; use etl_telemetry::tracing::init_test_tracing; use rand::random; -/// Test that verifies partitioned tables with inherited primary keys work correctly. +/// Initial copy for a partitioned table (published via root) copies all existing rows. #[tokio::test(flavor = "multi_thread")] -async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { +async fn partitioned_table_copy_replicates_existing_data() { init_test_tracing(); let database = spawn_source_database().await; @@ -24,7 +26,7 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { ("p3", "from (200) to (300)"), ]; - let (parent_table_id, partition_table_ids) = + let (parent_table_id, _partition_table_ids) = create_partitioned_table(&database, table_name.clone(), &partition_specs) .await .expect("Failed to create partitioned table"); @@ -47,13 +49,10 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { let state_store = NotifyingStore::new(); let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); - let mut partition_notifications = Vec::new(); - for &partition_id in &partition_table_ids { - let notification = state_store - .notify_on_table_state(partition_id, TableReplicationPhaseType::SyncDone) - .await; - partition_notifications.push(notification); - } + // Register notification for initial copy completion. 
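+    // With publish_via_partition_root = true, only the parent table appears in
+    // the publication, so replication state is keyed by the parent's id rather
+    // than the individual partition ids.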
+ let parent_sync_done = state_store + .notify_on_table_state(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; let pipeline_id: PipelineId = random(); let mut pipeline = create_pipeline( @@ -66,9 +65,7 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { pipeline.start().await.unwrap(); - for notification in partition_notifications { - notification.notified().await; - } + parent_sync_done.notified().await; let _ = pipeline.shutdown_and_wait().await; @@ -83,43 +80,128 @@ async fn partitioned_table_sync_succeeds_with_inherited_primary_keys() { let table_states = state_store.get_table_replication_states().await; + assert!( + table_states.contains_key(&parent_table_id), + "Parent table should be tracked in state" + ); assert_eq!( table_states.len(), - partition_table_ids.len(), - "Expected {} partition states, but found {}", - partition_table_ids.len(), - table_states.len() + 1, + "Only the parent table should be tracked in state" + ); + + let parent_table_rows = table_rows + .iter() + .filter(|(table_id, _)| **table_id == parent_table_id) + .map(|(_, rows)| rows.len()) + .sum::(); + assert_eq!( + parent_table_rows, 3, + "Parent table should contain all rows when publishing via root" ); +} - for &partition_id in &partition_table_ids { - let state = table_states - .get(&partition_id) - .unwrap_or_else(|| panic!("Partition {} should have a state", partition_id)); - assert!( - matches!( - state.as_type(), - TableReplicationPhaseType::SyncDone | TableReplicationPhaseType::Ready - ), - "Partition {} should be in SyncDone or Ready state, but was in {:?}", - partition_id, - state.as_type() - ); - } +/// Initial copy completes and CDC streams new rows from newly added partitions. +#[tokio::test(flavor = "multi_thread")] +async fn partitioned_table_copy_and_streams_new_data_from_new_partition() { + init_test_tracing(); + let database = spawn_source_database().await; - assert!( - !table_states.contains_key(&parent_table_id), - "Parent table {} should not be tracked since parent partitioned tables are excluded from processing", - parent_table_id + let table_name = test_table_name("partitioned_events_late"); + let initial_partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, _initial_partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &initial_partition_specs) + .await + .expect("Failed to create initial partitioned table"); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values \ + ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub_late".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .expect("Failed to create publication"); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + // Register notification for initial copy completion. 
+ let parent_sync_done = state_store + .notify_on_table_state(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), ); + pipeline.start().await.unwrap(); + + parent_sync_done.notified().await; + + let new_partition_name = format!("{}_{}", table_name.name, "p3"); + let new_partition_qualified_name = format!("{}.{}", table_name.schema, new_partition_name); + database + .run_sql(&format!( + "create table {} partition of {} for values from (200) to (300)", + new_partition_qualified_name, + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event3', 250)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Wait for CDC to deliver the new row. + let inserts_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + inserts_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + let table_rows = destination.get_table_rows().await; + let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum(); + assert_eq!( + total_rows, 2, + "Expected 2 rows synced from initial copy, got {}", + total_rows + ); + + let table_states = state_store.get_table_replication_states().await; + assert!(table_states.contains_key(&parent_table_id)); + assert_eq!(table_states.len(), 1); + let parent_table_rows = table_rows .iter() .filter(|(table_id, _)| **table_id == parent_table_id) .map(|(_, rows)| rows.len()) .sum::(); - assert_eq!( - parent_table_rows, 0, - "Parent table {} should have no data since it's excluded from processing and all data goes to partitions, but found {} rows", - parent_table_id, parent_table_rows - ); + assert_eq!(parent_table_rows, 2); + + let events = destination.get_events().await; + let grouped = group_events_by_type_and_table_id(&events); + let parent_inserts = grouped + .get(&(EventType::Insert, parent_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(parent_inserts.len(), 1); } From 827275469b58c82d06fc8839e959dbcdd923ef3b Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Wed, 10 Sep 2025 09:35:30 -0400 Subject: [PATCH 08/18] clippy --- etl/src/replication/client.rs | 12 ++++-------- etl/tests/partitioned_table_test.rs | 6 ++---- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 437ab9662..771d60795 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -830,7 +830,7 @@ impl PgReplicationClient { } let union_query = selects.join(" union all "); - format!(r#"copy ({}) to stdout with (format text);"#, union_query) + format!(r#"copy ({union_query}) to stdout with (format text);"#) } else { let table_name = self.get_table_name(table_id).await?; format!( @@ -879,10 +879,7 @@ impl PgReplicationClient { /// Returns true if the given table id refers to a partitioned table (relkind = 'p'). async fn is_partitioned_table(&self, table_id: TableId) -> EtlResult { - let query = format!( - "select c.relkind from pg_class c where c.oid = {}", - table_id - ); + let query = format!("select c.relkind from pg_class c where c.oid = {table_id}"); for msg in self.client.simple_query(&query).await? 
{ if let SimpleQueryMessage::Row(row) = msg { @@ -905,7 +902,7 @@ impl PgReplicationClient { with recursive parts(relid) as ( select i.inhrelid from pg_inherits i - where i.inhparent = {parent} + where i.inhparent = {parent_id} union all select i.inhrelid from pg_inherits i @@ -915,8 +912,7 @@ impl PgReplicationClient { from parts p left join pg_inherits i on i.inhparent = p.relid where i.inhrelid is null - "#, - parent = parent_id + "# ); let mut ids = Vec::new(); diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index 4b6e032cc..79c5e419f 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -74,8 +74,7 @@ async fn partitioned_table_copy_replicates_existing_data() { assert_eq!( total_rows, 3, - "Expected 3 rows synced (one per partition), but got {}", - total_rows + "Expected 3 rows synced (one per partition), but got {total_rows}" ); let table_states = state_store.get_table_replication_states().await; @@ -182,8 +181,7 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() { let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum(); assert_eq!( total_rows, 2, - "Expected 2 rows synced from initial copy, got {}", - total_rows + "Expected 2 rows synced from initial copy, got {total_rows}" ); let table_states = state_store.get_table_replication_states().await; From 80ece8f771598f9d4d8c61ab07d822ee8409c727 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 22 Sep 2025 11:13:57 -0400 Subject: [PATCH 09/18] Merge conflict Signed-off-by: Abhi Agarwal --- etl/tests/partitioned_table_test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index 79c5e419f..e014b7945 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -51,7 +51,7 @@ async fn partitioned_table_copy_replicates_existing_data() { // Register notification for initial copy completion. let parent_sync_done = state_store - .notify_on_table_state(parent_table_id, TableReplicationPhaseType::SyncDone) + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) .await; let pipeline_id: PipelineId = random(); @@ -134,7 +134,7 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() { // Register notification for initial copy completion. 
let parent_sync_done = state_store - .notify_on_table_state(parent_table_id, TableReplicationPhaseType::SyncDone) + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) .await; let pipeline_id: PipelineId = random(); From f8be974cc608c00e58be927ceeb88d38bc4f688a Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sun, 14 Sep 2025 21:55:12 -0400 Subject: [PATCH 10/18] Use cargo nextest to speed up runs Signed-off-by: Abhi Agarwal --- .config/nextest.toml | 2 ++ .github/workflows/ci.yml | 31 ++++++++++++++----------------- AGENTS.md | 4 ++-- justfile | 18 ++++++++++++++++++ 4 files changed, 36 insertions(+), 19 deletions(-) create mode 100644 .config/nextest.toml create mode 100644 justfile diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 000000000..bd886b4d5 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,2 @@ +[profile.no-bigquery] +default-filter = "not (package(etl-destinations) and binary(/bigquery_pipeline/))" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 82bc89633..39b0ec00f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,19 +90,14 @@ jobs: sudo apt-get install libpq-dev -y ./etl-api/scripts/run_migrations.sh + - name: Install cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: cargo-nextest + - name: Run Tests run: | - cargo test \ - --workspace \ - --all-features \ - --no-fail-fast \ - --exclude etl-destinations \ - && \ - cargo test \ - -p etl-destinations \ - --no-default-features \ - --no-fail-fast \ - --features iceberg + cargo nextest run --all-features --no-fail-fast --profile no-bigquery test-full: name: Tests (Full) @@ -121,6 +116,8 @@ jobs: - name: Set up Rust uses: dtolnay/rust-toolchain@1.88.0 + with: + components: llvm-tools-preview - name: Cache Cargo uses: Swatinem/rust-cache@v2 @@ -149,8 +146,10 @@ jobs: sudo apt-get install libpq-dev -y ./etl-api/scripts/run_migrations.sh - - name: Install cargo-llvm-cov - uses: taiki-e/install-action@cargo-llvm-cov + - name: Install cargo-llvm-cov and cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: cargo-llvm-cov,cargo-nextest - name: Set up BigQuery Credentials run: | @@ -161,10 +160,8 @@ jobs: - name: Generate Code Coverage id: coverage run: | - cargo llvm-cov test \ - --workspace --no-fail-fast \ - --all-features \ - --lcov --output-path lcov.info + cargo llvm-cov nextest --no-fail-fast \ + --all-features --lcov --output-path lcov.info - name: Upload Coverage to Coveralls uses: coverallsapp/github-action@v2 diff --git a/AGENTS.md b/AGENTS.md index 5ff8ff8d9..eb3ecb45b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,8 +6,8 @@ - Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`. ## Build and Test -- Build: `cargo build --workspace --all-targets --all-features`. -- Lint/format: `cargo fmt`; `cargo clippy --all-targets --all-features -- -D warnings`. +- Build: `just build`. +- Lint/format: `just fmt; just lint`. - Use `ENABLE_TRACING=1` when running integration tests to see the logs. - Use `RUST_LOG=[log-level]` if you need to see the logs with a specific log level. 
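+- Example (one possible invocation): `ENABLE_TRACING=1 RUST_LOG=debug cargo nextest run -p etl`.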
diff --git a/justfile b/justfile new file mode 100644 index 000000000..76ab2710c --- /dev/null +++ b/justfile @@ -0,0 +1,18 @@ +build: + cargo build --workspace --all-targets --all-features + +fmt: + cargo fmt + +lint: + cargo clippy --all-targets --all-features -- -D warnings + +test: + cargo nextest run --all-features + +test-nobigquery: + cargo nextest run --all-features --profile no-bigquery + +install-tools: + cargo install cargo-nextest --locked + cargo install just --locked From 8d2fd3cf53fa8b2d543ec678091f6d17c370633e Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sun, 14 Sep 2025 22:06:25 -0400 Subject: [PATCH 11/18] add retries to test for nondeterministic tests Signed-off-by: Abhi Agarwal --- .config/nextest.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.config/nextest.toml b/.config/nextest.toml index bd886b4d5..d015739f6 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,2 +1,5 @@ +[profile.default] +retries = 2 + [profile.no-bigquery] default-filter = "not (package(etl-destinations) and binary(/bigquery_pipeline/))" From 9bd46b568ea56f480936b3a7658d104d0e077478 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sun, 14 Sep 2025 22:16:56 -0400 Subject: [PATCH 12/18] Add slow timeout Signed-off-by: Abhi Agarwal --- .config/nextest.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/.config/nextest.toml b/.config/nextest.toml index d015739f6..d0cc0d530 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,5 +1,6 @@ [profile.default] retries = 2 +slow-timeout = { period = "15s", terminate-after = 4 } [profile.no-bigquery] default-filter = "not (package(etl-destinations) and binary(/bigquery_pipeline/))" From 35240856adcd2a243591a7be8e7eddafcd9f325b Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 29 Sep 2025 15:23:34 -0400 Subject: [PATCH 13/18] Simplify COPY command Signed-off-by: Abhi Agarwal --- etl/src/replication/client.rs | 80 +++-------------------------------- 1 file changed, 6 insertions(+), 74 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 771d60795..c8294b7d6 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -814,31 +814,12 @@ impl PgReplicationClient { .collect::>() .join(", "); - let copy_query = if self.is_partitioned_table(table_id).await? - && let leaf_partitions = self.get_leaf_partition_ids(table_id).await? - && !leaf_partitions.is_empty() - { - let mut selects = Vec::with_capacity(leaf_partitions.len()); - for child_id in leaf_partitions { - let child_name = self.get_table_name(child_id).await?; - let select = format!( - "select {} from {}", - column_list, - child_name.as_quoted_identifier() - ); - selects.push(select); - } - - let union_query = selects.join(" union all "); - format!(r#"copy ({union_query}) to stdout with (format text);"#) - } else { - let table_name = self.get_table_name(table_id).await?; - format!( - r#"copy {} ({}) to stdout with (format text);"#, - table_name.as_quoted_identifier(), - column_list - ) - }; + let table_name = self.get_table_name(table_id).await?; + let copy_query = format!( + r#"copy (select {} from {}) to stdout with (format text);"#, + column_list, + table_name.as_quoted_identifier() + ); // TODO: allow passing in format binary or text let stream = self.client.copy_out_simple(©_query).await?; @@ -876,53 +857,4 @@ impl PgReplicationClient { ) }) } - - /// Returns true if the given table id refers to a partitioned table (relkind = 'p'). 
- async fn is_partitioned_table(&self, table_id: TableId) -> EtlResult { - let query = format!("select c.relkind from pg_class c where c.oid = {table_id}"); - - for msg in self.client.simple_query(&query).await? { - if let SimpleQueryMessage::Row(row) = msg { - let relkind = Self::get_row_value::(&row, "relkind", "pg_class").await?; - return Ok(relkind == "p"); - } - } - - bail!( - ErrorKind::SourceSchemaError, - "Table not found", - format!("Table not found in database (table id: {})", table_id) - ); - } - - /// Returns all leaf partition OIDs for a partitioned table. - async fn get_leaf_partition_ids(&self, parent_id: TableId) -> EtlResult> { - let query = format!( - r#" - with recursive parts(relid) as ( - select i.inhrelid - from pg_inherits i - where i.inhparent = {parent_id} - union all - select i.inhrelid - from pg_inherits i - join parts p on p.relid = i.inhparent - ) - select p.relid as oid - from parts p - left join pg_inherits i on i.inhparent = p.relid - where i.inhrelid is null - "# - ); - - let mut ids = Vec::new(); - for msg in self.client.simple_query(&query).await? { - if let SimpleQueryMessage::Row(row) = msg { - let id = Self::get_row_value::(&row, "oid", "pg_inherits").await?; - ids.push(id); - } - } - - Ok(ids) - } } From 2089cc00bc788b58d05aa5d30be719ebcd6b917e Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 29 Sep 2025 15:51:22 -0400 Subject: [PATCH 14/18] Use recursive query to get oids and write test Signed-off-by: Abhi Agarwal --- etl/src/replication/client.rs | 74 +++++++++++++++++++---------------- etl/tests/replication.rs | 43 +++++++++++++++++++- 2 files changed, 81 insertions(+), 36 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index c8294b7d6..c66eb2485 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -410,47 +410,53 @@ impl PgReplicationClient { &self, publication_name: &str, ) -> EtlResult> { - // Prefer pg_publication_rel (explicit tables in the publication, including partition roots) - let rel_query = format!( - r#"select r.prrelid as oid - from pg_publication_rel r - join pg_publication p on p.oid = r.prpubid - where p.pubname = {}"#, - quote_literal(publication_name) - ); - - let mut table_ids = vec![]; - let mut has_rows = false; - for msg in self.client.simple_query(&rel_query).await? 
{ - if let SimpleQueryMessage::Row(row) = msg { - has_rows = true; - let table_id = - Self::get_row_value::(&row, "oid", "pg_publication_rel").await?; - table_ids.push(table_id); - } - } - - if has_rows { - return Ok(table_ids); - } - - // Fallback to pg_publication_tables (expanded view), used for publications like FOR ALL TABLES - let publication_query = format!( - "select c.oid from pg_publication_tables pt - join pg_class c on c.relname = pt.tablename - join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname - where pt.pubname = {};", - quote_literal(publication_name) + let query = format!( + r#" + with recursive has_rel as ( + select exists( + select 1 + from pg_publication_rel r + join pg_publication p on p.oid = r.prpubid + where p.pubname = {pub} + ) as has + ), + pub_tables as ( + select r.prrelid as oid + from pg_publication_rel r + join pg_publication p on p.oid = r.prpubid + where p.pubname = {pub} and (select has from has_rel) + union all + select c.oid + from pg_publication_tables pt + join pg_class c on c.relname = pt.tablename + join pg_namespace n on n.oid = c.relnamespace and n.nspname = pt.schemaname + where pt.pubname = {pub} and not (select has from has_rel) + ), + recurse(relid) as ( + select oid from pub_tables + union all + select i.inhparent + from pg_inherits i + join recurse r on r.relid = i.inhrelid + ) + select distinct relid as oid + from recurse r + where not exists ( + select 1 from pg_inherits i where i.inhrelid = r.relid + ); + "#, + pub = quote_literal(publication_name) ); - for msg in self.client.simple_query(&publication_query).await? { + let mut roots = vec![]; + for msg in self.client.simple_query(&query).await? { if let SimpleQueryMessage::Row(row) = msg { let table_id = Self::get_row_value::(&row, "oid", "pg_class").await?; - table_ids.push(table_id); + roots.push(table_id); } } - Ok(table_ids) + Ok(roots) } /// Starts a logical replication stream from the specified publication and slot. diff --git a/etl/tests/replication.rs b/etl/tests/replication.rs index 67092271c..c8ae76027 100644 --- a/etl/tests/replication.rs +++ b/etl/tests/replication.rs @@ -1,10 +1,13 @@ #![cfg(feature = "test-utils")] +use std::collections::HashSet; + use etl::error::ErrorKind; use etl::replication::client::PgReplicationClient; use etl::test_utils::database::{spawn_source_database, test_table_name}; use etl::test_utils::pipeline::test_slot_name; use etl::test_utils::table::assert_table_schema; +use etl::test_utils::test_schema::create_partitioned_table; use etl_postgres::tokio::test_utils::{TableModification, id_column_schema}; use etl_postgres::types::ColumnSchema; use etl_telemetry::tracing::init_test_tracing; @@ -405,11 +408,47 @@ async fn test_publication_creation_and_check() { ); // We check the table ids of the tables in the publication. - let table_ids = parent_client + let table_ids: HashSet<_> = parent_client .get_publication_table_ids("my_publication") + .await + .unwrap() + .into_iter() + .collect(); + assert_eq!(table_ids, HashSet::from([table_1_id, table_2_id])); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_publication_table_ids_collapse_partitioned_root() { + init_test_tracing(); + let database = spawn_source_database().await; + + let client = PgReplicationClient::connect(database.config.clone()) .await .unwrap(); - assert_eq!(table_ids, vec![table_1_id, table_2_id]); + + // We create a partitioned parent with two child partitions. 
+    let table_name = test_table_name("part_parent");
+    let (parent_table_id, _children) = create_partitioned_table(
+        &database,
+        table_name.clone(),
+        &[("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")],
+    )
+    .await
+    .unwrap();
+
+    let publication_name = "pub_part_root";
+    database
+        .create_publication(publication_name, std::slice::from_ref(&table_name))
+        .await
+        .unwrap();
+
+    let table_ids = client
+        .get_publication_table_ids(publication_name)
+        .await
+        .unwrap();
+
+    // We expect to get only the parent table id.
+    assert_eq!(table_ids, vec![parent_table_id]);
 }
 
 #[tokio::test(flavor = "multi_thread")]

From 5ef0c6dd1820b7d17461e6cf2461504bbbedf05b Mon Sep 17 00:00:00 2001
From: Abhi Agarwal
Date: Mon, 29 Sep 2025 16:14:37 -0400
Subject: [PATCH 15/18] Remove unnecessary namespace check

Signed-off-by: Abhi Agarwal
---
 etl/src/replication/client.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs
index c66eb2485..0a81c6619 100644
--- a/etl/src/replication/client.rs
+++ b/etl/src/replication/client.rs
@@ -429,7 +429,6 @@ impl PgReplicationClient {
                 select c.oid
                 from pg_publication_tables pt
                 join pg_class c on c.relname = pt.tablename
-                join pg_namespace n on n.oid = c.relnamespace and n.nspname = pt.schemaname
                 where pt.pubname = {pub} and not (select has from has_rel)
             ),
             recurse(relid) as (

From 82436de7076d825508e6ab277cc156a4989e8380 Mon Sep 17 00:00:00 2001
From: Abhi Agarwal
Date: Mon, 29 Sep 2025 16:41:29 -0400
Subject: [PATCH 16/18] Add test for dropping partitions

Signed-off-by: Abhi Agarwal
---
 etl/tests/partitioned_table_test.rs | 95 +++++++++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs
index e014b7945..bda267f7f 100644
--- a/etl/tests/partitioned_table_test.rs
+++ b/etl/tests/partitioned_table_test.rs
@@ -203,3 +203,98 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
         .unwrap_or_default();
     assert_eq!(parent_inserts.len(), 1);
 }
+
+/// Dropping a child partition must not emit DELETE/TRUNCATE events.
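+///
+/// Detaching and then dropping a partition is DDL; logical decoding streams
+/// only DML, so the destination should see no new DELETE or TRUNCATE events
+/// for the parent. Schematically (with illustrative names), the statements
+/// exercised below are:
+///
+/// ```sql
+/// alter table parent detach partition parent_p1;
+/// drop table parent_p1;
+/// ```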
+#[tokio::test(flavor = "multi_thread")] +async fn partition_drop_does_not_emit_delete_or_truncate() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_drop"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, _partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .expect("Failed to create partitioned table"); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values \ + ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub_drop".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .expect("Failed to create publication"); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + let events_before = destination.get_events().await; + let grouped_before = group_events_by_type_and_table_id(&events_before); + let del_before = grouped_before + .get(&(EventType::Delete, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + let trunc_before = grouped_before + .get(&(EventType::Truncate, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + + // Detach and drop one child partition (DDL should not generate DML events) + let child_p1_name = format!("{}_{}", table_name.name, "p1"); + let child_p1_qualified = format!("{}.{}", table_name.schema, child_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + child_p1_qualified + )) + .await + .unwrap(); + database + .run_sql(&format!("drop table {}", child_p1_qualified)) + .await + .unwrap(); + + let _ = pipeline.shutdown_and_wait().await; + + let events_after = destination.get_events().await; + let grouped_after = group_events_by_type_and_table_id(&events_after); + let del_after = grouped_after + .get(&(EventType::Delete, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + let trunc_after = grouped_after + .get(&(EventType::Truncate, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + + assert_eq!(del_after, del_before, "Partition drop must not emit DELETE events"); + assert_eq!( + trunc_after, trunc_before, + "Partition drop must not emit TRUNCATE events" + ); +} From c00be91b67990257c182633b99e67486a1cfe910 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 29 Sep 2025 16:47:07 -0400 Subject: [PATCH 17/18] fmt Signed-off-by: Abhi Agarwal --- etl/tests/partitioned_table_test.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index bda267f7f..085612bb9 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -292,7 +292,10 @@ async fn partition_drop_does_not_emit_delete_or_truncate() { .map(|v| v.len()) .unwrap_or(0); - assert_eq!(del_after, del_before, "Partition drop must not emit DELETE events"); + assert_eq!( + 
del_after, del_before, + "Partition drop must not emit DELETE events" + ); assert_eq!( trunc_after, trunc_before, "Partition drop must not emit TRUNCATE events" From c41ed851179c5f4faa22b64f73ac85a04ed9c6ae Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 29 Sep 2025 16:51:14 -0400 Subject: [PATCH 18/18] Clippy Signed-off-by: Abhi Agarwal --- etl/tests/partitioned_table_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs index 085612bb9..40d1ad5b9 100644 --- a/etl/tests/partitioned_table_test.rs +++ b/etl/tests/partitioned_table_test.rs @@ -275,7 +275,7 @@ async fn partition_drop_does_not_emit_delete_or_truncate() { .await .unwrap(); database - .run_sql(&format!("drop table {}", child_p1_qualified)) + .run_sql(&format!("drop table {child_p1_qualified}")) .await .unwrap();