Skip to content

Commit 94be2a2

Browse files
Patch column schemas for pg<=14
1 parent 7c25852 commit 94be2a2

File tree

3 files changed

+61
-20
lines changed

3 files changed

+61
-20
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ jobs:
4141
run: cargo clippy --all-targets --all-features --no-deps -- -D warnings
4242

4343
test:
44-
name: Test
44+
name: Test (Postgres ${matrix.postgres_version})
4545
runs-on: ubuntu-latest
4646
permissions:
4747
contents: read
4848
id-token: write
49+
strategy:
50+
matrix:
51+
postgres_version: [17, 16, 15, 14]
4952
steps:
5053
- name: Checkout repository
5154
uses: actions/checkout@v4
@@ -60,7 +63,7 @@ jobs:
6063

6164
- name: Start Docker Compose Environment
6265
run: |
63-
docker compose -f ./scripts/docker-compose.yaml up -d
66+
POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d
6467
6568
- name: Install sqlx-cli
6669
run: |

etl/src/replication/client.rs

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -626,24 +626,43 @@ impl PgReplicationClient {
626626
publication: Option<&str>,
627627
) -> EtlResult<Vec<ColumnSchema>> {
628628
let (pub_cte, pub_pred) = if let Some(publication) = publication {
629-
(
630-
format!(
631-
"with pub_attrs as (
632-
select unnest(r.prattrs)
633-
from pg_publication_rel r
634-
left join pg_publication p on r.prpubid = p.oid
635-
where p.pubname = {publication}
636-
and r.prrelid = {table_id}
629+
let is_pg14_or_earlier = self.is_postgres_14_or_earlier().await?;
630+
631+
if !is_pg14_or_earlier {
632+
(
633+
format!(
634+
"with pub_attrs as (
635+
select unnest(r.prattrs)
636+
from pg_publication_rel r
637+
left join pg_publication p on r.prpubid = p.oid
638+
where p.pubname = {publication}
639+
and r.prrelid = {table_id}
640+
)",
641+
publication = quote_literal(publication),
642+
),
643+
"and (
644+
case (select count(*) from pub_attrs)
645+
when 0 then true
646+
else (a.attnum in (select * from pub_attrs))
647+
end
637648
)",
638-
publication = quote_literal(publication),
639-
),
640-
"and (
641-
case (select count(*) from pub_attrs)
642-
when 0 then true
643-
else (a.attnum in (select * from pub_attrs))
644-
end
645-
)",
646-
)
649+
)
650+
} else {
651+
// No column-level filtering, check if table is in publication
652+
(
653+
format!(
654+
"with pub_table as (
655+
select 1 as exists_in_pub
656+
from pg_publication_rel r
657+
left join pg_publication p on r.prpubid = p.oid
658+
where p.pubname = {publication}
659+
and r.prrelid = {table_id}
660+
)",
661+
publication = quote_literal(publication),
662+
),
663+
"and (select count(*) from pub_table) > 0",
664+
)
665+
}
647666
} else {
648667
("".into(), "")
649668
};
@@ -697,6 +716,25 @@ impl PgReplicationClient {
697716
Ok(column_schemas)
698717
}
699718

719+
720+
async fn is_postgres_14_or_earlier(&self) -> EtlResult<bool> {
721+
let version_query = "SHOW server_version_num";
722+
723+
for message in self.client.simple_query(version_query).await? {
724+
if let SimpleQueryMessage::Row(row) = message {
725+
let version_str = Self::get_row_value::<String>(&row, "server_version_num", "server_version_num").await?;
726+
let server_version: i32 = version_str.parse().unwrap_or(0);
727+
728+
// PostgreSQL version format is typically: MAJOR * 10000 + MINOR * 100 + PATCH
729+
// For version 14.x.x, this would be 140000 + minor * 100 + patch
730+
// For version 15.x.x, this would be 150000 + minor * 100 + patch
731+
return Ok(server_version < 150000);
732+
}
733+
}
734+
735+
Ok(false)
736+
}
737+
700738
/// Creates a COPY stream for reading data from a table using its OID.
701739
///
702740
/// The stream will include only the specified columns and use text format.

scripts/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ name: etl
1010
services:
1111
# Start Postgres
1212
postgres:
13-
image: postgres:17
13+
image: postgres:${POSTGRES_VERSION:-17}
1414
container_name: postgres
1515
environment:
1616
POSTGRES_USER: ${POSTGRES_USER:-postgres}

0 commit comments

Comments
 (0)