diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2c55ade32..6881ad377 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -49,6 +49,9 @@ jobs:
     if: github.event.pull_request.head.repo.fork == true
     permissions:
       contents: read
+    strategy:
+      matrix:
+        postgres_version: [17, 16, 15, 14]
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
@@ -61,9 +64,9 @@ jobs:
         with:
          key: test-partial
 
-      - name: Start Docker Compose Environment
+      - name: Start Docker Compose Environment (Postgres ${{ matrix.postgres_version }})
         run: |
-          docker compose -f ./scripts/docker-compose.yaml up -d
+          POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d
 
       - name: Install sqlx-cli
         run: |
@@ -96,6 +99,9 @@ jobs:
     permissions:
       contents: read
       id-token: write
+    strategy:
+      matrix:
+        postgres_version: [17, 16, 15, 14]
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
@@ -108,9 +114,9 @@ jobs:
         with:
          key: test-full
 
-      - name: Start Docker Compose Environment
+      - name: Start Docker Compose Environment (Postgres ${{ matrix.postgres_version }})
         run: |
-          docker compose -f ./scripts/docker-compose.yaml up -d
+          POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d
 
       - name: Install sqlx-cli
         run: |
diff --git a/etl-postgres/src/replication/db.rs b/etl-postgres/src/replication/db.rs
index b3698ed1b..d86f15154 100644
--- a/etl-postgres/src/replication/db.rs
+++ b/etl-postgres/src/replication/db.rs
@@ -1,3 +1,5 @@
+use std::num::NonZeroI32;
+
 use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
 use sqlx::{PgPool, Row, postgres::PgPoolOptions};
 use thiserror::Error;
@@ -68,3 +70,107 @@ pub async fn get_table_name_from_oid(
         None => Err(TableLookupError::TableNotFound(table_id)),
     }
 }
+
+/// Extracts the PostgreSQL server version from a version string.
+///
+/// This function parses version strings like "15.5 (Homebrew)" or "14.2"
+/// and converts them to the numeric format used by PostgreSQL.
+///
+/// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH.
+/// For example: PostgreSQL 14.2 = 140200, PostgreSQL 15.1 = 150100.
+///
+/// Returns `None` if the version string cannot be parsed or results in zero.
+pub fn extract_server_version(server_version_str: impl AsRef<str>) -> Option<NonZeroI32> {
+    // Parse version string like "15.5 (Homebrew)" or "14.2"
+    let version_part = server_version_str
+        .as_ref()
+        .split_whitespace()
+        .next()
+        .unwrap_or("0.0");
+
+    let version_components: Vec<&str> = version_part.split('.').collect();
+
+    let major = version_components
+        .first()
+        .and_then(|v| v.parse::<i32>().ok())
+        .unwrap_or(0);
+    let minor = version_components
+        .get(1)
+        .and_then(|v| v.parse::<i32>().ok())
+        .unwrap_or(0);
+    let patch = version_components
+        .get(2)
+        .and_then(|v| v.parse::<i32>().ok())
+        .unwrap_or(0);
+
+    let version = major * 10000 + minor * 100 + patch;
+
+    NonZeroI32::new(version)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_extract_server_version_basic_versions() {
+        assert_eq!(extract_server_version("15.5"), NonZeroI32::new(150500));
+        assert_eq!(extract_server_version("14.2"), NonZeroI32::new(140200));
+        assert_eq!(extract_server_version("13.0"), NonZeroI32::new(130000));
+        assert_eq!(extract_server_version("16.1"), NonZeroI32::new(160100));
+    }
+
+    #[test]
+    fn test_extract_server_version_with_suffixes() {
+        assert_eq!(
+            extract_server_version("15.5 (Homebrew)"),
+            NonZeroI32::new(150500)
+        );
+        assert_eq!(
+            extract_server_version("14.2 on x86_64-pc-linux-gnu"),
+            NonZeroI32::new(140200)
+        );
+        assert_eq!(
+            extract_server_version("13.7 (Ubuntu 13.7-1.pgdg20.04+1)"),
+            NonZeroI32::new(130700)
+        );
+        assert_eq!(
+            extract_server_version("16.0 devel"),
+            NonZeroI32::new(160000)
+        );
+    }
+
+    #[test]
+    fn test_extract_server_version_patch_versions() {
+        // Test versions with patch numbers
+        assert_eq!(extract_server_version("15.5.1"), NonZeroI32::new(150501));
+        assert_eq!(extract_server_version("14.10.3"), NonZeroI32::new(141003));
+        assert_eq!(extract_server_version("13.12.25"), NonZeroI32::new(131225));
+    }
+
+    #[test]
+    fn test_extract_server_version_invalid_inputs() {
+        // Test invalid inputs that should return None
+        assert_eq!(extract_server_version(""), None);
+        assert_eq!(extract_server_version("invalid"), None);
+        assert_eq!(extract_server_version("not.a.version"), None);
+        assert_eq!(extract_server_version("PostgreSQL"), None);
+        assert_eq!(extract_server_version(" "), None);
+    }
+
+    #[test]
+    fn test_extract_server_version_zero_versions() {
+        assert_eq!(extract_server_version("0.0.0"), None);
+        assert_eq!(extract_server_version("0.0"), None);
+    }
+
+    #[test]
+    fn test_extract_server_version_whitespace_handling() {
+        assert_eq!(extract_server_version(" 15.5 "), NonZeroI32::new(150500));
+        assert_eq!(
+            extract_server_version("15.5\t(Homebrew)"),
+            NonZeroI32::new(150500)
+        );
+        assert_eq!(extract_server_version("15.5\n"), NonZeroI32::new(150500));
+    }
+}
diff --git a/etl-postgres/src/tokio/test_utils.rs b/etl-postgres/src/tokio/test_utils.rs
index 5aed84476..1c791fabc 100644
--- a/etl-postgres/src/tokio/test_utils.rs
+++ b/etl-postgres/src/tokio/test_utils.rs
@@ -1,9 +1,12 @@
+use std::num::NonZeroI32;
+
 use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
 use tokio::runtime::Handle;
 use tokio_postgres::types::{ToSql, Type};
 use tokio_postgres::{Client, GenericClient, NoTls, Transaction};
 use tracing::info;
 
+use crate::replication::extract_server_version;
 use crate::types::{ColumnSchema, TableId, TableName};
 
 /// Table modification operations for ALTER TABLE statements.
@@ -34,10 +37,15 @@ pub enum TableModification<'a> {
 pub struct PgDatabase {
     pub config: PgConnectionConfig,
     pub client: Option,
+    server_version: Option<NonZeroI32>,
     destroy_on_drop: bool,
 }
 
 impl PgDatabase {
+    pub fn server_version(&self) -> Option<NonZeroI32> {
+        self.server_version
+    }
+
     /// Creates a Postgres publication for the specified tables.
     ///
     /// Sets up logical replication by creating a publication that includes
@@ -71,19 +79,51 @@ impl PgDatabase {
         publication_name: &str,
         schema: Option<&str>,
     ) -> Result<(), tokio_postgres::Error> {
-        let create_publication_query = match schema {
-            Some(schema_name) => format!(
-                "create publication {} for tables in schema {}",
-                publication_name, schema_name
-            ),
-            None => format!("create publication {} for all tables", publication_name),
-        };
-
-        self.client
-            .as_ref()
-            .unwrap()
-            .execute(&create_publication_query, &[])
-            .await?;
+        let client = self.client.as_ref().unwrap();
+
+        if let Some(server_version) = self.server_version
+            && server_version.get() >= 150000
+        {
+            // PostgreSQL 15+ supports FOR ALL TABLES IN SCHEMA syntax
+            let create_publication_query = match schema {
+                Some(schema_name) => format!(
+                    "create publication {} for tables in schema {}",
+                    publication_name, schema_name
+                ),
+                None => format!("create publication {} for all tables", publication_name),
+            };
+
+            client.execute(&create_publication_query, &[]).await?;
+        } else {
+            // PostgreSQL 14 and earlier: create publication and add tables individually
+            match schema {
+                Some(schema_name) => {
+                    let create_pub_query = format!("create publication {}", publication_name);
+                    client.execute(&create_pub_query, &[]).await?;
+
+                    let tables_query = format!(
+                        "select schemaname, tablename from pg_tables where schemaname = '{}'",
+                        schema_name
+                    );
+                    let rows = client.query(&tables_query, &[]).await?;
+
+                    for row in rows {
+                        let schema: String = row.get(0);
+                        let table: String = row.get(1);
+                        let add_table_query = format!(
+                            "alter publication {} add table {}.{}",
+                            publication_name, schema, table
+                        );
+                        client.execute(&add_table_query, &[]).await?;
+                    }
+                }
+                None => {
+                    let create_publication_query =
+                        format!("create publication {} for all tables", publication_name);
+                    client.execute(&create_publication_query, &[]).await?;
+                }
+            }
+        }
 
         Ok(())
     }
@@ -369,7 +409,8 @@ impl PgDatabase {
 
         Self {
             config,
-            client: Some(client),
+            client: Some(client.0),
+            server_version: client.1,
             destroy_on_drop: true,
         }
     }
@@ -386,7 +427,8 @@ impl PgDatabase {
 
         Self {
             config,
-            client: Some(client),
+            client: Some(client.0),
+            server_version: client.1,
             destroy_on_drop: true,
         }
     }
@@ -401,6 +443,7 @@ impl PgDatabase {
         PgDatabase {
             config: self.config.clone(),
             client: Some(transaction),
+            server_version: self.server_version,
             destroy_on_drop: false,
         }
     }
@@ -450,7 +493,7 @@ pub fn id_column_schema() -> ColumnSchema {
 ///
 /// # Panics
 /// Panics if connection or database creation fails.
-pub async fn create_pg_database(config: &PgConnectionConfig) -> Client {
+pub async fn create_pg_database(config: &PgConnectionConfig) -> (Client, Option<NonZeroI32>) {
     // Create the database via a single connection
     let (client, connection) = {
         let config: tokio_postgres::Config = config.without_db();
@@ -474,14 +517,16 @@ pub async fn create_pg_database(config: &PgConnectionConfig) -> Client {
         .expect("Failed to create database");
 
     // Connects to the actual Postgres database
-    connect_to_pg_database(config).await
+    let (client, server_version) = connect_to_pg_database(config).await;
+
+    (client, server_version)
 }
 
 /// Connects to an existing Postgres database.
 ///
 /// Establishes a client connection to the database specified in the configuration.
 /// Assumes the database already exists.
-pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
+pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> (Client, Option<NonZeroI32>) {
     // Create a new client connected to the created database
     let (client, connection) = {
         let config: tokio_postgres::Config = config.with_db();
@@ -490,6 +535,9 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
             .await
             .expect("Failed to connect to Postgres")
     };
+    let server_version = connection
+        .parameter("server_version")
+        .and_then(extract_server_version);
 
     // Spawn the connection on a new task
     tokio::spawn(async move {
@@ -498,7 +546,7 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
         }
     });
 
-    client
+    (client, server_version)
 }
 
 /// Drops a Postgres database and cleans up all resources.
diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs
index 392d03953..8a27ddc91 100644
--- a/etl/src/replication/client.rs
+++ b/etl/src/replication/client.rs
@@ -2,6 +2,7 @@ use crate::error::{ErrorKind, EtlError, EtlResult};
 use crate::utils::tokio::MakeRustlsConnect;
 use crate::{bail, etl_error};
 use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
+use etl_postgres::replication::extract_server_version;
 use etl_postgres::types::convert_type_oid_to_type;
 use etl_postgres::types::{ColumnSchema, TableId, TableName, TableSchema};
 use pg_escape::{quote_identifier, quote_literal};
@@ -10,7 +11,9 @@ use rustls::ClientConfig;
 use std::collections::HashMap;
 use std::fmt;
 use std::io::BufReader;
+use std::num::NonZeroI32;
 use std::sync::Arc;
+
 use tokio_postgres::error::SqlState;
 use tokio_postgres::tls::MakeTlsConnect;
 use tokio_postgres::{
@@ -163,6 +166,7 @@ impl PgReplicationSlotTransaction {
 #[derive(Debug, Clone)]
 pub struct PgReplicationClient {
     client: Arc<Client>,
+    server_version: Option<NonZeroI32>,
 }
 
 impl PgReplicationClient {
@@ -185,12 +189,18 @@ impl PgReplicationClient {
         config.replication_mode(ReplicationMode::Logical);
 
         let (client, connection) = config.connect(NoTls).await?;
+
+        let server_version = connection
+            .parameter("server_version")
+            .and_then(extract_server_version);
+
         spawn_postgres_connection::(connection);
 
         info!("successfully connected to postgres without tls");
 
         Ok(PgReplicationClient {
             client: Arc::new(client),
+            server_version,
         })
     }
 
@@ -216,12 +226,18 @@ impl PgReplicationClient {
             .with_no_client_auth();
 
         let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?;
+
+        let server_version = connection
+            .parameter("server_version")
+            .and_then(extract_server_version);
+
         spawn_postgres_connection::(connection);
 
         info!("successfully connected to postgres with tls");
 
         Ok(PgReplicationClient {
             client: Arc::new(client),
+            server_version,
         })
     }
 
@@ -626,24 +642,43 @@ impl PgReplicationClient {
         publication: Option<&str>,
     ) -> EtlResult<Vec<ColumnSchema>> {
         let (pub_cte, pub_pred) = if let Some(publication) = publication {
-            (
-                format!(
-                    "with pub_attrs as (
-                        select unnest(r.prattrs)
-                        from pg_publication_rel r
-                        left join pg_publication p on r.prpubid = p.oid
-                        where p.pubname = {publication}
-                        and r.prrelid = {table_id}
+            if let Some(server_version) = self.server_version
+                && server_version.get() >= 150000
+            {
+                (
+                    format!(
+                        "with pub_attrs as (
+                        select unnest(r.prattrs)
+                        from pg_publication_rel r
+                        left join pg_publication p on r.prpubid = p.oid
+                        where p.pubname = {publication}
+                        and r.prrelid = {table_id}
+                    )",
+                    publication = quote_literal(publication),
+                ),
+                "and (
+                    case (select count(*) from pub_attrs)
+                    when 0 then true
+                    else (a.attnum in (select * from pub_attrs))
+                    end
+                )",
-                    )",
-                    publication = quote_literal(publication),
-                ),
-                "and (
-                    case (select count(*) from pub_attrs)
-                    when 0 then true
-                    else (a.attnum in (select * from pub_attrs))
-                    end
-                )",
-            )
+                )
+            } else {
+                // Postgres 14 or earlier, or unknown version: fall back to no column-level filtering
+                (
+                    format!(
+                        "with pub_table as (
+                            select 1 as exists_in_pub
+                            from pg_publication_rel r
+                            left join pg_publication p on r.prpubid = p.oid
+                            where p.pubname = {publication}
+                            and r.prrelid = {table_id}
+                        )",
+                        publication = quote_literal(publication),
+                    ),
+                    "and (select count(*) from pub_table) > 0",
+                )
+            }
         } else {
             ("".into(), "")
         };
@@ -696,7 +731,6 @@ impl PgReplicationClient {
 
         Ok(column_schemas)
     }
-
     /// Creates a COPY stream for reading data from a table using its OID.
     ///
     /// The stream will include only the specified columns and use text format.
diff --git a/etl/tests/integration/pipeline_test.rs b/etl/tests/integration/pipeline_test.rs
index 0e2ece881..a44a67f45 100644
--- a/etl/tests/integration/pipeline_test.rs
+++ b/etl/tests/integration/pipeline_test.rs
@@ -190,6 +190,15 @@ async fn publication_changes_are_correctly_handled() {
     table_1_done.notified().await;
     table_2_done.notified().await;
 
+    if let Some(server_version) = database.server_version()
+        && server_version.get() < 150000
+    {
+        println!(
+            "Skipping test: PostgreSQL versions older than 15 do not support CREATE PUBLICATION FOR TABLES IN SCHEMA"
+        );
+        return;
+    }
+
     // Insert one row in each table and wait for two insert events.
     let inserts_notify = destination
         .wait_for_events_count(vec![(EventType::Insert, 2)])
@@ -353,6 +362,15 @@ async fn publication_for_all_tables_in_schema_ignores_new_tables_until_restart()
     assert!(table_schemas.contains_key(&table_1_id));
     assert!(!table_schemas.contains_key(&table_2_id));
 
+    if let Some(server_version) = database.server_version()
+        && server_version.get() < 150000
+    {
+        println!(
+            "Skipping test: PostgreSQL versions older than 15 do not support CREATE PUBLICATION FOR TABLES IN SCHEMA"
+        );
+        return;
+    }
+
     // We restart the pipeline and verify that the new table is now processed.
     let mut pipeline = create_pipeline(
         &database.config,
diff --git a/scripts/docker-compose.yaml b/scripts/docker-compose.yaml
index c9058ee9e..bc4d07fc0 100644
--- a/scripts/docker-compose.yaml
+++ b/scripts/docker-compose.yaml
@@ -1,17 +1,16 @@
 x-lakekeeper-env: &lakekeeper-env
   environment:
     - LAKEKEEPER__PG_ENCRYPTION_KEY=insecure-encryption-key
-    - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://postgres:postgres@postgres:5432/iceberg-catalog
-    - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://postgres:postgres@postgres:5432/iceberg-catalog
+    - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://lakekeeper-postgres:lakekeeper-postgres@lakekeeper-postgres:5432/iceberg-catalog
+    - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://lakekeeper-postgres:lakekeeper-postgres@lakekeeper-postgres:5432/iceberg-catalog
     - RUST_LOG=info
 
-name: etl
+name: etl-pg-${POSTGRES_VERSION:-17}
 
 services:
   # Start Postgres
   postgres:
-    image: postgres:17
-    container_name: postgres
+    image: postgres:${POSTGRES_VERSION:-17}
     environment:
       POSTGRES_USER: ${POSTGRES_USER:-postgres}
       POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres}
@@ -33,28 +32,24 @@ services:
       timeout: 5s
       retries: 5
 
-  # Create database for iceberg catalog
-  create-catalog-db:
-    image: postgres:17
-    container_name: create-catalog-db
-    depends_on:
-      postgres:
-        condition: service_healthy
-    environment:
-      PGHOST: postgres
-      PGPORT: 5432
-      PGUSER: ${POSTGRES_USER:-postgres}
-      PGPASSWORD: ${POSTGRES_PASSWORD:-postgres}
-    entrypoint: >
-      /bin/sh -c "
-      psql -c 'create database \"iceberg-catalog\";';
-      exit 0;
-      "
+  lakekeeper-postgres:
+    image: postgres:17
+    environment:
+      POSTGRES_USER: ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres}
+      POSTGRES_PASSWORD: ${LAKEKEEPER_POSTGRES_PASSWORD:-lakekeeper-postgres}
+      POSTGRES_DB: ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}
+    volumes:
+      - ${LAKEKEEPER_POSTGRES_DATA_VOLUME:-lakekeeper_postgres_data}:/var/lib/postgresql/data
+    restart: unless-stopped
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U ${LAKEKEEPER_POSTGRES_USER:-lakekeeper-postgres} -d ${LAKEKEEPER_POSTGRES_DB:-iceberg-catalog}"]
+      interval: 5s
+      timeout: 5s
+      retries: 5
 
   # Start MinIO for S3-compatible object storage
   minio:
     image: minio/minio:latest
-    container_name: minio
     environment:
       MINIO_ROOT_USER: minio-admin
       MINIO_ROOT_PASSWORD: minio-admin-password
@@ -73,7 +68,6 @@ services:
   # Create MinIO bucket
   create-bucket:
     image: minio/mc:latest
-    container_name: create-bucket
     depends_on:
       minio:
         condition: service_healthy
@@ -87,18 +81,16 @@ services:
   # Migrate lakekeeper database
   migrate-lakekeeper:
     image: quay.io/lakekeeper/catalog:latest-main
-    container_name: migrate-lakekeeper
     <<: *lakekeeper-env
     restart: "no"
     command: ["migrate"]
     depends_on:
-      create-catalog-db:
-        condition: service_completed_successfully
+      lakekeeper-postgres:
+        condition: service_healthy
 
   # Start lakekeeper, an iceberg REST catalog
   lakekeeper:
     image: quay.io/lakekeeper/catalog:latest-main
-    container_name: lakekeeper
     depends_on:
       migrate-lakekeeper:
         condition: service_completed_successfully
@@ -118,7 +110,6 @@ services:
   # Bootstrap lakekeeper. After deployment, lakekeeper needs to be bootstrapped.
   bootstrap-lakekeeper:
     image: curlimages/curl
-    container_name: bootstrap-lakekeeper
     depends_on:
       lakekeeper:
         condition: service_healthy
@@ -141,7 +132,6 @@ services:
   # Create a warehouse for development and testing
   create-warehouse:
     image: curlimages/curl
-    container_name: create-warehouse
     depends_on:
       lakekeeper:
         condition: service_healthy
@@ -170,5 +160,7 @@ services:
 volumes:
   postgres_data:
     driver: local
+  lakekeeper_postgres_data:
+    driver: local
   minio_data:
     driver: local
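
Note (not part of the patch): a minimal sketch of how the new POSTGRES_VERSION compose variable might be exercised locally, mirroring the CI matrix step above; it assumes Docker Compose v2 and that the commands are run from the repository root.

    # Bring the stack up against Postgres 14; the compose project name becomes "etl-pg-14"
    # (from `name: etl-pg-${POSTGRES_VERSION:-17}`), so stacks for different Postgres
    # versions can run side by side without clashing.
    POSTGRES_VERSION=14 docker compose -f ./scripts/docker-compose.yaml up -d

    # Tear the same stack down again (add -v to also drop the named volumes such as
    # postgres_data and lakekeeper_postgres_data).
    POSTGRES_VERSION=14 docker compose -f ./scripts/docker-compose.yaml down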