Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ jobs:
if: github.event.pull_request.head.repo.fork == true
permissions:
contents: read
strategy:
matrix:
postgres_version: [17, 16, 15, 14]
steps:
- name: Checkout repository
uses: actions/checkout@v4
Expand All @@ -61,9 +64,9 @@ jobs:
with:
key: test-partial

- name: Start Docker Compose Environment
- name: Start Docker Compose Environment (Postgres ${{ matrix.postgres_version }})
run: |
docker compose -f ./scripts/docker-compose.yaml up -d
POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d

- name: Install sqlx-cli
run: |
Expand Down Expand Up @@ -96,6 +99,9 @@ jobs:
permissions:
contents: read
id-token: write
strategy:
matrix:
postgres_version: [17, 16, 15, 14]
steps:
- name: Checkout repository
uses: actions/checkout@v4
Expand All @@ -108,9 +114,9 @@ jobs:
with:
key: test-full

- name: Start Docker Compose Environment
- name: Start Docker Compose Environment (Postgres ${{ matrix.postgres_version }})
run: |
docker compose -f ./scripts/docker-compose.yaml up -d
POSTGRES_VERSION=${{ matrix.postgres_version }} docker compose -f ./scripts/docker-compose.yaml up -d

- name: Install sqlx-cli
run: |
Expand Down
106 changes: 106 additions & 0 deletions etl-postgres/src/replication/db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::num::NonZeroI32;

use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
use sqlx::{PgPool, Row, postgres::PgPoolOptions};
use thiserror::Error;
Expand Down Expand Up @@ -68,3 +70,107 @@ pub async fn get_table_name_from_oid(
None => Err(TableLookupError::TableNotFound(table_id)),
}
}

/// Extracts the PostgreSQL server version from a version string.
///
/// This function parses version strings like "15.5 (Homebrew)" or "14.2"
/// and converts them to the numeric format used by PostgreSQL.
///
/// Returns the version in the format: MAJOR * 10000 + MINOR * 100 + PATCH
/// For example: PostgreSQL 14.2 = 140200, PostgreSQL 15.1 = 150100
///
/// Returns `None` if the version string cannot be parsed or results in zero.
pub fn extract_server_version(server_version_str: impl AsRef<str>) -> Option<NonZeroI32> {
// Parse version string like "15.5 (Homebrew)" or "14.2"
let version_part = server_version_str
.as_ref()
.split_whitespace()
.next()
.unwrap_or("0.0");

let version_components: Vec<&str> = version_part.split('.').collect();

let major = version_components
.first()
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let minor = version_components
.get(1)
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let patch = version_components
.get(2)
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);

let version = major * 10000 + minor * 100 + patch;

NonZeroI32::new(version)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_extract_server_version_basic_versions() {
assert_eq!(extract_server_version("15.5"), NonZeroI32::new(150500));
assert_eq!(extract_server_version("14.2"), NonZeroI32::new(140200));
assert_eq!(extract_server_version("13.0"), NonZeroI32::new(130000));
assert_eq!(extract_server_version("16.1"), NonZeroI32::new(160100));
}

#[test]
fn test_extract_server_version_with_suffixes() {
assert_eq!(
extract_server_version("15.5 (Homebrew)"),
NonZeroI32::new(150500)
);
assert_eq!(
extract_server_version("14.2 on x86_64-pc-linux-gnu"),
NonZeroI32::new(140200)
);
assert_eq!(
extract_server_version("13.7 (Ubuntu 13.7-1.pgdg20.04+1)"),
NonZeroI32::new(130700)
);
assert_eq!(
extract_server_version("16.0 devel"),
NonZeroI32::new(160000)
);
}

#[test]
fn test_extract_server_version_patch_versions() {
// Test versions with patch numbers
assert_eq!(extract_server_version("15.5.1"), NonZeroI32::new(150501));
assert_eq!(extract_server_version("14.10.3"), NonZeroI32::new(141003));
assert_eq!(extract_server_version("13.12.25"), NonZeroI32::new(131225));
}

#[test]
fn test_extract_server_version_invalid_inputs() {
// Test invalid inputs that should return None
assert_eq!(extract_server_version(""), None);
assert_eq!(extract_server_version("invalid"), None);
assert_eq!(extract_server_version("not.a.version"), None);
assert_eq!(extract_server_version("PostgreSQL"), None);
assert_eq!(extract_server_version(" "), None);
}

#[test]
fn test_extract_server_version_zero_versions() {
assert_eq!(extract_server_version("0.0.0"), None);
assert_eq!(extract_server_version("0.0"), None);
}

#[test]
fn test_extract_server_version_whitespace_handling() {
assert_eq!(extract_server_version(" 15.5 "), NonZeroI32::new(150500));
assert_eq!(
extract_server_version("15.5\t(Homebrew)"),
NonZeroI32::new(150500)
);
assert_eq!(extract_server_version("15.5\n"), NonZeroI32::new(150500));
}
}
86 changes: 67 additions & 19 deletions etl-postgres/src/tokio/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::num::NonZeroI32;

use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
use tokio::runtime::Handle;
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::{Client, GenericClient, NoTls, Transaction};
use tracing::info;

use crate::replication::extract_server_version;
use crate::types::{ColumnSchema, TableId, TableName};

/// Table modification operations for ALTER TABLE statements.
Expand Down Expand Up @@ -34,10 +37,15 @@ pub enum TableModification<'a> {
pub struct PgDatabase<G> {
pub config: PgConnectionConfig,
pub client: Option<G>,
server_version: Option<NonZeroI32>,
destroy_on_drop: bool,
}

impl<G: GenericClient> PgDatabase<G> {
pub fn server_version(&self) -> Option<NonZeroI32> {
self.server_version
}

/// Creates a Postgres publication for the specified tables.
///
/// Sets up logical replication by creating a publication that includes
Expand Down Expand Up @@ -71,19 +79,51 @@ impl<G: GenericClient> PgDatabase<G> {
publication_name: &str,
schema: Option<&str>,
) -> Result<(), tokio_postgres::Error> {
let create_publication_query = match schema {
Some(schema_name) => format!(
"create publication {} for tables in schema {}",
publication_name, schema_name
),
None => format!("create publication {} for all tables", publication_name),
};

self.client
.as_ref()
.unwrap()
.execute(&create_publication_query, &[])
.await?;
let client = self.client.as_ref().unwrap();

if let Some(server_version) = self.server_version
&& server_version.get() >= 150000
{
// PostgreSQL 15+ supports FOR ALL TABLES IN SCHEMA syntax
let create_publication_query = match schema {
Some(schema_name) => format!(
"create publication {} for tables in schema {}",
publication_name, schema_name
),
None => format!("create publication {} for all tables", publication_name),
};

client.execute(&create_publication_query, &[]).await?;
} else {
// PostgreSQL 14 and earlier: create publication and add tables individually
match schema {
Some(schema_name) => {
let create_pub_query = format!("create publication {}", publication_name);
client.execute(&create_pub_query, &[]).await?;

let tables_query = format!(
"select schemaname, tablename from pg_tables where schemaname = '{}'",
schema_name
);
let rows = client.query(&tables_query, &[]).await?;

for row in rows {
let schema: String = row.get(0);
let table: String = row.get(1);
let add_table_query = format!(
"alter publication {} add table {}.{}",
publication_name, schema, table
);
client.execute(&add_table_query, &[]).await?;
}
}
None => {
let create_publication_query =
format!("create publication {} for all tables", publication_name);
client.execute(&create_publication_query, &[]).await?;
}
}
}

Ok(())
}
Expand Down Expand Up @@ -369,7 +409,8 @@ impl PgDatabase<Client> {

Self {
config,
client: Some(client),
client: Some(client.0),
server_version: client.1,
destroy_on_drop: true,
}
}
Expand All @@ -386,7 +427,8 @@ impl PgDatabase<Client> {

Self {
config,
client: Some(client),
client: Some(client.0),
server_version: client.1,
destroy_on_drop: true,
}
}
Expand All @@ -401,6 +443,7 @@ impl PgDatabase<Client> {
PgDatabase {
config: self.config.clone(),
client: Some(transaction),
server_version: self.server_version,
destroy_on_drop: false,
}
}
Expand Down Expand Up @@ -450,7 +493,7 @@ pub fn id_column_schema() -> ColumnSchema {
///
/// # Panics
/// Panics if connection or database creation fails.
pub async fn create_pg_database(config: &PgConnectionConfig) -> Client {
pub async fn create_pg_database(config: &PgConnectionConfig) -> (Client, Option<NonZeroI32>) {
// Create the database via a single connection
let (client, connection) = {
let config: tokio_postgres::Config = config.without_db();
Expand All @@ -474,14 +517,16 @@ pub async fn create_pg_database(config: &PgConnectionConfig) -> Client {
.expect("Failed to create database");

// Connects to the actual Postgres database
connect_to_pg_database(config).await
let (client, server_version) = connect_to_pg_database(config).await;

(client, server_version)
}

/// Connects to an existing Postgres database.
///
/// Establishes a client connection to the database specified in the configuration.
/// Assumes the database already exists.
pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> (Client, Option<NonZeroI32>) {
// Create a new client connected to the created database
let (client, connection) = {
let config: tokio_postgres::Config = config.with_db();
Expand All @@ -490,6 +535,9 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
.await
.expect("Failed to connect to Postgres")
};
let server_version = connection
.parameter("server_version")
.and_then(extract_server_version);

// Spawn the connection on a new task
tokio::spawn(async move {
Expand All @@ -498,7 +546,7 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
}
});

client
(client, server_version)
}

/// Drops a Postgres database and cleans up all resources.
Expand Down
Loading