Skip to content

Commit 409a44a

Browse files
Fix partitioned tables
1 parent 83f6c8a commit 409a44a

File tree

3 files changed

+184
-163
lines changed

3 files changed

+184
-163
lines changed

etl/src/replication/client.rs

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,8 @@ impl PgReplicationClient {
661661
when 0 then true
662662
else (a.attnum in (select * from pub_attrs))
663663
end
664-
)",
664+
)"
665+
.to_string(),
665666
)
666667
} else {
667668
// Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -676,20 +677,90 @@ impl PgReplicationClient {
676677
)",
677678
publication = quote_literal(publication),
678679
),
679-
"and (select count(*) from pub_table) > 0",
680+
format!(
681+
"and ((select count(*) from pub_table) > 0 or exists(
682+
-- Also allow if parent table is in publication (for partitioned tables)
683+
select 1 from pg_inherits i
684+
join pg_publication_rel r on r.prrelid = i.inhparent
685+
join pg_publication p on p.oid = r.prpubid
686+
where i.inhrelid = {table_id} and p.pubname = {publication}
687+
))",
688+
publication = quote_literal(publication),
689+
),
680690
)
681691
}
682692
} else {
683-
("".into(), "")
693+
("".to_string(), "".to_string())
694+
};
695+
696+
let has_pub_cte = !pub_cte.is_empty();
697+
698+
let cte_prefix = if has_pub_cte {
699+
// If there's already a pub_cte WITH clause, add our CTEs to it with a comma
700+
format!("{pub_cte},")
701+
} else {
702+
// If no pub_cte, start our own WITH clause (no need for RECURSIVE)
703+
"with ".to_string()
684704
};
685705

686706
let column_info_query = format!(
687-
"{pub_cte}
688-
select a.attname,
707+
"{cte_prefix}
708+
-- Find direct parent of current table (if it's a partition)
709+
direct_parent as (
710+
select i.inhparent as parent_oid
711+
from pg_inherits i
712+
where i.inhrelid = {table_id}::oid
713+
limit 1
714+
),
715+
-- Get parent table's primary key columns
716+
parent_pk_cols as (
717+
select array_agg(a.attname order by x.n) as pk_column_names
718+
from pg_constraint con
719+
join unnest(con.conkey) with ordinality as x(attnum, n) on true
720+
join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
721+
join direct_parent dp on con.conrelid = dp.parent_oid
722+
where con.contype = 'p'
723+
group by con.conname
724+
),
725+
-- Check if current table has a unique index on the parent PK columns
726+
partition_has_pk_index as (
727+
select case
728+
when exists (select 1 from direct_parent)
729+
and exists (select 1 from parent_pk_cols)
730+
and exists (
731+
-- Check if there's a unique, valid index on the parent PK columns
732+
select 1
733+
from pg_index ix
734+
cross join parent_pk_cols pk
735+
where ix.indrelid = {table_id}::oid
736+
and ix.indisunique = true
737+
and ix.indisvalid = true
738+
and array(
739+
select a.attname
740+
from unnest(ix.indkey) with ordinality k(attnum, ord)
741+
join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
742+
where ord <= ix.indnkeyatts -- exclude INCLUDE columns
743+
order by ord
744+
) = pk.pk_column_names
745+
) then true
746+
else false
747+
end as has_inherited_pk
748+
)
749+
SELECT a.attname,
689750
a.atttypid,
690751
a.atttypmod,
691752
a.attnotnull,
692-
coalesce(i.indisprimary, false) as primary
753+
case
754+
-- First check for direct primary key
755+
when coalesce(i.indisprimary, false) = true then true
756+
-- Then check for inherited primary key from partitioned table parent
757+
when (select has_inherited_pk from partition_has_pk_index) = true
758+
and exists (
759+
select 1 from parent_pk_cols pk
760+
where a.attname = any(pk.pk_column_names)
761+
) then true
762+
else false
763+
end as primary
693764
from pg_attribute a
694765
left join pg_index i
695766
on a.attrelid = i.indrelid

etl/src/test_utils/test_schema.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,77 @@ pub async fn setup_test_database_schema<G: GenericClient>(
127127
}
128128
}
129129

130+
/// Creates a partitioned table with the given name and partitions.
131+
///
132+
/// This function creates:
133+
/// 1. A parent partitioned table with a primary key
134+
/// 2. Several child partitions based on the provided partition specifications
135+
///
136+
/// Returns the table ID of the parent table and a list of partition table IDs.
137+
pub async fn create_partitioned_table<G: GenericClient>(
138+
database: &PgDatabase<G>,
139+
table_name: TableName,
140+
partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
141+
) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
142+
let create_parent_query = format!(
143+
"CREATE TABLE {} (
144+
id bigserial,
145+
data text NOT NULL,
146+
partition_key integer NOT NULL,
147+
PRIMARY KEY (id, partition_key)
148+
) PARTITION BY RANGE (partition_key)",
149+
table_name.as_quoted_identifier()
150+
);
151+
152+
database.run_sql(&create_parent_query).await?;
153+
154+
let parent_row = database
155+
.client
156+
.as_ref()
157+
.unwrap()
158+
.query_one(
159+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
160+
WHERE n.nspname = $1 AND c.relname = $2",
161+
&[&table_name.schema, &table_name.name],
162+
)
163+
.await?;
164+
165+
let parent_table_id: TableId = parent_row.get(0);
166+
let mut partition_table_ids = Vec::new();
167+
168+
for (partition_name, partition_constraint) in partition_specs {
169+
let partition_table_name = TableName::new(
170+
table_name.schema.clone(),
171+
format!("{}_{}", table_name.name, partition_name),
172+
);
173+
174+
let create_partition_query = format!(
175+
"CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
176+
partition_table_name.as_quoted_identifier(),
177+
table_name.as_quoted_identifier(),
178+
partition_constraint
179+
);
180+
181+
database.run_sql(&create_partition_query).await?;
182+
183+
let partition_row = database
184+
.client
185+
.as_ref()
186+
.unwrap()
187+
.query_one(
188+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
189+
WHERE n.nspname = $1 AND c.relname = $2",
190+
&[&partition_table_name.schema, &partition_table_name.name],
191+
)
192+
.await?;
193+
194+
let partition_table_id: TableId = partition_row.get(0);
195+
partition_table_ids.push(partition_table_id);
196+
}
197+
198+
Ok((parent_table_id, partition_table_ids))
199+
}
200+
130201
/// Inserts users data into the database for testing purposes.
131202
pub async fn insert_users_data<G: GenericClient>(
132203
client: &mut PgDatabase<G>,

0 commit comments

Comments
 (0)