diff --git a/.config/nextest.toml b/.config/nextest.toml
new file mode 100644
index 000000000..d0cc0d530
--- /dev/null
+++ b/.config/nextest.toml
@@ -0,0 +1,6 @@
+[profile.default]
+retries = 2
+slow-timeout = { period = "15s", terminate-after = 4 }
+
+[profile.no-bigquery]
+default-filter = "not (package(etl-destinations) and binary(/bigquery_pipeline/))"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 82bc89633..39b0ec00f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -90,19 +90,14 @@ jobs:
           sudo apt-get install libpq-dev -y
           ./etl-api/scripts/run_migrations.sh
 
+      - name: Install cargo-nextest
+        uses: taiki-e/install-action@v2
+        with:
+          tool: cargo-nextest
+
       - name: Run Tests
         run: |
-          cargo test \
-            --workspace \
-            --all-features \
-            --no-fail-fast \
-            --exclude etl-destinations \
-            && \
-          cargo test \
-            -p etl-destinations \
-            --no-default-features \
-            --no-fail-fast \
-            --features iceberg
+          cargo nextest run --all-features --no-fail-fast --profile no-bigquery
 
   test-full:
     name: Tests (Full)
@@ -121,6 +116,8 @@ jobs:
 
       - name: Set up Rust
         uses: dtolnay/rust-toolchain@1.88.0
+        with:
+          components: llvm-tools-preview
 
       - name: Cache Cargo
         uses: Swatinem/rust-cache@v2
@@ -149,8 +146,10 @@ jobs:
           sudo apt-get install libpq-dev -y
           ./etl-api/scripts/run_migrations.sh
 
-      - name: Install cargo-llvm-cov
-        uses: taiki-e/install-action@cargo-llvm-cov
+      - name: Install cargo-llvm-cov and cargo-nextest
+        uses: taiki-e/install-action@v2
+        with:
+          tool: cargo-llvm-cov,cargo-nextest
 
       - name: Set up BigQuery Credentials
         run: |
@@ -161,10 +160,8 @@ jobs:
       - name: Generate Code Coverage
         id: coverage
        run: |
-          cargo llvm-cov test \
-            --workspace --no-fail-fast \
-            --all-features \
-            --lcov --output-path lcov.info
+          cargo llvm-cov nextest --no-fail-fast \
+            --all-features --lcov --output-path lcov.info
 
       - name: Upload Coverage to Coveralls
         uses: coverallsapp/github-action@v2
diff --git a/AGENTS.md b/AGENTS.md
index 5ff8ff8d9..eb3ecb45b 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -6,8 +6,8 @@
 - Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`.
 
 ## Build and Test
-- Build: `cargo build --workspace --all-targets --all-features`.
-- Lint/format: `cargo fmt`; `cargo clippy --all-targets --all-features -- -D warnings`.
+- Build: `just build`.
+- Lint/format: `just fmt; just lint`.
 - Use `ENABLE_TRACING=1` when running integration tests to see the logs.
 - Use `RUST_LOG=[log-level]` if you need to see the logs with a specific log level.
 
diff --git a/docs/how-to/configure-postgres.md b/docs/how-to/configure-postgres.md
index fe57cd07b..a525132db 100644
--- a/docs/how-to/configure-postgres.md
+++ b/docs/how-to/configure-postgres.md
@@ -107,15 +107,24 @@
 Publications define which tables and operations to replicate.
 
 ```sql
 -- Create publication for specific tables
-CREATE PUBLICATION my_publication FOR TABLE users, orders;
+CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true);
 
 -- Create publication for all tables (use with caution)
-CREATE PUBLICATION all_tables FOR ALL TABLES;
+CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true);
 
 -- Include only specific operations
-CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert');
+CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert', publish_via_partition_root = true);
 ```
 
+`publish_via_partition_root` makes Postgres publish changes to partitions under their root [partitioned table](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT), so logical replication sees the whole partition hierarchy as a single table.
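+
+To confirm that an existing publication publishes via the root, check the `pubviaroot` column:
+
+```sql
+-- pubviaroot is true when changes are published under the partition root
+SELECT pubname, pubviaroot FROM pg_publication;
+```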
+
 ### Managing Publications
 ```sql
diff --git a/etl-api/src/db/publications.rs b/etl-api/src/db/publications.rs
index 38a4ab575..5c01ac859 100644
--- a/etl-api/src/db/publications.rs
+++ b/etl-api/src/db/publications.rs
@@ -43,6 +43,9 @@ pub async fn create_publication(
         }
     }
 
+    // Publish partition changes via the root partitioned table for logical replication.
+    query.push_str(" with (publish_via_partition_root = true)");
+
     pool.execute(query.as_str()).await?;
     Ok(())
 }
diff --git a/etl-postgres/src/tokio/test_utils.rs b/etl-postgres/src/tokio/test_utils.rs
index 6692e486b..7aad2b62b 100644
--- a/etl-postgres/src/tokio/test_utils.rs
+++ b/etl-postgres/src/tokio/test_utils.rs
@@ -61,7 +61,7 @@ impl PgDatabase {
             .collect::<Vec<_>>();
 
         let create_publication_query = format!(
-            "create publication {} for table {}",
+            "create publication {} for table {} with (publish_via_partition_root = true)",
             publication_name,
             table_names.join(", ")
         );
diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs
index 722437505..0a81c6619 100644
--- a/etl/src/replication/client.rs
+++ b/etl/src/replication/client.rs
@@ -410,24 +410,53 @@
         &self,
         publication_name: &str,
     ) -> EtlResult<Vec<TableId>> {
-        let publication_query = format!(
-            "select c.oid from pg_publication_tables pt
-            join pg_class c on c.relname = pt.tablename
-            join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
-            where pt.pubname = {};",
-            quote_literal(publication_name)
+        let query = format!(
+            r#"
+            with recursive has_rel as (
+                select exists(
+                    select 1
+                    from pg_publication_rel r
+                    join pg_publication p on p.oid = r.prpubid
+                    where p.pubname = {publication}
+                ) as has
+            ),
+            pub_tables as (
+                select r.prrelid as oid
+                from pg_publication_rel r
+                join pg_publication p on p.oid = r.prpubid
+                where p.pubname = {publication} and (select has from has_rel)
+                union all
+                select c.oid
+                from pg_publication_tables pt
+                join pg_class c on c.relname = pt.tablename
+                join pg_namespace n on n.oid = c.relnamespace and n.nspname = pt.schemaname
+                where pt.pubname = {publication} and not (select has from has_rel)
+            ),
+            recurse(relid) as (
+                select oid from pub_tables
+                union all
+                select i.inhparent
+                from pg_inherits i
+                join recurse r on r.relid = i.inhrelid
+            )
+            select distinct relid as oid
+            from recurse r
+            where not exists (
+                select 1 from pg_inherits i where i.inhrelid = r.relid
+            );
+            "#,
+            publication = quote_literal(publication_name)
         );
 
-        let mut table_ids = vec![];
-        for msg in self.client.simple_query(&publication_query).await? {
+        let mut roots = vec![];
+        for msg in self.client.simple_query(&query).await? {
             if let SimpleQueryMessage::Row(row) = msg {
-                // For the sake of simplicity, we refer to the table oid as table id.
                 let table_id = Self::get_row_value::<TableId>(&row, "oid", "pg_class").await?;
-                table_ids.push(table_id);
+                roots.push(table_id);
             }
         }
 
-        Ok(table_ids)
+        Ok(roots)
     }
 
     /// Starts a logical replication stream from the specified publication and slot.
@@ -661,7 +690,8 @@ impl PgReplicationClient {
                     when 0 then true
                     else (a.attnum in (select * from pub_attrs))
                 end
-                )",
+                )"
+                .to_string(),
             )
         } else {
             // Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -676,20 +706,65 @@ impl PgReplicationClient {
                    )",
                    publication = quote_literal(publication),
                ),
-                "and (select count(*) from pub_table) > 0",
+                format!(
+                    "and ((select count(*) from pub_table) > 0 or exists(
+                        -- Also allow if parent table is in publication (for partitioned tables)
+                        select 1 from pg_inherits i
+                        join pg_publication_rel r on r.prrelid = i.inhparent
+                        join pg_publication p on p.oid = r.prpubid
+                        where i.inhrelid = {table_id} and p.pubname = {publication}
+                    ))",
+                    publication = quote_literal(publication),
+                ),
            )
        }
    } else {
-        ("".into(), "")
+        ("".to_string(), "".to_string())
    };
 
+        let has_pub_cte = !pub_cte.is_empty();
+
+        let cte_prefix = if has_pub_cte {
+            // If there's already a pub_cte WITH clause, add our CTEs to it with a comma
+            format!("{pub_cte},")
+        } else {
+            // If no pub_cte, start our own WITH clause (no need for RECURSIVE)
+            "with ".to_string()
+        };
 
         let column_info_query = format!(
-            "{pub_cte}
+            "{cte_prefix}
+            -- Find direct parent of current table (if it's a partition)
+            direct_parent as (
+                select i.inhparent as parent_oid
+                from pg_inherits i
+                where i.inhrelid = {table_id}::oid
+                limit 1
+            ),
+            -- Get parent table's primary key columns
+            parent_pk_cols as (
+                select array_agg(a.attname order by x.n) as pk_column_names
+                from pg_constraint con
+                join unnest(con.conkey) with ordinality as x(attnum, n) on true
+                join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
+                join direct_parent dp on con.conrelid = dp.parent_oid
+                where con.contype = 'p'
+                group by con.conname
+            )
             select a.attname,
                 a.atttypid,
                 a.atttypmod,
                 a.attnotnull,
-                coalesce(i.indisprimary, false) as primary
+                case
+                    -- Direct primary key on this relation
+                    when coalesce(i.indisprimary, false) = true then true
+                    -- Inherit primary key from parent partitioned table if column name matches
+                    when exists (
+                        select 1 from parent_pk_cols pk
+                        where a.attname = any(pk.pk_column_names)
+                    ) then true
+                    else false
+                end as primary
             from pg_attribute a
             left join pg_index i
                 on a.attrelid = i.indrelid
@@ -746,16 +821,14 @@ impl PgReplicationClient {
             .join(", ");
 
         let table_name = self.get_table_name(table_id).await?;
-
-        // TODO: allow passing in format binary or text
         let copy_query = format!(
-            r#"copy {} ({}) to stdout with (format text);"#,
-            table_name.as_quoted_identifier(),
-            column_list
+            r#"copy (select {} from {}) to stdout with (format text);"#,
+            column_list,
+            table_name.as_quoted_identifier()
         );
 
+        // TODO: allow passing in format binary or text
         let stream = self.client.copy_out_simple(&copy_query).await?;
-
         Ok(stream)
     }
 
diff --git a/etl/src/test_utils/test_schema.rs b/etl/src/test_utils/test_schema.rs
index 8faa6449b..78b9ed670 100644
--- a/etl/src/test_utils/test_schema.rs
+++ b/etl/src/test_utils/test_schema.rs
@@ -127,6 +127,89 @@ pub async fn setup_test_database_schema(
     }
 }
 
+/// Creates a partitioned table with the given name and partitions.
+///
+/// This function creates:
+/// 1. A parent partitioned table with a primary key
+/// 2. Several child partitions based on the provided partition specifications
+///
+/// Returns the table ID of the parent table and a list of partition table IDs.
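+///
+/// Example usage, mirroring the partitioned-table integration tests (`database`
+/// is assumed to be an already-connected test database):
+///
+/// ```ignore
+/// let (parent_id, partition_ids) = create_partitioned_table(
+///     &database,
+///     test_table_name("events"),
+///     &[("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")],
+/// )
+/// .await?;
+/// ```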
+pub async fn create_partitioned_table(
+    database: &PgDatabase,
+    table_name: TableName,
+    partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
+) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
+    let create_parent_query = format!(
+        "create table {} (
+            id bigserial,
+            data text not null,
+            partition_key integer not null,
+            primary key (id, partition_key)
+        ) partition by range (partition_key)",
+        table_name.as_quoted_identifier()
+    );
+
+    database.run_sql(&create_parent_query).await?;
+
+    let parent_row = database
+        .client
+        .as_ref()
+        .unwrap()
+        .query_one(
+            "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace
+            where n.nspname = $1 and c.relname = $2",
+            &[&table_name.schema, &table_name.name],
+        )
+        .await?;
+
+    let parent_table_id: TableId = parent_row.get(0);
+    let mut partition_table_ids = Vec::new();
+
+    for (partition_name, partition_constraint) in partition_specs {
+        let partition_table_name = TableName::new(
+            table_name.schema.clone(),
+            format!("{}_{}", table_name.name, partition_name),
+        );
+
+        let create_partition_query = format!(
+            "create table {} partition of {} for values {}",
+            partition_table_name.as_quoted_identifier(),
+            table_name.as_quoted_identifier(),
+            partition_constraint
+        );
+
+        database.run_sql(&create_partition_query).await?;
+
+        let partition_row = database
+            .client
+            .as_ref()
+            .unwrap()
+            .query_one(
+                "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace
+                where n.nspname = $1 and c.relname = $2",
+                &[&partition_table_name.schema, &partition_table_name.name],
+            )
+            .await?;
+
+        let partition_table_id: TableId = partition_row.get(0);
+        partition_table_ids.push(partition_table_id);
+    }
+
+    Ok((parent_table_id, partition_table_ids))
+}
+
 /// Inserts users data into the database for testing purposes.
 pub async fn insert_users_data(
     client: &mut PgDatabase,
diff --git a/etl/tests/partitioned_table_test.rs b/etl/tests/partitioned_table_test.rs
new file mode 100644
index 000000000..40d1ad5b9
--- /dev/null
+++ b/etl/tests/partitioned_table_test.rs
@@ -0,0 +1,303 @@
+#![cfg(feature = "test-utils")]
+
+use etl::destination::memory::MemoryDestination;
+use etl::state::table::TableReplicationPhaseType;
+use etl::test_utils::database::{spawn_source_database, test_table_name};
+use etl::test_utils::event::group_events_by_type_and_table_id;
+use etl::test_utils::notify::NotifyingStore;
+use etl::test_utils::pipeline::create_pipeline;
+use etl::test_utils::test_destination_wrapper::TestDestinationWrapper;
+use etl::test_utils::test_schema::create_partitioned_table;
+use etl::types::EventType;
+use etl::types::PipelineId;
+use etl_telemetry::tracing::init_test_tracing;
+use rand::random;
+
+/// Initial copy for a partitioned table (published via root) copies all existing rows.
+#[tokio::test(flavor = "multi_thread")]
+async fn partitioned_table_copy_replicates_existing_data() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    let table_name = test_table_name("partitioned_events");
+    let partition_specs = [
+        ("p1", "from (1) to (100)"),
+        ("p2", "from (100) to (200)"),
+        ("p3", "from (200) to (300)"),
+    ];
+
+    let (parent_table_id, _partition_table_ids) =
+        create_partitioned_table(&database, table_name.clone(), &partition_specs)
+            .await
+            .expect("Failed to create partitioned table");
+
+    database
+        .run_sql(&format!(
+            "insert into {} (data, partition_key) values
+            ('event1', 50), ('event2', 150), ('event3', 250)",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    let publication_name = "test_partitioned_pub".to_string();
+    database
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
+        .await
+        .expect("Failed to create publication");
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    // Register notification for initial copy completion.
+    let parent_sync_done = state_store
+        .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone)
+        .await;
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name,
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    pipeline.start().await.unwrap();
+
+    parent_sync_done.notified().await;
+
+    let _ = pipeline.shutdown_and_wait().await;
+
+    let table_rows = destination.get_table_rows().await;
+    let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum();
+
+    assert_eq!(
+        total_rows, 3,
+        "Expected 3 rows synced (one per partition), but got {total_rows}"
+    );
+
+    let table_states = state_store.get_table_replication_states().await;
+
+    assert!(
+        table_states.contains_key(&parent_table_id),
+        "Parent table should be tracked in state"
+    );
+    assert_eq!(
+        table_states.len(),
+        1,
+        "Only the parent table should be tracked in state"
+    );
+
+    let parent_table_rows = table_rows
+        .iter()
+        .filter(|(table_id, _)| **table_id == parent_table_id)
+        .map(|(_, rows)| rows.len())
+        .sum::<usize>();
+    assert_eq!(
+        parent_table_rows, 3,
+        "Parent table should contain all rows when publishing via root"
+    );
+}
+
+/// Initial copy completes and CDC streams new rows from newly added partitions.
+#[tokio::test(flavor = "multi_thread")]
+async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    let table_name = test_table_name("partitioned_events_late");
+    let initial_partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")];
+
+    let (parent_table_id, _initial_partition_table_ids) =
+        create_partitioned_table(&database, table_name.clone(), &initial_partition_specs)
+            .await
+            .expect("Failed to create initial partitioned table");
+
+    database
+        .run_sql(&format!(
+            "insert into {} (data, partition_key) values \
+            ('event1', 50), ('event2', 150)",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    let publication_name = "test_partitioned_pub_late".to_string();
+    database
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
+        .await
+        .expect("Failed to create publication");
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    // Register notification for initial copy completion.
+    let parent_sync_done = state_store
+        .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone)
+        .await;
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name,
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    pipeline.start().await.unwrap();
+
+    parent_sync_done.notified().await;
+
+    let new_partition_name = format!("{}_{}", table_name.name, "p3");
+    let new_partition_qualified_name = format!("{}.{}", table_name.schema, new_partition_name);
+    database
+        .run_sql(&format!(
+            "create table {} partition of {} for values from (200) to (300)",
+            new_partition_qualified_name,
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    database
+        .run_sql(&format!(
+            "insert into {} (data, partition_key) values ('event3', 250)",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    // Wait for CDC to deliver the new row.
+    let inserts_notify = destination
+        .wait_for_events_count(vec![(EventType::Insert, 1)])
+        .await;
+    inserts_notify.notified().await;
+
+    let _ = pipeline.shutdown_and_wait().await;
+
+    let table_rows = destination.get_table_rows().await;
+    let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum();
+    assert_eq!(
+        total_rows, 2,
+        "Expected 2 rows synced from initial copy, got {total_rows}"
+    );
+
+    let table_states = state_store.get_table_replication_states().await;
+    assert!(table_states.contains_key(&parent_table_id));
+    assert_eq!(table_states.len(), 1);
+
+    let parent_table_rows = table_rows
+        .iter()
+        .filter(|(table_id, _)| **table_id == parent_table_id)
+        .map(|(_, rows)| rows.len())
+        .sum::<usize>();
+    assert_eq!(parent_table_rows, 2);
+
+    let events = destination.get_events().await;
+    let grouped = group_events_by_type_and_table_id(&events);
+    let parent_inserts = grouped
+        .get(&(EventType::Insert, parent_table_id))
+        .cloned()
+        .unwrap_or_default();
+    assert_eq!(parent_inserts.len(), 1);
+}
+
+/// Dropping a child partition must not emit DELETE/TRUNCATE events.
+#[tokio::test(flavor = "multi_thread")]
+async fn partition_drop_does_not_emit_delete_or_truncate() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    let table_name = test_table_name("partitioned_events_drop");
+    let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")];
+
+    let (parent_table_id, _partition_table_ids) =
+        create_partitioned_table(&database, table_name.clone(), &partition_specs)
+            .await
+            .expect("Failed to create partitioned table");
+
+    database
+        .run_sql(&format!(
+            "insert into {} (data, partition_key) values \
+            ('event1', 50), ('event2', 150)",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    let publication_name = "test_partitioned_pub_drop".to_string();
+    database
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
+        .await
+        .expect("Failed to create publication");
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    let parent_sync_done = state_store
+        .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone)
+        .await;
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name,
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    pipeline.start().await.unwrap();
+    parent_sync_done.notified().await;
+
+    let events_before = destination.get_events().await;
+    let grouped_before = group_events_by_type_and_table_id(&events_before);
+    let del_before = grouped_before
+        .get(&(EventType::Delete, parent_table_id))
+        .map(|v| v.len())
+        .unwrap_or(0);
+    let trunc_before = grouped_before
+        .get(&(EventType::Truncate, parent_table_id))
+        .map(|v| v.len())
+        .unwrap_or(0);
+
+    // Detach and drop one child partition (DDL should not generate DML events)
+    let child_p1_name = format!("{}_{}", table_name.name, "p1");
+    let child_p1_qualified = format!("{}.{}", table_name.schema, child_p1_name);
+    database
+        .run_sql(&format!(
+            "alter table {} detach partition {}",
+            table_name.as_quoted_identifier(),
+            child_p1_qualified
+        ))
+        .await
+        .unwrap();
+    database
+        .run_sql(&format!("drop table {child_p1_qualified}"))
+        .await
+        .unwrap();
+
+    let _ = pipeline.shutdown_and_wait().await;
+
+    let events_after = destination.get_events().await;
+    let grouped_after = group_events_by_type_and_table_id(&events_after);
+    let del_after = grouped_after
+        .get(&(EventType::Delete, parent_table_id))
+        .map(|v| v.len())
+        .unwrap_or(0);
+    let trunc_after = grouped_after
+        .get(&(EventType::Truncate, parent_table_id))
+        .map(|v| v.len())
+        .unwrap_or(0);
+
+    assert_eq!(
+        del_after, del_before,
+        "Partition drop must not emit DELETE events"
+    );
+    assert_eq!(
+        trunc_after, trunc_before,
+        "Partition drop must not emit TRUNCATE events"
+    );
+}
diff --git a/etl/tests/replication.rs b/etl/tests/replication.rs
index 67092271c..c8ae76027 100644
--- a/etl/tests/replication.rs
+++ b/etl/tests/replication.rs
@@ -1,10 +1,13 @@
 #![cfg(feature = "test-utils")]
 
+use std::collections::HashSet;
+
 use etl::error::ErrorKind;
 use etl::replication::client::PgReplicationClient;
 use etl::test_utils::database::{spawn_source_database, test_table_name};
 use etl::test_utils::pipeline::test_slot_name;
 use etl::test_utils::table::assert_table_schema;
+use etl::test_utils::test_schema::create_partitioned_table;
 use etl_postgres::tokio::test_utils::{TableModification, id_column_schema};
 use etl_postgres::types::ColumnSchema;
 use etl_telemetry::tracing::init_test_tracing;
@@ -405,11 +408,47 @@ async fn test_publication_creation_and_check() {
     );
 
     // We check the table ids of the tables in the publication.
-    let table_ids = parent_client
+    let table_ids: HashSet<_> = parent_client
         .get_publication_table_ids("my_publication")
+        .await
+        .unwrap()
+        .into_iter()
+        .collect();
+    assert_eq!(table_ids, HashSet::from([table_1_id, table_2_id]));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_publication_table_ids_collapse_partitioned_root() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    let client = PgReplicationClient::connect(database.config.clone())
         .await
         .unwrap();
-    assert_eq!(table_ids, vec![table_1_id, table_2_id]);
+
+    // We create a partitioned parent with two child partitions.
+    let table_name = test_table_name("part_parent");
+    let (parent_table_id, _children) = create_partitioned_table(
+        &database,
+        table_name.clone(),
+        &[("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")],
+    )
+    .await
+    .unwrap();
+
+    let publication_name = "pub_part_root";
+    database
+        .create_publication(publication_name, std::slice::from_ref(&table_name))
+        .await
+        .unwrap();
+
+    let table_ids = client
+        .get_publication_table_ids(publication_name)
+        .await
+        .unwrap();
+
+    // We expect to get only the parent table id.
+    assert_eq!(table_ids, vec![parent_table_id]);
 }
 
 #[tokio::test(flavor = "multi_thread")]
diff --git a/justfile b/justfile
new file mode 100644
index 000000000..76ab2710c
--- /dev/null
+++ b/justfile
@@ -0,0 +1,22 @@
+build:
+    cargo build --workspace --all-targets --all-features
+
+fmt:
+    cargo fmt
+
+lint:
+    cargo clippy --all-targets --all-features -- -D warnings
+
+test:
+    cargo nextest run --all-features
+
+test-nobigquery:
+    cargo nextest run --all-features --profile no-bigquery
+
+install-tools:
+    cargo install cargo-nextest --locked
+    cargo install just --locked
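+
+# Mirrors the CI coverage invocation; assumes cargo-llvm-cov is installed locally.
+coverage:
+    cargo llvm-cov nextest --all-features --lcov --output-path lcov.info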