From 5d882b6653b2b509646abc4973e71f034404cb0b Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 12:00:28 -0400 Subject: [PATCH 01/12] Patch column schemas for pg<=14 --- .github/workflows/ci.yml | 14 +++++-- etl/src/replication/client.rs | 73 +++++++++++++++++++++++++++-------- scripts/docker-compose.yaml | 2 +- 3 files changed, 67 insertions(+), 22 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c55ade32..63435c2d7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,12 +43,15 @@ jobs: run: cargo clippy --all-targets --all-features --no-deps -- -D warnings test-partial: - name: Test Partial + name: Test Partial (Postgres ${{ matrix.postgres_version }}) runs-on: ubuntu-latest # Run on forks. if: github.event.pull_request.head.repo.fork == true permissions: contents: read + strategy: + matrix: + postgres_version: [17, 16, 15, 14] steps: - name: Checkout repository uses: actions/checkout@v4 @@ -63,7 +66,7 @@ jobs: - name: Start Docker Compose Environment run: | - docker compose -f ./scripts/docker-compose.yaml up -d + POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d - name: Install sqlx-cli run: | @@ -89,13 +92,16 @@ jobs: --all-features -- --skip integration::bigquery_test test-full: - name: Test Full + name: Test Full (Postgres ${{ matrix.postgres_version }}) runs-on: ubuntu-latest # Run on non forks. if: github.event.pull_request.head.repo.fork == false permissions: contents: read id-token: write + strategy: + matrix: + postgres_version: [17, 16, 15, 14] steps: - name: Checkout repository uses: actions/checkout@v4 @@ -110,7 +116,7 @@ jobs: - name: Start Docker Compose Environment run: | - docker compose -f ./scripts/docker-compose.yaml up -d + POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d - name: Install sqlx-cli run: | diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 392d03953..f4bca90da 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -626,24 +626,43 @@ impl PgReplicationClient { publication: Option<&str>, ) -> EtlResult> { let (pub_cte, pub_pred) = if let Some(publication) = publication { - ( - format!( - "with pub_attrs as ( - select unnest(r.prattrs) - from pg_publication_rel r - left join pg_publication p on r.prpubid = p.oid - where p.pubname = {publication} - and r.prrelid = {table_id} + let is_pg14_or_earlier = self.is_postgres_14_or_earlier().await?; + + if !is_pg14_or_earlier { + ( + format!( + "with pub_attrs as ( + select unnest(r.prattrs) + from pg_publication_rel r + left join pg_publication p on r.prpubid = p.oid + where p.pubname = {publication} + and r.prrelid = {table_id} + )", + publication = quote_literal(publication), + ), + "and ( + case (select count(*) from pub_attrs) + when 0 then true + else (a.attnum in (select * from pub_attrs)) + end )", - publication = quote_literal(publication), - ), - "and ( - case (select count(*) from pub_attrs) - when 0 then true - else (a.attnum in (select * from pub_attrs)) - end - )", - ) + ) + } else { + // No column-level filtering, check if table is in publication + ( + format!( + "with pub_table as ( + select 1 as exists_in_pub + from pg_publication_rel r + left join pg_publication p on r.prpubid = p.oid + where p.pubname = {publication} + and r.prrelid = {table_id} + )", + publication = quote_literal(publication), + ), + "and (select count(*) from pub_table) > 0", + ) + } } else { ("".into(), "") }; @@ -697,6 +716,26 @@ impl PgReplicationClient { Ok(column_schemas) } + async fn is_postgres_14_or_earlier(&self) -> EtlResult { + let version_query = "SHOW server_version_num"; + + for message in self.client.simple_query(version_query).await? { + if let SimpleQueryMessage::Row(row) = message { + let version_str = + Self::get_row_value::(&row, "server_version_num", "server_version_num") + .await?; + let server_version: i32 = version_str.parse().unwrap_or(0); + + // PostgreSQL version format is typically: MAJOR * 10000 + MINOR * 100 + PATCH + // For version 14.x.x, this would be 140000 + minor * 100 + patch + // For version 15.x.x, this would be 150000 + minor * 100 + patch + return Ok(server_version < 150000); + } + } + + Ok(false) + } + /// Creates a COPY stream for reading data from a table using its OID. /// /// The stream will include only the specified columns and use text format. diff --git a/scripts/docker-compose.yaml b/scripts/docker-compose.yaml index c9058ee9e..d8cda9bc2 100644 --- a/scripts/docker-compose.yaml +++ b/scripts/docker-compose.yaml @@ -10,7 +10,7 @@ name: etl services: # Start Postgres postgres: - image: postgres:17 + image: postgres:${POSTGRES_VERSION:-17} container_name: postgres environment: POSTGRES_USER: ${POSTGRES_USER:-postgres} From 18a7724160e390f966f1580419c48585741aa797 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 12:40:23 -0400 Subject: [PATCH 02/12] Convert into config script --- scripts/docker-compose.yaml | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/scripts/docker-compose.yaml b/scripts/docker-compose.yaml index d8cda9bc2..1647f1c7f 100644 --- a/scripts/docker-compose.yaml +++ b/scripts/docker-compose.yaml @@ -20,6 +20,10 @@ services: - "${POSTGRES_PORT:-5430}:5432" volumes: - ${POSTGRES_DATA_VOLUME:-postgres_data}:/var/lib/postgresql/data + configs: + - source: postgres_init + target: /docker-entrypoint-initdb.d/01-create-iceberg-catalog.sh + mode: 0755 command: > postgres -N 1000 @@ -33,24 +37,6 @@ services: timeout: 5s retries: 5 - # Create database for iceberg catalog - create-catalog-db: - image: postgres:17 - container_name: create-catalog-db - depends_on: - postgres: - condition: service_healthy - environment: - PGHOST: postgres - PGPORT: 5432 - PGUSER: ${POSTGRES_USER:-postgres} - PGPASSWORD: ${POSTGRES_PASSWORD:-postgres} - entrypoint: > - /bin/sh -c " - psql -c 'create database \"iceberg-catalog\";'; - exit 0; - " - # Start MinIO for S3-compatible object storage minio: image: minio/minio:latest @@ -92,8 +78,8 @@ services: restart: "no" command: ["migrate"] depends_on: - create-catalog-db: - condition: service_completed_successfully + postgres: + condition: service_healthy # Start lakekeeper, an iceberg REST catalog lakekeeper: @@ -167,6 +153,16 @@ services: volumes: - ./warehouse.json:/home/curl_user/warehouse.json +configs: + postgres_init: + content: | + #!/bin/bash + set -e + + psql -v ON_ERROR_STOP=1 --username "${POSTGRES_USER:-postgres}" --dbname "${POSTGRES_DB:-postgres}" <<-EOSQL + CREATE DATABASE "iceberg-catalog"; + EOSQL + volumes: postgres_data: driver: local From e094c8621fba1e35e7eeae6f7cbf1938b5672248 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 13:01:18 -0400 Subject: [PATCH 03/12] Rework docker compose to be isolated across pg versions --- scripts/docker-compose.yaml | 46 +++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/scripts/docker-compose.yaml b/scripts/docker-compose.yaml index 1647f1c7f..bc4d07fc0 100644 --- a/scripts/docker-compose.yaml +++ b/scripts/docker-compose.yaml @@ -1,17 +1,16 @@ x-lakekeeper-env: &lakekeeper-env environment: - LAKEKEEPER__PG_ENCRYPTION_KEY=insecure-encryption-key - - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://postgres:postgres@postgres:5432/iceberg-catalog - - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://postgres:postgres@postgres:5432/iceberg-catalog + - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://lakekeeper-postgres:lakekeeper-postgres@lakekeeper-postgres:5432/iceberg-catalog + - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://lakekeeper-postgres:lakekeeper-postgres@lakekeeper-postgres:5432/iceberg-catalog - RUST_LOG=info -name: etl +name: etl-pg-${POSTGRES_VERSION:-17} services: # Start Postgres postgres: image: postgres:${POSTGRES_VERSION:-17} - container_name: postgres environment: POSTGRES_USER: ${POSTGRES_USER:-postgres} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} @@ -20,10 +19,6 @@ services: - "${POSTGRES_PORT:-5430}:5432" volumes: - ${POSTGRES_DATA_VOLUME:-postgres_data}:/var/lib/postgresql/data - configs: - - source: postgres_init - target: /docker-entrypoint-initdb.d/01-create-iceberg-catalog.sh - mode: 0755 command: > postgres -N 1000 @@ -37,10 +32,24 @@ services: timeout: 5s retries: 5 + lakekeeper-postgres: + image: postgres:17 + environment: + POSTGRES_USER: ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} + POSTGRES_PASSWORD: ${LAKEKEEPER_POSTGRES_PASSWORD:-lakekeeper-postgres} + POSTGRES_DB: ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog} + volumes: + - ${LAKEKEEPER_POSTGRES_DATA_VOLUME:-lakekeeper_postgres_data}:/var/lib/postgresql/data + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} -d ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}"] + interval: 5s + timeout: 5s + retries: 5 + # Start MinIO for S3-compatible object storage minio: image: minio/minio:latest - container_name: minio environment: MINIO_ROOT_USER: minio-admin MINIO_ROOT_PASSWORD: minio-admin-password @@ -59,7 +68,6 @@ services: # Create MinIO bucket create-bucket: image: minio/mc:latest - container_name: create-bucket depends_on: minio: condition: service_healthy @@ -73,18 +81,16 @@ services: # Migrate lakekeeper database migrate-lakekeeper: image: quay.io/lakekeeper/catalog:latest-main - container_name: migrate-lakekeeper <<: *lakekeeper-env restart: "no" command: ["migrate"] depends_on: - postgres: + lakekeeper-postgres: condition: service_healthy # Start lakekeeper, an iceberg REST catalog lakekeeper: image: quay.io/lakekeeper/catalog:latest-main - container_name: lakekeeper depends_on: migrate-lakekeeper: condition: service_completed_successfully @@ -104,7 +110,6 @@ services: # Bootstrap lakekeeper. After deployment, lakekeeper needs to be bootstrapped. bootstrap-lakekeeper: image: curlimages/curl - container_name: bootstrap-lakekeeper depends_on: lakekeeper: condition: service_healthy @@ -127,7 +132,6 @@ services: # Create a warehouse for development and testing create-warehouse: image: curlimages/curl - container_name: create-warehouse depends_on: lakekeeper: condition: service_healthy @@ -153,18 +157,10 @@ services: volumes: - ./warehouse.json:/home/curl_user/warehouse.json -configs: - postgres_init: - content: | - #!/bin/bash - set -e - - psql -v ON_ERROR_STOP=1 --username "${POSTGRES_USER:-postgres}" --dbname "${POSTGRES_DB:-postgres}" <<-EOSQL - CREATE DATABASE "iceberg-catalog"; - EOSQL - volumes: postgres_data: driver: local + lakekeeper_postgres_data: + driver: local minio_data: driver: local From 1a2cfc1bf7315dc5e3da6220f9c7eda363ad2d77 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 13:26:32 -0400 Subject: [PATCH 04/12] Store server version in OnceCell --- etl/src/replication/client.rs | 91 ++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index f4bca90da..748e87572 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -11,6 +11,7 @@ use std::collections::HashMap; use std::fmt; use std::io::BufReader; use std::sync::Arc; +use tokio::sync::OnceCell; use tokio_postgres::error::SqlState; use tokio_postgres::tls::MakeTlsConnect; use tokio_postgres::{ @@ -162,7 +163,7 @@ impl PgReplicationSlotTransaction { /// and streaming changes from the database. #[derive(Debug, Clone)] pub struct PgReplicationClient { - client: Arc, + client: Arc<(Client, OnceCell)>, } impl PgReplicationClient { @@ -177,6 +178,15 @@ impl PgReplicationClient { } } + + // Convenience method to avoid having to access the client directly. + async fn simple_query( + &self, + query: &str, + ) -> Result, tokio_postgres::Error> { + self.client.0.simple_query(query).await + } + /// Establishes a connection to Postgres without TLS encryption. /// /// The connection is configured for logical replication mode. @@ -190,7 +200,7 @@ impl PgReplicationClient { info!("successfully connected to postgres without tls"); Ok(PgReplicationClient { - client: Arc::new(client), + client: Arc::new((client, OnceCell::new())), }) } @@ -221,7 +231,7 @@ impl PgReplicationClient { info!("successfully connected to postgres with tls"); Ok(PgReplicationClient { - client: Arc::new(client), + client: Arc::new((client, OnceCell::new())), }) } @@ -253,7 +263,7 @@ impl PgReplicationClient { quote_literal(slot_name) ); - let results = self.client.simple_query(&query).await?; + let results = self.simple_query(&query).await?; for result in results { if let SimpleQueryMessage::Row(row) = result { let confirmed_flush_lsn = Self::get_row_value::( @@ -315,7 +325,7 @@ impl PgReplicationClient { quote_identifier(slot_name) ); - match self.client.simple_query(&query).await { + match self.simple_query(&query).await { Ok(_) => { info!("successfully deleted replication slot '{}'", slot_name); @@ -353,7 +363,7 @@ impl PgReplicationClient { "select 1 as exists from pg_publication where pubname = {};", quote_literal(publication) ); - for msg in self.client.simple_query(&publication_exists_query).await? { + for msg in self.simple_query(&publication_exists_query).await? { if let SimpleQueryMessage::Row(_) = msg { return Ok(true); } @@ -373,7 +383,7 @@ impl PgReplicationClient { ); let mut table_names = vec![]; - for msg in self.client.simple_query(&publication_query).await? { + for msg in self.simple_query(&publication_query).await? { if let SimpleQueryMessage::Row(row) = msg { let schema = Self::get_row_value::(&row, "schemaname", "pg_publication_tables") @@ -403,7 +413,7 @@ impl PgReplicationClient { ); let mut table_ids = vec![]; - for msg in self.client.simple_query(&publication_query).await? { + for msg in self.simple_query(&publication_query).await? { if let SimpleQueryMessage::Row(row) = msg { // For the sake of simplicity, we refer to the table oid as table id. let table_id = Self::get_row_value::(&row, "oid", "pg_class").await?; @@ -441,7 +451,11 @@ impl PgReplicationClient { options ); - let copy_stream = self.client.copy_both_simple::(&query).await?; + let copy_stream = self + .client + .0 + .copy_both_simple::(&query) + .await?; let stream = LogicalReplicationStream::new(copy_stream); Ok(stream) @@ -452,8 +466,7 @@ impl PgReplicationClient { /// The transaction doesn't make any assumptions about the snapshot in use, since this is a /// concern of the statements issued within the transaction. async fn begin_tx(&self) -> EtlResult<()> { - self.client - .simple_query("begin read only isolation level repeatable read;") + self.simple_query("begin read only isolation level repeatable read;") .await?; Ok(()) @@ -461,14 +474,14 @@ impl PgReplicationClient { /// Commits the current transaction. async fn commit_tx(&self) -> EtlResult<()> { - self.client.simple_query("commit;").await?; + self.simple_query("commit;").await?; Ok(()) } /// Rolls back the current transaction. async fn rollback_tx(&self) -> EtlResult<()> { - self.client.simple_query("rollback;").await?; + self.simple_query("rollback;").await?; Ok(()) } @@ -495,7 +508,7 @@ impl PgReplicationClient { quote_identifier(slot_name), snapshot_option ); - match self.client.simple_query(&query).await { + match self.simple_query(&query).await { Ok(results) => { for result in results { if let SimpleQueryMessage::Row(row) = result { @@ -595,7 +608,7 @@ impl PgReplicationClient { where c.oid = {table_id}", ); - for message in self.client.simple_query(&table_info_query).await? { + for message in self.simple_query(&table_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let schema_name = Self::get_row_value::(&row, "schema_name", "pg_namespace").await?; @@ -626,7 +639,7 @@ impl PgReplicationClient { publication: Option<&str>, ) -> EtlResult> { let (pub_cte, pub_pred) = if let Some(publication) = publication { - let is_pg14_or_earlier = self.is_postgres_14_or_earlier().await?; + let is_pg14_or_earlier = self.get_server_version().await.unwrap_or(0) < 150000; if !is_pg14_or_earlier { ( @@ -690,7 +703,7 @@ impl PgReplicationClient { let mut column_schemas = vec![]; - for message in self.client.simple_query(&column_info_query).await? { + for message in self.simple_query(&column_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let name = Self::get_row_value::(&row, "attname", "pg_attribute").await?; let type_oid = Self::get_row_value::(&row, "atttypid", "pg_attribute").await?; @@ -716,24 +729,36 @@ impl PgReplicationClient { Ok(column_schemas) } - async fn is_postgres_14_or_earlier(&self) -> EtlResult { - let version_query = "SHOW server_version_num"; - - for message in self.client.simple_query(version_query).await? { - if let SimpleQueryMessage::Row(row) = message { - let version_str = - Self::get_row_value::(&row, "server_version_num", "server_version_num") + /// Gets the PostgreSQL server version. + /// + /// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH + /// For example: PostgreSQL 14.2 = 140200, PostgreSQL 15.1 = 150100 + async fn get_server_version(&self) -> EtlResult { + let version = self + .client + .1 + .get_or_try_init(|| async { + let version_query = "SHOW server_version_num"; + + for message in self.simple_query(version_query).await? { + if let SimpleQueryMessage::Row(row) = message { + let version_str = Self::get_row_value::( + &row, + "server_version_num", + "server_version_num", + ) .await?; - let server_version: i32 = version_str.parse().unwrap_or(0); + let version: i32 = version_str.parse().unwrap_or(0); + return Ok::<_, EtlError>(version); + } + } - // PostgreSQL version format is typically: MAJOR * 10000 + MINOR * 100 + PATCH - // For version 14.x.x, this would be 140000 + minor * 100 + patch - // For version 15.x.x, this would be 150000 + minor * 100 + patch - return Ok(server_version < 150000); - } - } + // If we can't determine, return 0 (which will be treated as very old version) + Ok(0) + }) + .await?; - Ok(false) + Ok(*version) } /// Creates a COPY stream for reading data from a table using its OID. @@ -759,7 +784,7 @@ impl PgReplicationClient { column_list ); - let stream = self.client.copy_out_simple(©_query).await?; + let stream = self.client.0.copy_out_simple(©_query).await?; Ok(stream) } From da6459fbfcfa2c72e554317f1977026d1dde13c3 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 28 Aug 2025 13:31:36 -0400 Subject: [PATCH 05/12] Fmt --- etl/src/replication/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 748e87572..7f045207b 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -178,7 +178,6 @@ impl PgReplicationClient { } } - // Convenience method to avoid having to access the client directly. async fn simple_query( &self, From e8c10c6db4611d1d197e67cd28ffb48bfdd831fa Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Fri, 29 Aug 2025 10:14:22 -0400 Subject: [PATCH 06/12] Extract postgres version from connection parameters --- etl/src/replication/client.rs | 129 +++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 57 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 7f045207b..b4e4d2183 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -10,8 +10,10 @@ use rustls::ClientConfig; use std::collections::HashMap; use std::fmt; use std::io::BufReader; +use std::num::NonZeroI32; use std::sync::Arc; -use tokio::sync::OnceCell; + +use tokio::io::{AsyncRead, AsyncWrite}; use tokio_postgres::error::SqlState; use tokio_postgres::tls::MakeTlsConnect; use tokio_postgres::{ @@ -163,7 +165,8 @@ impl PgReplicationSlotTransaction { /// and streaming changes from the database. #[derive(Debug, Clone)] pub struct PgReplicationClient { - client: Arc<(Client, OnceCell)>, + client: Arc, + server_version: Option, } impl PgReplicationClient { @@ -178,14 +181,6 @@ impl PgReplicationClient { } } - // Convenience method to avoid having to access the client directly. - async fn simple_query( - &self, - query: &str, - ) -> Result, tokio_postgres::Error> { - self.client.0.simple_query(query).await - } - /// Establishes a connection to Postgres without TLS encryption. /// /// The connection is configured for logical replication mode. @@ -194,12 +189,16 @@ impl PgReplicationClient { config.replication_mode(ReplicationMode::Logical); let (client, connection) = config.connect(NoTls).await?; + + let server_version = Self::extract_server_version(&connection); + spawn_postgres_connection::(connection); info!("successfully connected to postgres without tls"); Ok(PgReplicationClient { - client: Arc::new((client, OnceCell::new())), + client: Arc::new(client), + server_version, }) } @@ -225,12 +224,16 @@ impl PgReplicationClient { .with_no_client_auth(); let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?; + + let server_version = Self::extract_server_version(&connection); + spawn_postgres_connection::(connection); info!("successfully connected to postgres with tls"); Ok(PgReplicationClient { - client: Arc::new((client, OnceCell::new())), + client: Arc::new(client), + server_version, }) } @@ -262,7 +265,7 @@ impl PgReplicationClient { quote_literal(slot_name) ); - let results = self.simple_query(&query).await?; + let results = self.client.simple_query(&query).await?; for result in results { if let SimpleQueryMessage::Row(row) = result { let confirmed_flush_lsn = Self::get_row_value::( @@ -324,7 +327,7 @@ impl PgReplicationClient { quote_identifier(slot_name) ); - match self.simple_query(&query).await { + match self.client.simple_query(&query).await { Ok(_) => { info!("successfully deleted replication slot '{}'", slot_name); @@ -362,7 +365,7 @@ impl PgReplicationClient { "select 1 as exists from pg_publication where pubname = {};", quote_literal(publication) ); - for msg in self.simple_query(&publication_exists_query).await? { + for msg in self.client.simple_query(&publication_exists_query).await? { if let SimpleQueryMessage::Row(_) = msg { return Ok(true); } @@ -382,7 +385,7 @@ impl PgReplicationClient { ); let mut table_names = vec![]; - for msg in self.simple_query(&publication_query).await? { + for msg in self.client.simple_query(&publication_query).await? { if let SimpleQueryMessage::Row(row) = msg { let schema = Self::get_row_value::(&row, "schemaname", "pg_publication_tables") @@ -412,7 +415,7 @@ impl PgReplicationClient { ); let mut table_ids = vec![]; - for msg in self.simple_query(&publication_query).await? { + for msg in self.client.simple_query(&publication_query).await? { if let SimpleQueryMessage::Row(row) = msg { // For the sake of simplicity, we refer to the table oid as table id. let table_id = Self::get_row_value::(&row, "oid", "pg_class").await?; @@ -450,11 +453,7 @@ impl PgReplicationClient { options ); - let copy_stream = self - .client - .0 - .copy_both_simple::(&query) - .await?; + let copy_stream = self.client.copy_both_simple::(&query).await?; let stream = LogicalReplicationStream::new(copy_stream); Ok(stream) @@ -465,7 +464,8 @@ impl PgReplicationClient { /// The transaction doesn't make any assumptions about the snapshot in use, since this is a /// concern of the statements issued within the transaction. async fn begin_tx(&self) -> EtlResult<()> { - self.simple_query("begin read only isolation level repeatable read;") + self.client + .simple_query("begin read only isolation level repeatable read;") .await?; Ok(()) @@ -473,14 +473,14 @@ impl PgReplicationClient { /// Commits the current transaction. async fn commit_tx(&self) -> EtlResult<()> { - self.simple_query("commit;").await?; + self.client.simple_query("commit;").await?; Ok(()) } /// Rolls back the current transaction. async fn rollback_tx(&self) -> EtlResult<()> { - self.simple_query("rollback;").await?; + self.client.simple_query("rollback;").await?; Ok(()) } @@ -507,7 +507,7 @@ impl PgReplicationClient { quote_identifier(slot_name), snapshot_option ); - match self.simple_query(&query).await { + match self.client.simple_query(&query).await { Ok(results) => { for result in results { if let SimpleQueryMessage::Row(row) = result { @@ -607,7 +607,7 @@ impl PgReplicationClient { where c.oid = {table_id}", ); - for message in self.simple_query(&table_info_query).await? { + for message in self.client.simple_query(&table_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let schema_name = Self::get_row_value::(&row, "schema_name", "pg_namespace").await?; @@ -638,7 +638,12 @@ impl PgReplicationClient { publication: Option<&str>, ) -> EtlResult> { let (pub_cte, pub_pred) = if let Some(publication) = publication { - let is_pg14_or_earlier = self.get_server_version().await.unwrap_or(0) < 150000; + let is_pg14_or_earlier = if let Some(server_version) = self.server_version { + server_version.get() < 150000 + } else { + // be conservative by default + true + }; if !is_pg14_or_earlier { ( @@ -702,7 +707,7 @@ impl PgReplicationClient { let mut column_schemas = vec![]; - for message in self.simple_query(&column_info_query).await? { + for message in self.client.simple_query(&column_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let name = Self::get_row_value::(&row, "attname", "pg_attribute").await?; let type_oid = Self::get_row_value::(&row, "atttypid", "pg_attribute").await?; @@ -728,36 +733,46 @@ impl PgReplicationClient { Ok(column_schemas) } - /// Gets the PostgreSQL server version. + /// Extracts the PostgreSQL server version from connection parameters. + /// + /// This method should be called during connection establishment to extract + /// the server version from the parameter status messages sent by the server. /// /// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH + /// This matches the format used by `SELECT version()`. /// For example: PostgreSQL 14.2 = 140200, PostgreSQL 15.1 = 150100 - async fn get_server_version(&self) -> EtlResult { - let version = self - .client - .1 - .get_or_try_init(|| async { - let version_query = "SHOW server_version_num"; - - for message in self.simple_query(version_query).await? { - if let SimpleQueryMessage::Row(row) = message { - let version_str = Self::get_row_value::( - &row, - "server_version_num", - "server_version_num", - ) - .await?; - let version: i32 = version_str.parse().unwrap_or(0); - return Ok::<_, EtlError>(version); - } - } - - // If we can't determine, return 0 (which will be treated as very old version) - Ok(0) - }) - .await?; - - Ok(*version) + fn extract_server_version(connection: &Connection) -> Option + where + S: AsyncRead + AsyncWrite + Unpin + Send, + { + if let Some(server_version_str) = connection.parameter("server_version") { + // Parse version string like "15.5 (Homebrew)" or "14.2" + let version_part = server_version_str + .split_whitespace() + .next() + .unwrap_or("0.0"); + + let version_components: Vec<&str> = version_part.split('.').collect(); + + let major = version_components + .get(0) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let minor = version_components + .get(1) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let patch = version_components + .get(2) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + + let version = major * 10000 + minor * 100 + patch; + + NonZeroI32::new(version) + } else { + None + } } /// Creates a COPY stream for reading data from a table using its OID. @@ -783,7 +798,7 @@ impl PgReplicationClient { column_list ); - let stream = self.client.0.copy_out_simple(©_query).await?; + let stream = self.client.copy_out_simple(©_query).await?; Ok(stream) } From 96ba722c50e2a1139e6513990f1e0cdc7ca63d9d Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Fri, 29 Aug 2025 10:55:02 -0400 Subject: [PATCH 07/12] Clippy --- etl/src/replication/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index b4e4d2183..cd8873f3c 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -755,7 +755,7 @@ impl PgReplicationClient { let version_components: Vec<&str> = version_part.split('.').collect(); let major = version_components - .get(0) + .first() .and_then(|v| v.parse::().ok()) .unwrap_or(0); let minor = version_components From 283135676e82de242b6a18cae8d931ad0d2d65e4 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Fri, 29 Aug 2025 13:10:32 -0400 Subject: [PATCH 08/12] Write unit tests, disable some tests if PG<=15, --- etl-postgres/src/replication/db.rs | 106 +++++++++++++++++++++++++ etl-postgres/src/tokio/test_utils.rs | 86 +++++++++++++++----- etl/src/replication/client.rs | 53 ++----------- etl/tests/integration/pipeline_test.rs | 14 ++++ 4 files changed, 194 insertions(+), 65 deletions(-) diff --git a/etl-postgres/src/replication/db.rs b/etl-postgres/src/replication/db.rs index b3698ed1b..d86f15154 100644 --- a/etl-postgres/src/replication/db.rs +++ b/etl-postgres/src/replication/db.rs @@ -1,3 +1,5 @@ +use std::num::NonZeroI32; + use etl_config::shared::{IntoConnectOptions, PgConnectionConfig}; use sqlx::{PgPool, Row, postgres::PgPoolOptions}; use thiserror::Error; @@ -68,3 +70,107 @@ pub async fn get_table_name_from_oid( None => Err(TableLookupError::TableNotFound(table_id)), } } + +/// Extracts the PostgreSQL server version from a version string. +/// +/// This function parses version strings like "15.5 (Homebrew)" or "14.2" +/// and converts them to the numeric format used by PostgreSQL. +/// +/// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH +/// For example: PostgreSQL 14.2 = 140200, PostgreSQL 15.1 = 150100 +/// +/// Returns `None` if the version string cannot be parsed or results in zero. +pub fn extract_server_version(server_version_str: impl AsRef) -> Option { + // Parse version string like "15.5 (Homebrew)" or "14.2" + let version_part = server_version_str + .as_ref() + .split_whitespace() + .next() + .unwrap_or("0.0"); + + let version_components: Vec<&str> = version_part.split('.').collect(); + + let major = version_components + .first() + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let minor = version_components + .get(1) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + let patch = version_components + .get(2) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + + let version = major * 10000 + minor * 100 + patch; + + NonZeroI32::new(version) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_server_version_basic_versions() { + assert_eq!(extract_server_version("15.5"), NonZeroI32::new(150500)); + assert_eq!(extract_server_version("14.2"), NonZeroI32::new(140200)); + assert_eq!(extract_server_version("13.0"), NonZeroI32::new(130000)); + assert_eq!(extract_server_version("16.1"), NonZeroI32::new(160100)); + } + + #[test] + fn test_extract_server_version_with_suffixes() { + assert_eq!( + extract_server_version("15.5 (Homebrew)"), + NonZeroI32::new(150500) + ); + assert_eq!( + extract_server_version("14.2 on x86_64-pc-linux-gnu"), + NonZeroI32::new(140200) + ); + assert_eq!( + extract_server_version("13.7 (Ubuntu 13.7-1.pgdg20.04+1)"), + NonZeroI32::new(130700) + ); + assert_eq!( + extract_server_version("16.0 devel"), + NonZeroI32::new(160000) + ); + } + + #[test] + fn test_extract_server_version_patch_versions() { + // Test versions with patch numbers + assert_eq!(extract_server_version("15.5.1"), NonZeroI32::new(150501)); + assert_eq!(extract_server_version("14.10.3"), NonZeroI32::new(141003)); + assert_eq!(extract_server_version("13.12.25"), NonZeroI32::new(131225)); + } + + #[test] + fn test_extract_server_version_invalid_inputs() { + // Test invalid inputs that should return None + assert_eq!(extract_server_version(""), None); + assert_eq!(extract_server_version("invalid"), None); + assert_eq!(extract_server_version("not.a.version"), None); + assert_eq!(extract_server_version("PostgreSQL"), None); + assert_eq!(extract_server_version(" "), None); + } + + #[test] + fn test_extract_server_version_zero_versions() { + assert_eq!(extract_server_version("0.0.0"), None); + assert_eq!(extract_server_version("0.0"), None); + } + + #[test] + fn test_extract_server_version_whitespace_handling() { + assert_eq!(extract_server_version(" 15.5 "), NonZeroI32::new(150500)); + assert_eq!( + extract_server_version("15.5\t(Homebrew)"), + NonZeroI32::new(150500) + ); + assert_eq!(extract_server_version("15.5\n"), NonZeroI32::new(150500)); + } +} diff --git a/etl-postgres/src/tokio/test_utils.rs b/etl-postgres/src/tokio/test_utils.rs index 5aed84476..1c791fabc 100644 --- a/etl-postgres/src/tokio/test_utils.rs +++ b/etl-postgres/src/tokio/test_utils.rs @@ -1,9 +1,12 @@ +use std::num::NonZeroI32; + use etl_config::shared::{IntoConnectOptions, PgConnectionConfig}; use tokio::runtime::Handle; use tokio_postgres::types::{ToSql, Type}; use tokio_postgres::{Client, GenericClient, NoTls, Transaction}; use tracing::info; +use crate::replication::extract_server_version; use crate::types::{ColumnSchema, TableId, TableName}; /// Table modification operations for ALTER TABLE statements. @@ -34,10 +37,15 @@ pub enum TableModification<'a> { pub struct PgDatabase { pub config: PgConnectionConfig, pub client: Option, + server_version: Option, destroy_on_drop: bool, } impl PgDatabase { + pub fn server_version(&self) -> Option { + self.server_version + } + /// Creates a Postgres publication for the specified tables. /// /// Sets up logical replication by creating a publication that includes @@ -71,19 +79,51 @@ impl PgDatabase { publication_name: &str, schema: Option<&str>, ) -> Result<(), tokio_postgres::Error> { - let create_publication_query = match schema { - Some(schema_name) => format!( - "create publication {} for tables in schema {}", - publication_name, schema_name - ), - None => format!("create publication {} for all tables", publication_name), - }; - - self.client - .as_ref() - .unwrap() - .execute(&create_publication_query, &[]) - .await?; + let client = self.client.as_ref().unwrap(); + + if let Some(server_version) = self.server_version + && server_version.get() >= 150000 + { + // PostgreSQL 15+ supports FOR ALL TABLES IN SCHEMA syntax + let create_publication_query = match schema { + Some(schema_name) => format!( + "create publication {} for tables in schema {}", + publication_name, schema_name + ), + None => format!("create publication {} for all tables", publication_name), + }; + + client.execute(&create_publication_query, &[]).await?; + } else { + // PostgreSQL 14 and earlier: create publication and add tables individually + match schema { + Some(schema_name) => { + let create_pub_query = format!("create publication {}", publication_name); + client.execute(&create_pub_query, &[]).await?; + + let tables_query = format!( + "select schemaname, tablename from pg_tables where schemaname = '{}'", + schema_name + ); + let rows = client.query(&tables_query, &[]).await?; + + for row in rows { + let schema: String = row.get(0); + let table: String = row.get(1); + let add_table_query = format!( + "alter publication {} add table {}.{}", + publication_name, schema, table + ); + client.execute(&add_table_query, &[]).await?; + } + } + None => { + let create_publication_query = + format!("create publication {} for all tables", publication_name); + client.execute(&create_publication_query, &[]).await?; + } + } + } Ok(()) } @@ -369,7 +409,8 @@ impl PgDatabase { Self { config, - client: Some(client), + client: Some(client.0), + server_version: client.1, destroy_on_drop: true, } } @@ -386,7 +427,8 @@ impl PgDatabase { Self { config, - client: Some(client), + client: Some(client.0), + server_version: client.1, destroy_on_drop: true, } } @@ -401,6 +443,7 @@ impl PgDatabase { PgDatabase { config: self.config.clone(), client: Some(transaction), + server_version: self.server_version, destroy_on_drop: false, } } @@ -450,7 +493,7 @@ pub fn id_column_schema() -> ColumnSchema { /// /// # Panics /// Panics if connection or database creation fails. -pub async fn create_pg_database(config: &PgConnectionConfig) -> Client { +pub async fn create_pg_database(config: &PgConnectionConfig) -> (Client, Option) { // Create the database via a single connection let (client, connection) = { let config: tokio_postgres::Config = config.without_db(); @@ -474,14 +517,16 @@ pub async fn create_pg_database(config: &PgConnectionConfig) -> Client { .expect("Failed to create database"); // Connects to the actual Postgres database - connect_to_pg_database(config).await + let (client, server_version) = connect_to_pg_database(config).await; + + (client, server_version) } /// Connects to an existing Postgres database. /// /// Establishes a client connection to the database specified in the configuration. /// Assumes the database already exists. -pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client { +pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> (Client, Option) { // Create a new client connected to the created database let (client, connection) = { let config: tokio_postgres::Config = config.with_db(); @@ -490,6 +535,9 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client { .await .expect("Failed to connect to Postgres") }; + let server_version = connection + .parameter("server_version") + .and_then(extract_server_version); // Spawn the connection on a new task tokio::spawn(async move { @@ -498,7 +546,7 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client { } }); - client + (client, server_version) } /// Drops a Postgres database and cleans up all resources. diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index cd8873f3c..9f3a79b7e 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -2,6 +2,7 @@ use crate::error::{ErrorKind, EtlError, EtlResult}; use crate::utils::tokio::MakeRustlsConnect; use crate::{bail, etl_error}; use etl_config::shared::{IntoConnectOptions, PgConnectionConfig}; +use etl_postgres::replication::extract_server_version; use etl_postgres::types::convert_type_oid_to_type; use etl_postgres::types::{ColumnSchema, TableId, TableName, TableSchema}; use pg_escape::{quote_identifier, quote_literal}; @@ -13,7 +14,6 @@ use std::io::BufReader; use std::num::NonZeroI32; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio_postgres::error::SqlState; use tokio_postgres::tls::MakeTlsConnect; use tokio_postgres::{ @@ -190,7 +190,9 @@ impl PgReplicationClient { let (client, connection) = config.connect(NoTls).await?; - let server_version = Self::extract_server_version(&connection); + let server_version = connection + .parameter("server_version") + .and_then(extract_server_version); spawn_postgres_connection::(connection); @@ -225,7 +227,9 @@ impl PgReplicationClient { let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?; - let server_version = Self::extract_server_version(&connection); + let server_version = connection + .parameter("server_version") + .and_then(extract_server_version); spawn_postgres_connection::(connection); @@ -732,49 +736,6 @@ impl PgReplicationClient { Ok(column_schemas) } - - /// Extracts the PostgreSQL server version from connection parameters. - /// - /// This method should be called during connection establishment to extract - /// the server version from the parameter status messages sent by the server. - /// - /// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH - /// This matches the format used by `SELECT version()`. - /// For example: PostgreSQL 14.2 = 140200, PostgreSQL 15.1 = 150100 - fn extract_server_version(connection: &Connection) -> Option - where - S: AsyncRead + AsyncWrite + Unpin + Send, - { - if let Some(server_version_str) = connection.parameter("server_version") { - // Parse version string like "15.5 (Homebrew)" or "14.2" - let version_part = server_version_str - .split_whitespace() - .next() - .unwrap_or("0.0"); - - let version_components: Vec<&str> = version_part.split('.').collect(); - - let major = version_components - .first() - .and_then(|v| v.parse::().ok()) - .unwrap_or(0); - let minor = version_components - .get(1) - .and_then(|v| v.parse::().ok()) - .unwrap_or(0); - let patch = version_components - .get(2) - .and_then(|v| v.parse::().ok()) - .unwrap_or(0); - - let version = major * 10000 + minor * 100 + patch; - - NonZeroI32::new(version) - } else { - None - } - } - /// Creates a COPY stream for reading data from a table using its OID. /// /// The stream will include only the specified columns and use text format. diff --git a/etl/tests/integration/pipeline_test.rs b/etl/tests/integration/pipeline_test.rs index 0e2ece881..fd4377ed8 100644 --- a/etl/tests/integration/pipeline_test.rs +++ b/etl/tests/integration/pipeline_test.rs @@ -190,6 +190,13 @@ async fn publication_changes_are_correctly_handled() { table_1_done.notified().await; table_2_done.notified().await; + if let Some(server_version) = database.server_version() + && server_version.get() <= 150000 + { + println!("Skipping test for PostgreSQL version <= 15"); + return; + } + // Insert one row in each table and wait for two insert events. let inserts_notify = destination .wait_for_events_count(vec![(EventType::Insert, 2)]) @@ -353,6 +360,13 @@ async fn publication_for_all_tables_in_schema_ignores_new_tables_until_restart() assert!(table_schemas.contains_key(&table_1_id)); assert!(!table_schemas.contains_key(&table_2_id)); + if let Some(server_version) = database.server_version() + && server_version.get() <= 150000 + { + println!("Skipping test for PostgreSQL version <= 15"); + return; + } + // We restart the pipeline and verify that the new table is now processed. let mut pipeline = create_pipeline( &database.config, From ba801096ec03e9c75ea9722bae7e26f574929694 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Fri, 29 Aug 2025 14:17:24 -0400 Subject: [PATCH 09/12] Move to if-let chain --- etl/src/replication/client.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 9f3a79b7e..6b279332b 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -642,14 +642,7 @@ impl PgReplicationClient { publication: Option<&str>, ) -> EtlResult> { let (pub_cte, pub_pred) = if let Some(publication) = publication { - let is_pg14_or_earlier = if let Some(server_version) = self.server_version { - server_version.get() < 150000 - } else { - // be conservative by default - true - }; - - if !is_pg14_or_earlier { + if let Some(server_version) = self.server_version && server_version.get() >= 150000 { ( format!( "with pub_attrs as ( @@ -669,7 +662,7 @@ impl PgReplicationClient { )", ) } else { - // No column-level filtering, check if table is in publication + // Postgres 14 or earlier or unknown, fallback to no column-level filtering ( format!( "with pub_table as ( From 0046a36f28e163f47a0b8c331bc6b9d750b7721c Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Fri, 29 Aug 2025 14:20:14 -0400 Subject: [PATCH 10/12] fmt --- etl/src/replication/client.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 6b279332b..8a27ddc91 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -642,7 +642,9 @@ impl PgReplicationClient { publication: Option<&str>, ) -> EtlResult> { let (pub_cte, pub_pred) = if let Some(publication) = publication { - if let Some(server_version) = self.server_version && server_version.get() >= 150000 { + if let Some(server_version) = self.server_version + && server_version.get() >= 150000 + { ( format!( "with pub_attrs as ( From 8d6af035c2c49bb6d009b84e8cdf2ad75647bfe0 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 1 Sep 2025 08:18:15 -0400 Subject: [PATCH 11/12] Update skip message to be more descriptive --- etl/tests/integration/pipeline_test.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/etl/tests/integration/pipeline_test.rs b/etl/tests/integration/pipeline_test.rs index fd4377ed8..a44a67f45 100644 --- a/etl/tests/integration/pipeline_test.rs +++ b/etl/tests/integration/pipeline_test.rs @@ -193,7 +193,9 @@ async fn publication_changes_are_correctly_handled() { if let Some(server_version) = database.server_version() && server_version.get() <= 150000 { - println!("Skipping test for PostgreSQL version <= 15"); + println!( + "Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported" + ); return; } @@ -363,7 +365,9 @@ async fn publication_for_all_tables_in_schema_ignores_new_tables_until_restart() if let Some(server_version) = database.server_version() && server_version.get() <= 150000 { - println!("Skipping test for PostgreSQL version <= 15"); + println!( + "Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported" + ); return; } From 3e74769a0188d6bc0bf7347359595cbf37570946 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Mon, 1 Sep 2025 11:18:38 -0400 Subject: [PATCH 12/12] Rename CI/CD job --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63435c2d7..6881ad377 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,7 @@ jobs: run: cargo clippy --all-targets --all-features --no-deps -- -D warnings test-partial: - name: Test Partial (Postgres ${{ matrix.postgres_version }}) + name: Test Partial runs-on: ubuntu-latest # Run on forks. if: github.event.pull_request.head.repo.fork == true @@ -64,7 +64,7 @@ jobs: with: key: test-partial - - name: Start Docker Compose Environment + - name: Start Docker Compose Environment (Postgres ${{ matrix.postgres_version }}) run: | POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d @@ -92,7 +92,7 @@ jobs: --all-features -- --skip integration::bigquery_test test-full: - name: Test Full (Postgres ${{ matrix.postgres_version }}) + name: Test Full runs-on: ubuntu-latest # Run on non forks. if: github.event.pull_request.head.repo.fork == false @@ -114,7 +114,7 @@ jobs: with: key: test-full - - name: Start Docker Compose Environment + - name: Start Docker Compose Environment (Postgres ${{ matrix.postgres_version }}) run: | POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d