Skip to content

Commit e050725

Browse files
Fix partitioned tables
1 parent 25548da commit e050725

File tree

3 files changed

+184
-163
lines changed

3 files changed

+184
-163
lines changed

etl/src/replication/client.rs

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,8 @@ impl PgReplicationClient {
659659
when 0 then true
660660
else (a.attnum in (select * from pub_attrs))
661661
end
662-
)",
662+
)"
663+
.to_string(),
663664
)
664665
} else {
665666
// Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -674,20 +675,90 @@ impl PgReplicationClient {
674675
)",
675676
publication = quote_literal(publication),
676677
),
677-
"and (select count(*) from pub_table) > 0",
678+
format!(
679+
"and ((select count(*) from pub_table) > 0 or exists(
680+
-- Also allow if parent table is in publication (for partitioned tables)
681+
select 1 from pg_inherits i
682+
join pg_publication_rel r on r.prrelid = i.inhparent
683+
join pg_publication p on p.oid = r.prpubid
684+
where i.inhrelid = {table_id} and p.pubname = {publication}
685+
))",
686+
publication = quote_literal(publication),
687+
),
678688
)
679689
}
680690
} else {
681-
("".into(), "")
691+
("".to_string(), "".to_string())
692+
};
693+
694+
let has_pub_cte = !pub_cte.is_empty();
695+
696+
let cte_prefix = if has_pub_cte {
697+
// If there's already a pub_cte WITH clause, add our CTEs to it with a comma
698+
format!("{pub_cte},")
699+
} else {
700+
// If no pub_cte, start our own WITH clause (no need for RECURSIVE)
701+
"with ".to_string()
682702
};
683703

684704
let column_info_query = format!(
685-
"{pub_cte}
686-
select a.attname,
705+
"{cte_prefix}
706+
-- Find direct parent of current table (if it's a partition)
707+
direct_parent as (
708+
select i.inhparent as parent_oid
709+
from pg_inherits i
710+
where i.inhrelid = {table_id}::oid
711+
limit 1
712+
),
713+
-- Get parent table's primary key columns
714+
parent_pk_cols as (
715+
select array_agg(a.attname order by x.n) as pk_column_names
716+
from pg_constraint con
717+
join unnest(con.conkey) with ordinality as x(attnum, n) on true
718+
join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
719+
join direct_parent dp on con.conrelid = dp.parent_oid
720+
where con.contype = 'p'
721+
group by con.conname
722+
),
723+
-- Check if current table has a unique index on the parent PK columns
724+
partition_has_pk_index as (
725+
select case
726+
when exists (select 1 from direct_parent)
727+
and exists (select 1 from parent_pk_cols)
728+
and exists (
729+
-- Check if there's a unique, valid index on the parent PK columns
730+
select 1
731+
from pg_index ix
732+
cross join parent_pk_cols pk
733+
where ix.indrelid = {table_id}::oid
734+
and ix.indisunique = true
735+
and ix.indisvalid = true
736+
and array(
737+
select a.attname
738+
from unnest(ix.indkey) with ordinality k(attnum, ord)
739+
join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
740+
where ord <= ix.indnkeyatts -- exclude INCLUDE columns
741+
order by ord
742+
) = pk.pk_column_names
743+
) then true
744+
else false
745+
end as has_inherited_pk
746+
)
747+
SELECT a.attname,
687748
a.atttypid,
688749
a.atttypmod,
689750
a.attnotnull,
690-
coalesce(i.indisprimary, false) as primary
751+
case
752+
-- First check for direct primary key
753+
when coalesce(i.indisprimary, false) = true then true
754+
-- Then check for inherited primary key from partitioned table parent
755+
when (select has_inherited_pk from partition_has_pk_index) = true
756+
and exists (
757+
select 1 from parent_pk_cols pk
758+
where a.attname = any(pk.pk_column_names)
759+
) then true
760+
else false
761+
end as primary
691762
from pg_attribute a
692763
left join pg_index i
693764
on a.attrelid = i.indrelid

etl/src/test_utils/test_schema.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,77 @@ pub async fn setup_test_database_schema<G: GenericClient>(
127127
}
128128
}
129129

130+
/// Creates a partitioned table with the given name and partitions.
131+
///
132+
/// This function creates:
133+
/// 1. A parent partitioned table with a primary key
134+
/// 2. Several child partitions based on the provided partition specifications
135+
///
136+
/// Returns the table ID of the parent table and a list of partition table IDs.
137+
pub async fn create_partitioned_table<G: GenericClient>(
138+
database: &PgDatabase<G>,
139+
table_name: TableName,
140+
partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
141+
) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
142+
let create_parent_query = format!(
143+
"CREATE TABLE {} (
144+
id bigserial,
145+
data text NOT NULL,
146+
partition_key integer NOT NULL,
147+
PRIMARY KEY (id, partition_key)
148+
) PARTITION BY RANGE (partition_key)",
149+
table_name.as_quoted_identifier()
150+
);
151+
152+
database.run_sql(&create_parent_query).await?;
153+
154+
let parent_row = database
155+
.client
156+
.as_ref()
157+
.unwrap()
158+
.query_one(
159+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
160+
WHERE n.nspname = $1 AND c.relname = $2",
161+
&[&table_name.schema, &table_name.name],
162+
)
163+
.await?;
164+
165+
let parent_table_id: TableId = parent_row.get(0);
166+
let mut partition_table_ids = Vec::new();
167+
168+
for (partition_name, partition_constraint) in partition_specs {
169+
let partition_table_name = TableName::new(
170+
table_name.schema.clone(),
171+
format!("{}_{}", table_name.name, partition_name),
172+
);
173+
174+
let create_partition_query = format!(
175+
"CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
176+
partition_table_name.as_quoted_identifier(),
177+
table_name.as_quoted_identifier(),
178+
partition_constraint
179+
);
180+
181+
database.run_sql(&create_partition_query).await?;
182+
183+
let partition_row = database
184+
.client
185+
.as_ref()
186+
.unwrap()
187+
.query_one(
188+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
189+
WHERE n.nspname = $1 AND c.relname = $2",
190+
&[&partition_table_name.schema, &partition_table_name.name],
191+
)
192+
.await?;
193+
194+
let partition_table_id: TableId = partition_row.get(0);
195+
partition_table_ids.push(partition_table_id);
196+
}
197+
198+
Ok((parent_table_id, partition_table_ids))
199+
}
200+
130201
/// Inserts users data into the database for testing purposes.
131202
pub async fn insert_users_data<G: GenericClient>(
132203
client: &mut PgDatabase<G>,

0 commit comments

Comments
 (0)