6 changes: 6 additions & 0 deletions .config/nextest.toml
@@ -0,0 +1,6 @@
[profile.default]
retries = 2
slow-timeout = { period = "15s", terminate-after = 4 }

[profile.no-bigquery]
default-filter = "not (package(etl-destinations) and binary(/bigquery_pipeline/))"
31 changes: 14 additions & 17 deletions .github/workflows/ci.yml
@@ -90,19 +90,14 @@ jobs:
sudo apt-get install libpq-dev -y
./etl-api/scripts/run_migrations.sh

- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-nextest

- name: Run Tests
run: |
cargo test \
--workspace \
--all-features \
--no-fail-fast \
--exclude etl-destinations \
&& \
cargo test \
-p etl-destinations \
--no-default-features \
--no-fail-fast \
--features iceberg
cargo nextest run --all-features --no-fail-fast --profile no-bigquery

test-full:
name: Tests (Full)
@@ -121,6 +116,8 @@ jobs:

- name: Set up Rust
uses: dtolnay/[email protected]
with:
components: llvm-tools-preview

- name: Cache Cargo
uses: Swatinem/rust-cache@v2
@@ -149,8 +146,10 @@ jobs:
sudo apt-get install libpq-dev -y
./etl-api/scripts/run_migrations.sh

- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Install cargo-llvm-cov and cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-llvm-cov,cargo-nextest

- name: Set up BigQuery Credentials
run: |
@@ -161,10 +160,8 @@
- name: Generate Code Coverage
id: coverage
run: |
cargo llvm-cov test \
--workspace --no-fail-fast \
--all-features \
--lcov --output-path lcov.info
cargo llvm-cov nextest --no-fail-fast \
--all-features --lcov --output-path lcov.info

- name: Upload Coverage to Coveralls
uses: coverallsapp/github-action@v2
4 changes: 2 additions & 2 deletions AGENTS.md
@@ -6,8 +6,8 @@
- Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`.

## Build and Test
- Build: `cargo build --workspace --all-targets --all-features`.
- Lint/format: `cargo fmt`; `cargo clippy --all-targets --all-features -- -D warnings`.
- Build: `just build`.
- Lint/format: `just fmt; just lint`.
- Use `ENABLE_TRACING=1` when running integration tests to see the logs.
- Use `RUST_LOG=[log-level]` if you need to see the logs with a specific log level.

8 changes: 5 additions & 3 deletions docs/how-to/configure-postgres.md
@@ -107,15 +107,17 @@ Publications define which tables and operations to replicate.

```sql
-- Create publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE users, orders;
CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true);

-- Create publication for all tables (use with caution)
CREATE PUBLICATION all_tables FOR ALL TABLES;
CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true);

-- Include only specific operations
CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert');
CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert', publish_via_partition_root = true);
```

`publish_via_partition_root` makes Postgres treat [partitioned tables](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT) as a single table for the purposes of logical replication: changes to individual partitions are published under the root table's identity.
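
As a quick illustration of this behavior (the `measurement` table is hypothetical, not part of this change):

```sql
create table measurement (
    id bigint not null,
    logdate date not null,
    primary key (id, logdate)
) partition by range (logdate);

create table measurement_y2024 partition of measurement
    for values from ('2024-01-01') to ('2025-01-01');

create publication measurement_pub for table measurement
    with (publish_via_partition_root = true);

-- With publish_via_partition_root = true this lists the root table;
-- with it disabled, the leaf partitions appear instead:
select * from pg_publication_tables where pubname = 'measurement_pub';
```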

### Managing Publications

```sql
3 changes: 3 additions & 0 deletions etl-api/src/db/publications.rs
@@ -43,6 +43,9 @@ pub async fn create_publication(
}
}

// Ensure partitioned tables publish changes via their ancestor/root table for logical replication
query.push_str(" with (publish_via_partition_root = true)");

pool.execute(query.as_str()).await?;
Ok(())
}
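
For illustration, with two tables the statement assembled here comes out roughly as follows (publication and table names hypothetical):

```sql
create publication my_publication
for table "public"."users", "public"."orders"
with (publish_via_partition_root = true);
```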
2 changes: 1 addition & 1 deletion etl-postgres/src/tokio/test_utils.rs
@@ -61,7 +61,7 @@ impl<G: GenericClient> PgDatabase<G> {
.collect::<Vec<_>>();

let create_publication_query = format!(
"create publication {} for table {}",
"create publication {} for table {} with (publish_via_partition_root = true)",
publication_name,
table_names.join(", ")
);
118 changes: 95 additions & 23 deletions etl/src/replication/client.rs
@@ -410,24 +410,52 @@ impl PgReplicationClient {
&self,
publication_name: &str,
) -> EtlResult<Vec<TableId>> {
let publication_query = format!(
"select c.oid from pg_publication_tables pt
join pg_class c on c.relname = pt.tablename
join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
where pt.pubname = {};",
quote_literal(publication_name)
let query = format!(
r#"
with recursive has_rel as (
select exists(
select 1
from pg_publication_rel r
join pg_publication p on p.oid = r.prpubid
where p.pubname = {pub}
) as has
),
pub_tables as (
select r.prrelid as oid
from pg_publication_rel r
join pg_publication p on p.oid = r.prpubid
where p.pubname = {pub} and (select has from has_rel)
union all
select c.oid
from pg_publication_tables pt
join pg_class c on c.relname = pt.tablename
where pt.pubname = {pub} and not (select has from has_rel)
Review comment on lines +423 to +432:

[P1] Join publication tables without schema

The new recursive query in PgReplicationClient::get_publication_table_ids now joins pg_publication_tables to pg_class only on relname. If a publication includes two tables with the same name in different schemas (e.g. sales.users and archive.users), this join can resolve to the wrong OID and the client will subscribe to or copy data from the wrong table. The previous implementation joined on both schema and table name and avoided this ambiguity.
),
recurse(relid) as (
select oid from pub_tables
union all
select i.inhparent
from pg_inherits i
join recurse r on r.relid = i.inhrelid
)
select distinct relid as oid
from recurse r
where not exists (
select 1 from pg_inherits i where i.inhrelid = r.relid
);
"#,
pub = quote_literal(publication_name)
);

let mut table_ids = vec![];
for msg in self.client.simple_query(&publication_query).await? {
let mut roots = vec![];
for msg in self.client.simple_query(&query).await? {
if let SimpleQueryMessage::Row(row) = msg {
// For the sake of simplicity, we refer to the table oid as table id.
let table_id = Self::get_row_value::<TableId>(&row, "oid", "pg_class").await?;
table_ids.push(table_id);
roots.push(table_id);
}
}

Ok(table_ids)
Ok(roots)
}

/// Starts a logical replication stream from the specified publication and slot.
@@ -661,7 +689,8 @@ impl PgReplicationClient {
when 0 then true
else (a.attnum in (select * from pub_attrs))
end
)",
)"
.to_string(),
)
} else {
// Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -676,20 +705,65 @@
)",
publication = quote_literal(publication),
),
"and (select count(*) from pub_table) > 0",
format!(
"and ((select count(*) from pub_table) > 0 or exists(
-- Also allow if parent table is in publication (for partitioned tables)
select 1 from pg_inherits i
join pg_publication_rel r on r.prrelid = i.inhparent
join pg_publication p on p.oid = r.prpubid
where i.inhrelid = {table_id} and p.pubname = {publication}
))",
publication = quote_literal(publication),
),
)
}
} else {
("".into(), "")
("".to_string(), "".to_string())
};

let has_pub_cte = !pub_cte.is_empty();

let cte_prefix = if has_pub_cte {
// If there's already a pub_cte WITH clause, add our CTEs to it with a comma
format!("{pub_cte},")
} else {
// If no pub_cte, start our own WITH clause (no need for RECURSIVE)
"with ".to_string()
};

let column_info_query = format!(
"{pub_cte}
select a.attname,
"{cte_prefix}
-- Find direct parent of current table (if it's a partition)
direct_parent as (
select i.inhparent as parent_oid
from pg_inherits i
where i.inhrelid = {table_id}::oid
limit 1
),
-- Get parent table's primary key columns
parent_pk_cols as (
select array_agg(a.attname order by x.n) as pk_column_names
from pg_constraint con
join unnest(con.conkey) with ordinality as x(attnum, n) on true
join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
join direct_parent dp on con.conrelid = dp.parent_oid
where con.contype = 'p'
group by con.conname
)
select a.attname,
a.atttypid,
a.atttypmod,
a.attnotnull,
coalesce(i.indisprimary, false) as primary
case
-- Direct primary key on this relation
when coalesce(i.indisprimary, false) = true then true
-- Inherit primary key from parent partitioned table if column name matches
when exists (
select 1 from parent_pk_cols pk
where a.attname = any(pk.pk_column_names)
) then true
else false
end as primary
from pg_attribute a
left join pg_index i
on a.attrelid = i.indrelid
@@ -746,16 +820,14 @@ impl PgReplicationClient {
.join(", ");

let table_name = self.get_table_name(table_id).await?;

// TODO: allow passing in format binary or text
let copy_query = format!(
r#"copy {} ({}) to stdout with (format text);"#,
table_name.as_quoted_identifier(),
column_list
r#"copy (select {} from {}) to stdout with (format text);"#,
column_list,
table_name.as_quoted_identifier()
);

// TODO: allow passing in format binary or text
let stream = self.client.copy_out_simple(&copy_query).await?;

Ok(stream)
}
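
The rewritten query matters for partitioned tables: plain `copy <table> to stdout` is rejected on a partitioned parent, while the `copy (select ...)` form reads the entire hierarchy. A minimal sketch (table name hypothetical):

```sql
-- Fails on a partitioned parent: "cannot copy from partitioned table"
copy measurement (id, logdate) to stdout with (format text);

-- Works, and includes rows from every partition:
copy (select id, logdate from measurement) to stdout with (format text);
```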

71 changes: 71 additions & 0 deletions etl/src/test_utils/test_schema.rs
@@ -127,6 +127,77 @@ pub async fn setup_test_database_schema<G: GenericClient>(
}
}

/// Creates a partitioned table with the given name and partitions.
///
/// This function creates:
/// 1. A parent partitioned table with a primary key
/// 2. Several child partitions based on the provided partition specifications
///
/// Returns the table ID of the parent table and a list of partition table IDs.
pub async fn create_partitioned_table<G: GenericClient>(
database: &PgDatabase<G>,
table_name: TableName,
partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
let create_parent_query = format!(
"create table {} (
id bigserial,
data text NOT NULL,
partition_key integer NOT NULL,
primary key (id, partition_key)
) partition by range (partition_key)",
table_name.as_quoted_identifier()
);

database.run_sql(&create_parent_query).await?;

let parent_row = database
.client
.as_ref()
.unwrap()
.query_one(
"select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace
where n.nspname = $1 and c.relname = $2",
&[&table_name.schema, &table_name.name],
)
.await?;

let parent_table_id: TableId = parent_row.get(0);
let mut partition_table_ids = Vec::new();

for (partition_name, partition_constraint) in partition_specs {
let partition_table_name = TableName::new(
table_name.schema.clone(),
format!("{}_{}", table_name.name, partition_name),
);

let create_partition_query = format!(
"create table {} partition of {} for values {}",
partition_table_name.as_quoted_identifier(),
table_name.as_quoted_identifier(),
partition_constraint
);

database.run_sql(&create_partition_query).await?;

let partition_row = database
.client
.as_ref()
.unwrap()
.query_one(
"select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace
where n.nspname = $1 and c.relname = $2",
&[&partition_table_name.schema, &partition_table_name.name],
)
.await?;

let partition_table_id: TableId = partition_row.get(0);
partition_table_ids.push(partition_table_id);
}

Ok((parent_table_id, partition_table_ids))
}
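
For a sense of what this helper emits, with a `table_name` of `public.events` and `partition_specs` of `[("p1", "from (0) to (100)")]` (values hypothetical), the generated SQL is roughly:

```sql
create table "public"."events" (
    id bigserial,
    data text NOT NULL,
    partition_key integer NOT NULL,
    primary key (id, partition_key)
) partition by range (partition_key);

create table "public"."events_p1" partition of "public"."events"
    for values from (0) to (100);
```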

/// Inserts users data into the database for testing purposes.
pub async fn insert_users_data<G: GenericClient>(
client: &mut PgDatabase<G>,