@@ -626,24 +626,43 @@ impl PgReplicationClient {
626
626
publication : Option < & str > ,
627
627
) -> EtlResult < Vec < ColumnSchema > > {
628
628
let ( pub_cte, pub_pred) = if let Some ( publication) = publication {
629
- (
630
- format ! (
631
- "with pub_attrs as (
632
- select unnest(r.prattrs)
633
- from pg_publication_rel r
634
- left join pg_publication p on r.prpubid = p.oid
635
- where p.pubname = {publication}
636
- and r.prrelid = {table_id}
629
+ let is_pg14_or_earlier = self . is_postgres_14_or_earlier ( ) . await ?;
630
+
631
+ if !is_pg14_or_earlier {
632
+ (
633
+ format ! (
634
+ "with pub_attrs as (
635
+ select unnest(r.prattrs)
636
+ from pg_publication_rel r
637
+ left join pg_publication p on r.prpubid = p.oid
638
+ where p.pubname = {publication}
639
+ and r.prrelid = {table_id}
640
+ )" ,
641
+ publication = quote_literal( publication) ,
642
+ ) ,
643
+ "and (
644
+ case (select count(*) from pub_attrs)
645
+ when 0 then true
646
+ else (a.attnum in (select * from pub_attrs))
647
+ end
637
648
)" ,
638
- publication = quote_literal( publication) ,
639
- ) ,
640
- "and (
641
- case (select count(*) from pub_attrs)
642
- when 0 then true
643
- else (a.attnum in (select * from pub_attrs))
644
- end
645
- )" ,
646
- )
649
+ )
650
+ } else {
651
+ // No column-level filtering, check if table is in publication
652
+ (
653
+ format ! (
654
+ "with pub_table as (
655
+ select 1 as exists_in_pub
656
+ from pg_publication_rel r
657
+ left join pg_publication p on r.prpubid = p.oid
658
+ where p.pubname = {publication}
659
+ and r.prrelid = {table_id}
660
+ )" ,
661
+ publication = quote_literal( publication) ,
662
+ ) ,
663
+ "and (select count(*) from pub_table) > 0" ,
664
+ )
665
+ }
647
666
} else {
648
667
( "" . into ( ) , "" )
649
668
} ;
@@ -697,6 +716,26 @@ impl PgReplicationClient {
697
716
Ok ( column_schemas)
698
717
}
699
718
719
+ async fn is_postgres_14_or_earlier ( & self ) -> EtlResult < bool > {
720
+ let version_query = "SHOW server_version_num" ;
721
+
722
+ for message in self . client . simple_query ( version_query) . await ? {
723
+ if let SimpleQueryMessage :: Row ( row) = message {
724
+ let version_str =
725
+ Self :: get_row_value :: < String > ( & row, "server_version_num" , "server_version_num" )
726
+ . await ?;
727
+ let server_version: i32 = version_str. parse ( ) . unwrap_or ( 0 ) ;
728
+
729
+ // PostgreSQL version format is typically: MAJOR * 10000 + MINOR * 100 + PATCH
730
+ // For version 14.x.x, this would be 140000 + minor * 100 + patch
731
+ // For version 15.x.x, this would be 150000 + minor * 100 + patch
732
+ return Ok ( server_version < 150000 ) ;
733
+ }
734
+ }
735
+
736
+ Ok ( false )
737
+ }
738
+
700
739
/// Creates a COPY stream for reading data from a table using its OID.
701
740
///
702
741
/// The stream will include only the specified columns and use text format.
0 commit comments