Skip to content

Commit 880f3cd

Browse files
Fix partitioned tables
1 parent b8352f5 commit 880f3cd

File tree

3 files changed

+184
-163
lines changed

3 files changed

+184
-163
lines changed

etl/src/replication/client.rs

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,8 @@ impl PgReplicationClient {
662662
when 0 then true
663663
else (a.attnum in (select * from pub_attrs))
664664
end
665-
)",
665+
)"
666+
.to_string(),
666667
)
667668
} else {
668669
// No column-level filtering, check if table is in publication
@@ -677,20 +678,90 @@ impl PgReplicationClient {
677678
)",
678679
publication = quote_literal(publication),
679680
),
680-
"and (select count(*) from pub_table) > 0",
681+
format!(
682+
"and ((select count(*) from pub_table) > 0 or exists(
683+
-- Also allow if parent table is in publication (for partitioned tables)
684+
select 1 from pg_inherits i
685+
join pg_publication_rel r on r.prrelid = i.inhparent
686+
join pg_publication p on p.oid = r.prpubid
687+
where i.inhrelid = {table_id} and p.pubname = {publication}
688+
))",
689+
publication = quote_literal(publication),
690+
),
681691
)
682692
}
683693
} else {
684-
("".into(), "")
694+
("".to_string(), "".to_string())
695+
};
696+
697+
let has_pub_cte = !pub_cte.is_empty();
698+
699+
let cte_prefix = if has_pub_cte {
700+
// If there's already a pub_cte WITH clause, add our CTEs to it with a comma
701+
format!("{pub_cte},")
702+
} else {
703+
// If no pub_cte, start our own WITH clause (no need for RECURSIVE)
704+
"with ".to_string()
685705
};
686706

687707
let column_info_query = format!(
688-
"{pub_cte}
689-
select a.attname,
708+
"{cte_prefix}
709+
-- Find direct parent of current table (if it's a partition)
710+
direct_parent as (
711+
select i.inhparent as parent_oid
712+
from pg_inherits i
713+
where i.inhrelid = {table_id}::oid
714+
limit 1
715+
),
716+
-- Get parent table's primary key columns
717+
parent_pk_cols as (
718+
select array_agg(a.attname order by x.n) as pk_column_names
719+
from pg_constraint con
720+
join unnest(con.conkey) with ordinality as x(attnum, n) on true
721+
join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
722+
join direct_parent dp on con.conrelid = dp.parent_oid
723+
where con.contype = 'p'
724+
group by con.conname
725+
),
726+
-- Check if current table has a unique index on the parent PK columns
727+
partition_has_pk_index as (
728+
select case
729+
when exists (select 1 from direct_parent)
730+
and exists (select 1 from parent_pk_cols)
731+
and exists (
732+
-- Check if there's a unique, valid index on the parent PK columns
733+
select 1
734+
from pg_index ix
735+
cross join parent_pk_cols pk
736+
where ix.indrelid = {table_id}::oid
737+
and ix.indisunique = true
738+
and ix.indisvalid = true
739+
and array(
740+
select a.attname
741+
from unnest(ix.indkey) with ordinality k(attnum, ord)
742+
join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
743+
where ord <= ix.indnkeyatts -- exclude INCLUDE columns
744+
order by ord
745+
) = pk.pk_column_names
746+
) then true
747+
else false
748+
end as has_inherited_pk
749+
)
750+
SELECT a.attname,
690751
a.atttypid,
691752
a.atttypmod,
692753
a.attnotnull,
693-
coalesce(i.indisprimary, false) as primary
754+
case
755+
-- First check for direct primary key
756+
when coalesce(i.indisprimary, false) = true then true
757+
-- Then check for inherited primary key from partitioned table parent
758+
when (select has_inherited_pk from partition_has_pk_index) = true
759+
and exists (
760+
select 1 from parent_pk_cols pk
761+
where a.attname = any(pk.pk_column_names)
762+
) then true
763+
else false
764+
end as primary
694765
from pg_attribute a
695766
left join pg_index i
696767
on a.attrelid = i.indrelid

etl/src/test_utils/test_schema.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,77 @@ pub async fn setup_test_database_schema<G: GenericClient>(
127127
}
128128
}
129129

130+
/// Creates a partitioned table with the given name and partitions.
131+
///
132+
/// This function creates:
133+
/// 1. A parent partitioned table with a primary key
134+
/// 2. Several child partitions based on the provided partition specifications
135+
///
136+
/// Returns the table ID of the parent table and a list of partition table IDs.
137+
pub async fn create_partitioned_table<G: GenericClient>(
138+
database: &PgDatabase<G>,
139+
table_name: TableName,
140+
partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
141+
) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
142+
let create_parent_query = format!(
143+
"CREATE TABLE {} (
144+
id bigserial,
145+
data text NOT NULL,
146+
partition_key integer NOT NULL,
147+
PRIMARY KEY (id, partition_key)
148+
) PARTITION BY RANGE (partition_key)",
149+
table_name.as_quoted_identifier()
150+
);
151+
152+
database.run_sql(&create_parent_query).await?;
153+
154+
let parent_row = database
155+
.client
156+
.as_ref()
157+
.unwrap()
158+
.query_one(
159+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
160+
WHERE n.nspname = $1 AND c.relname = $2",
161+
&[&table_name.schema, &table_name.name],
162+
)
163+
.await?;
164+
165+
let parent_table_id: TableId = parent_row.get(0);
166+
let mut partition_table_ids = Vec::new();
167+
168+
for (partition_name, partition_constraint) in partition_specs {
169+
let partition_table_name = TableName::new(
170+
table_name.schema.clone(),
171+
format!("{}_{}", table_name.name, partition_name),
172+
);
173+
174+
let create_partition_query = format!(
175+
"CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
176+
partition_table_name.as_quoted_identifier(),
177+
table_name.as_quoted_identifier(),
178+
partition_constraint
179+
);
180+
181+
database.run_sql(&create_partition_query).await?;
182+
183+
let partition_row = database
184+
.client
185+
.as_ref()
186+
.unwrap()
187+
.query_one(
188+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
189+
WHERE n.nspname = $1 AND c.relname = $2",
190+
&[&partition_table_name.schema, &partition_table_name.name],
191+
)
192+
.await?;
193+
194+
let partition_table_id: TableId = partition_row.get(0);
195+
partition_table_ids.push(partition_table_id);
196+
}
197+
198+
Ok((parent_table_id, partition_table_ids))
199+
}
200+
130201
/// Inserts users data into the database for testing purposes.
131202
pub async fn insert_users_data<G: GenericClient>(
132203
client: &mut PgDatabase<G>,

0 commit comments

Comments
 (0)