@@ -661,7 +661,8 @@ impl PgReplicationClient {
661
661
when 0 then true
662
662
else (a.attnum in (select * from pub_attrs))
663
663
end
664
- )" ,
664
+ )"
665
+ . to_string ( ) ,
665
666
)
666
667
} else {
667
668
// Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -676,20 +677,90 @@ impl PgReplicationClient {
676
677
)" ,
677
678
publication = quote_literal( publication) ,
678
679
) ,
679
- "and (select count(*) from pub_table) > 0" ,
680
+ format ! (
681
+ "and ((select count(*) from pub_table) > 0 or exists(
682
+ -- Also allow if parent table is in publication (for partitioned tables)
683
+ select 1 from pg_inherits i
684
+ join pg_publication_rel r on r.prrelid = i.inhparent
685
+ join pg_publication p on p.oid = r.prpubid
686
+ where i.inhrelid = {table_id} and p.pubname = {publication}
687
+ ))" ,
688
+ publication = quote_literal( publication) ,
689
+ ) ,
680
690
)
681
691
}
682
692
} else {
683
- ( "" . into ( ) , "" )
693
+ ( "" . to_string ( ) , "" . to_string ( ) )
694
+ } ;
695
+
696
+ let has_pub_cte = !pub_cte. is_empty ( ) ;
697
+
698
+ let cte_prefix = if has_pub_cte {
699
+ // If there's already a pub_cte WITH clause, add our CTEs to it with a comma
700
+ format ! ( "{pub_cte}," )
701
+ } else {
702
+ // If no pub_cte, start our own WITH clause (no need for RECURSIVE)
703
+ "with " . to_string ( )
684
704
} ;
685
705
686
706
let column_info_query = format ! (
687
- "{pub_cte}
688
- select a.attname,
707
+ "{cte_prefix}
708
+ -- Find direct parent of current table (if it's a partition)
709
+ direct_parent as (
710
+ select i.inhparent as parent_oid
711
+ from pg_inherits i
712
+ where i.inhrelid = {table_id}::oid
713
+ limit 1
714
+ ),
715
+ -- Get parent table's primary key columns
716
+ parent_pk_cols as (
717
+ select array_agg(a.attname order by x.n) as pk_column_names
718
+ from pg_constraint con
719
+ join unnest(con.conkey) with ordinality as x(attnum, n) on true
720
+ join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
721
+ join direct_parent dp on con.conrelid = dp.parent_oid
722
+ where con.contype = 'p'
723
+ group by con.conname
724
+ ),
725
+ -- Check if current table has a unique index on the parent PK columns
726
+ partition_has_pk_index as (
727
+ select case
728
+ when exists (select 1 from direct_parent)
729
+ and exists (select 1 from parent_pk_cols)
730
+ and exists (
731
+ -- Check if there's a unique, valid index on the parent PK columns
732
+ select 1
733
+ from pg_index ix
734
+ cross join parent_pk_cols pk
735
+ where ix.indrelid = {table_id}::oid
736
+ and ix.indisunique = true
737
+ and ix.indisvalid = true
738
+ and array(
739
+ select a.attname
740
+ from unnest(ix.indkey) with ordinality k(attnum, ord)
741
+ join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
742
+ where ord <= ix.indnkeyatts -- exclude INCLUDE columns
743
+ order by ord
744
+ ) = pk.pk_column_names
745
+ ) then true
746
+ else false
747
+ end as has_inherited_pk
748
+ )
749
+ SELECT a.attname,
689
750
a.atttypid,
690
751
a.atttypmod,
691
752
a.attnotnull,
692
- coalesce(i.indisprimary, false) as primary
753
+ case
754
+ -- First check for direct primary key
755
+ when coalesce(i.indisprimary, false) = true then true
756
+ -- Then check for inherited primary key from partitioned table parent
757
+ when (select has_inherited_pk from partition_has_pk_index) = true
758
+ and exists (
759
+ select 1 from parent_pk_cols pk
760
+ where a.attname = any(pk.pk_column_names)
761
+ ) then true
762
+ else false
763
+ end as primary
693
764
from pg_attribute a
694
765
left join pg_index i
695
766
on a.attrelid = i.indrelid
0 commit comments