diff --git a/etl-api/Cargo.toml b/etl-api/Cargo.toml
index 10a55a139..286b1c479 100644
--- a/etl-api/Cargo.toml
+++ b/etl-api/Cargo.toml
@@ -53,6 +53,7 @@ sqlx = { workspace = true, features = [
] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+tokio-postgres = { workspace = true }
tracing = { workspace = true, default-features = false }
tracing-actix-web = { workspace = true, features = ["emit_event_on_error"] }
utoipa = { workspace = true, features = ["actix_extras"] }
diff --git a/etl-api/src/db/publications.rs b/etl-api/src/db/publications.rs
index 5c01ac859..4758d76e3 100644
--- a/etl-api/src/db/publications.rs
+++ b/etl-api/src/db/publications.rs
@@ -3,6 +3,7 @@ use serde::Serialize;
use sqlx::{Executor, PgPool, Row};
use std::collections::HashMap;
use thiserror::Error;
+use tokio_postgres::types::Type;
use utoipa::ToSchema;
use crate::db::tables::Table;
@@ -11,6 +12,9 @@ use crate::db::tables::Table;
pub enum PublicationsDbError {
#[error("Error while interacting with Postgres for publications: {0}")]
Database(#[from] sqlx::Error),
+
+ #[error("Publication contains tables with unsupported column types: {0}")]
+ UnsupportedColumnTypes(String),
}
#[derive(Serialize, ToSchema)]
@@ -19,6 +23,71 @@ pub struct Publication {
pub tables: Vec<Table>,
}
+/// Validates that all columns in the publication tables have supported types.
+///
+/// Queries `pg_attribute` to get type OIDs for all columns in the specified tables.
+/// Returns an error if any column has a type OID that is unknown to `tokio_postgres`.
+/// These unknown types would silently fall back to TEXT during replication, causing
+/// data corruption.
+pub async fn validate_publication_column_types(
+ publication: &Publication,
+ pool: &PgPool,
+) -> Result<(), PublicationsDbError> {
+ if publication.tables.is_empty() {
+ return Ok(());
+ }
+
+ let mut unsupported_columns = Vec::new();
+
+ for table in &publication.tables {
+ let quoted_schema = quote_literal(&table.schema);
+ let quoted_name = quote_literal(&table.name);
+
+ let query = format!(
+ r#"
+ select
+ a.attname as column_name,
+ a.atttypid::int as type_oid,
+ t.typname as type_name
+ from pg_attribute a
+ join pg_class c on a.attrelid = c.oid
+ join pg_namespace n on c.relnamespace = n.oid
+ join pg_type t on a.atttypid = t.oid
+ where n.nspname = {quoted_schema}
+ and c.relname = {quoted_name}
+ and a.attnum > 0
+ and not a.attisdropped
+ order by a.attnum
+ "#
+ );
+
+ let rows = pool.fetch_all(query.as_str()).await?;
+
+ for row in rows {
+ let column_name: String = row.get("column_name");
+ // OID type in Postgres is stored as i32 in sqlx but represents u32
+ let type_oid_raw: i32 = row.get("type_oid");
+ let type_oid = type_oid_raw as u32;
+ let type_name: String = row.get("type_name");
+
+ if Type::from_oid(type_oid).is_none() {
+ unsupported_columns.push(format!(
+ "{}.{}.{} (type: {}, oid: {})",
+ table.schema, table.name, column_name, type_name, type_oid
+ ));
+ }
+ }
+ }
+
+ if !unsupported_columns.is_empty() {
+ return Err(PublicationsDbError::UnsupportedColumnTypes(
+ unsupported_columns.join(", "),
+ ));
+ }
+
+ Ok(())
+}
+
pub async fn create_publication(
publication: &Publication,
pool: &PgPool,
diff --git a/etl-api/src/routes/sources/publications.rs b/etl-api/src/routes/sources/publications.rs
index f56a627e7..54d826787 100644
--- a/etl-api/src/routes/sources/publications.rs
+++ b/etl-api/src/routes/sources/publications.rs
@@ -25,6 +25,9 @@ enum PublicationError {
#[error("The publication with name {0} was not found")]
PublicationNotFound(String),
+ #[error("Invalid publication request: {0}")]
+ InvalidPublication(String),
+
#[error(transparent)]
TenantId(#[from] TenantIdError),
@@ -46,6 +49,15 @@ impl PublicationError {
| PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => {
"internal server error".to_string()
}
+ // Validation errors are safe to expose - they help users fix their input
+ PublicationError::PublicationsDb(PublicationsDbError::UnsupportedColumnTypes(
+ details,
+ )) => {
+ format!(
+ "Publication contains tables with unsupported column types: {}",
+ details
+ )
+ }
// Every other message is ok, as they do not divulge sensitive information
e => e.to_string(),
}
@@ -55,13 +67,18 @@ impl PublicationError {
impl ResponseError for PublicationError {
fn status_code(&self) -> StatusCode {
match self {
- PublicationError::SourcesDb(_)
- | PublicationError::PublicationsDb(_)
- | PublicationError::Database(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ PublicationError::SourcesDb(_) | PublicationError::Database(_) => {
+ StatusCode::INTERNAL_SERVER_ERROR
+ }
+ PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => {
+ StatusCode::INTERNAL_SERVER_ERROR
+ }
+ PublicationError::PublicationsDb(PublicationsDbError::UnsupportedColumnTypes(_))
+ | PublicationError::InvalidPublication(_)
+ | PublicationError::TenantId(_) => StatusCode::BAD_REQUEST,
PublicationError::SourceNotFound(_) | PublicationError::PublicationNotFound(_) => {
StatusCode::NOT_FOUND
}
- PublicationError::TenantId(_) => StatusCode::BAD_REQUEST,
}
}
@@ -77,18 +94,18 @@ impl ResponseError for PublicationError {
}
}
-#[derive(Deserialize, ToSchema)]
+#[derive(Deserialize, Serialize, ToSchema)]
pub struct CreatePublicationRequest {
#[schema(example = "my_publication", required = true)]
- name: String,
+ pub name: String,
#[schema(required = true)]
- tables: Vec<Table>,
+ pub tables: Vec<Table>,
}
-#[derive(Deserialize, ToSchema)]
+#[derive(Deserialize, Serialize, ToSchema)]
pub struct UpdatePublicationRequest {
#[schema(required = true)]
- tables: Vec<Table>,
+ pub tables: Vec<Table>,
}
#[derive(Serialize, ToSchema)]
@@ -125,13 +142,18 @@ pub async fn create_publication(
.map(|s| s.config)
.ok_or(PublicationError::SourceNotFound(source_id))?;
- let source_pool =
- connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
- let publication = publication.0;
+ let publication = publication.into_inner();
let publication = Publication {
name: publication.name,
tables: publication.tables,
};
+
+ validate_publication(&publication)?;
+
+ let source_pool =
+ connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
+
+ db::publications::validate_publication_column_types(&publication, &source_pool).await?;
db::publications::create_publication(&publication, &source_pool).await?;
Ok(HttpResponse::Ok().finish())
@@ -206,13 +228,18 @@ pub async fn update_publication(
.map(|s| s.config)
.ok_or(PublicationError::SourceNotFound(source_id))?;
- let source_pool =
- connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
- let publication = publication.0;
+ let publication = publication.into_inner();
let publication = Publication {
name: publication_name,
tables: publication.tables,
};
+
+ validate_publication(&publication)?;
+
+ let source_pool =
+ connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
+
+ db::publications::validate_publication_column_types(&publication, &source_pool).await?;
db::publications::update_publication(&publication, &source_pool).await?;
Ok(HttpResponse::Ok().finish())
@@ -288,3 +315,30 @@ pub async fn read_all_publications(
Ok(Json(response))
}
+
+fn validate_publication(publication: &Publication) -> Result<(), PublicationError> {
+ let mut errors: Vec<String> = Vec::new();
+
+ if publication.name.trim().is_empty() {
+ errors.push("name cannot be empty".to_string());
+ }
+
+ if publication.tables.is_empty() {
+ errors.push("tables cannot be empty".to_string());
+ } else {
+ for (i, table) in publication.tables.iter().enumerate() {
+ if table.schema.trim().is_empty() {
+ errors.push(format!("table[{}]: schema cannot be empty", i));
+ }
+ if table.name.trim().is_empty() {
+ errors.push(format!("table[{}]: name cannot be empty", i));
+ }
+ }
+ }
+
+ if !errors.is_empty() {
+ return Err(PublicationError::InvalidPublication(errors.join(", ")));
+ }
+
+ Ok(())
+}
diff --git a/etl-api/tests/publications.rs b/etl-api/tests/publications.rs
new file mode 100644
index 000000000..f66b7a604
--- /dev/null
+++ b/etl-api/tests/publications.rs
@@ -0,0 +1,786 @@
+use crate::support::mocks::sources::create_test_db_source;
+use crate::support::mocks::tenants::create_tenant;
+use crate::support::test_app::spawn_test_app;
+use etl_api::db::tables::Table;
+use etl_api::routes::sources::publications::{CreatePublicationRequest, UpdatePublicationRequest};
+use etl_telemetry::tracing::init_test_tracing;
+use reqwest::StatusCode;
+
+mod support;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_unsupported_column_types_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ // Create a custom enum type (which will have an unknown OID to tokio-postgres)
+ sqlx::query(r#"create type custom_status as enum ('pending', 'active', 'inactive')"#)
+ .execute(&source_pool)
+ .await
+ .expect("failed to create enum type");
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key,
+ name text not null,
+ status custom_status not null
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(
+ error_body.contains("unsupported column types"),
+ "Expected error about unsupported types, got: {}",
+ error_body
+ );
+ assert!(
+ error_body.contains("custom_status"),
+ "Expected error to mention custom_status type, got: {}",
+ error_body
+ );
+ assert!(
+ error_body.contains("public.test_table.status"),
+ "Expected error to mention the problematic column, got: {}",
+ error_body
+ );
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_supported_column_types_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key,
+ name text not null,
+ age int4,
+ created_at timestamptz,
+ metadata jsonb
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::OK);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn update_publication_with_unsupported_column_types_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table good_table (
+ id integer primary key,
+ name text not null
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create good_table");
+
+ sqlx::query("create type bad_enum as enum ('a', 'b')")
+ .execute(&source_pool)
+ .await
+ .expect("failed to create enum");
+
+ sqlx::query(
+ r#"
+ create table bad_table (
+ id integer primary key,
+ status bad_enum not null
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create bad_table");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "good_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let updated_publication = UpdatePublicationRequest {
+ tables: vec![
+ Table {
+ schema: "public".to_string(),
+ name: "good_table".to_string(),
+ },
+ Table {
+ schema: "public".to_string(),
+ name: "bad_table".to_string(),
+ },
+ ],
+ };
+
+ let response = app
+ .update_publication(
+ &tenant_id,
+ source_id,
+ "test_publication",
+ &updated_publication,
+ )
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("unsupported column types"));
+ assert!(error_body.contains("bad_enum"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_nonexistent_source_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, 99999, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("source") || error_body.contains("not found"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_duplicate_publication_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+ assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_empty_tables_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let publication = CreatePublicationRequest {
+ name: "empty_publication".to_string(),
+ tables: vec![],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("tables cannot be empty"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_empty_name_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: "".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("name cannot be empty"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_empty_table_schema_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("schema cannot be empty"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_empty_table_name_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("name cannot be empty"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn create_publication_with_whitespace_only_name_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: " ".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+ let error_body = response.text().await.expect("failed to read response body");
+ assert!(error_body.contains("name cannot be empty"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn read_publication_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table users (
+ id integer primary key,
+ name text not null
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "users".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let response = app
+ .read_publication(&tenant_id, source_id, "test_publication")
+ .await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.expect("failed to parse json");
+ assert_eq!(body["name"], "test_publication");
+ assert_eq!(body["tables"].as_array().unwrap().len(), 1);
+ assert_eq!(body["tables"][0]["schema"], "public");
+ assert_eq!(body["tables"][0]["name"], "users");
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn read_nonexistent_publication_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let response = app
+ .read_publication(&tenant_id, source_id, "nonexistent_publication")
+ .await;
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn read_publication_with_nonexistent_source_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+
+ let response = app
+ .read_publication(&tenant_id, 99999, "test_publication")
+ .await;
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn read_all_publications_with_multiple_publications_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table table1 (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table1");
+
+ sqlx::query(
+ r#"
+ create table table2 (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table2");
+
+ let pub1 = CreatePublicationRequest {
+ name: "publication_1".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "table1".to_string(),
+ }],
+ };
+ let response = app.create_publication(&tenant_id, source_id, &pub1).await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let pub2 = CreatePublicationRequest {
+ name: "publication_2".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "table2".to_string(),
+ }],
+ };
+ let response = app.create_publication(&tenant_id, source_id, &pub2).await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let response = app.read_all_publications(&tenant_id, source_id).await;
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.expect("failed to parse json");
+ let publications = body["publications"].as_array().unwrap();
+ assert_eq!(publications.len(), 2);
+
+ let pub_names: Vec<&str> = publications
+ .iter()
+ .map(|p| p["name"].as_str().unwrap())
+ .collect();
+
+ assert!(pub_names.contains(&"publication_1"));
+ assert!(pub_names.contains(&"publication_2"));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn read_all_publications_with_empty_list_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let response = app.read_all_publications(&tenant_id, source_id).await;
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let body: serde_json::Value = response.json().await.expect("failed to parse json");
+ assert_eq!(body["publications"].as_array().unwrap().len(), 0);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn read_all_publications_with_nonexistent_source_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+
+ let response = app.read_all_publications(&tenant_id, 99999).await;
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn update_publication_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table table1 (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table1");
+
+ sqlx::query(
+ r#"
+ create table table2 (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table2");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "table1".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let updated_publication = UpdatePublicationRequest {
+ tables: vec![
+ Table {
+ schema: "public".to_string(),
+ name: "table1".to_string(),
+ },
+ Table {
+ schema: "public".to_string(),
+ name: "table2".to_string(),
+ },
+ ],
+ };
+
+ let response = app
+ .update_publication(
+ &tenant_id,
+ source_id,
+ "test_publication",
+ &updated_publication,
+ )
+ .await;
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let response = app
+ .read_publication(&tenant_id, source_id, "test_publication")
+ .await;
+
+ let body: serde_json::Value = response.json().await.expect("failed to parse json");
+ assert_eq!(body["tables"].as_array().unwrap().len(), 2);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn update_nonexistent_publication_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let updated_publication = UpdatePublicationRequest {
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .update_publication(
+ &tenant_id,
+ source_id,
+ "nonexistent_publication",
+ &updated_publication,
+ )
+ .await;
+
+ assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn update_publication_with_nonexistent_source_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+
+ let updated_publication = UpdatePublicationRequest {
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .update_publication(&tenant_id, 99999, "test_publication", &updated_publication)
+ .await;
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn delete_publication_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let source_pool = app.get_source_pool(&tenant_id, source_id).await;
+
+ sqlx::query(
+ r#"
+ create table test_table (
+ id integer primary key
+ )
+ "#,
+ )
+ .execute(&source_pool)
+ .await
+ .expect("failed to create table");
+
+ let publication = CreatePublicationRequest {
+ name: "test_publication".to_string(),
+ tables: vec![Table {
+ schema: "public".to_string(),
+ name: "test_table".to_string(),
+ }],
+ };
+
+ let response = app
+ .create_publication(&tenant_id, source_id, &publication)
+ .await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let response = app
+ .delete_publication(&tenant_id, source_id, "test_publication")
+ .await;
+ assert_eq!(response.status(), StatusCode::OK);
+
+ let response = app
+ .read_publication(&tenant_id, source_id, "test_publication")
+ .await;
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn delete_nonexistent_publication_succeeds() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+ let source_id = create_test_db_source(&app, &tenant_id).await;
+
+ let response = app
+ .delete_publication(&tenant_id, source_id, "nonexistent_publication")
+ .await;
+
+ assert_eq!(response.status(), StatusCode::OK);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn delete_publication_with_nonexistent_source_fails() {
+ init_test_tracing();
+
+ let app = spawn_test_app().await;
+ let tenant_id = create_tenant(&app).await;
+
+ let response = app
+ .delete_publication(&tenant_id, 99999, "test_publication")
+ .await;
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+}
diff --git a/etl-api/tests/support/mocks.rs b/etl-api/tests/support/mocks.rs
index 623933cc3..cad121bb8 100644
--- a/etl-api/tests/support/mocks.rs
+++ b/etl-api/tests/support/mocks.rs
@@ -8,6 +8,7 @@ use etl_api::routes::images::{CreateImageRequest, CreateImageResponse};
use etl_api::routes::pipelines::{CreatePipelineRequest, CreatePipelineResponse};
use etl_api::routes::sources::{CreateSourceRequest, CreateSourceResponse};
use etl_config::SerializableSecretString;
+use secrecy::ExposeSecret;
use crate::support::test_app::TestApp;
@@ -192,6 +193,28 @@ pub mod sources {
.expect("failed to deserialize response");
response.id
}
+
+ /// Returns a source config pointing to the test database itself.
+ /// Useful for publication tests where you need to create tables in the source.
+ pub fn test_db_source_config(app: &TestApp) -> FullApiSourceConfig {
+ let db_config = app.database_config();
+ FullApiSourceConfig {
+ host: db_config.host.clone(),
+ port: db_config.port,
+ name: db_config.name.clone(),
+ username: db_config.username.clone(),
+ password: db_config
+ .password
+ .as_ref()
+ .map(|p| p.expose_secret().to_string().into()),
+ }
+ }
+
+ /// Creates a source pointing to the test database and returns its id.
+ /// Useful for publication tests where you need to create tables in the source.
+ pub async fn create_test_db_source(app: &TestApp, tenant_id: &str) -> i64 {
+ create_source_with_config(app, tenant_id, new_name(), test_db_source_config(app)).await
+ }
}
/// Tenant helpers.
diff --git a/etl-api/tests/support/test_app.rs b/etl-api/tests/support/test_app.rs
index f27502eb9..520f7ce8e 100644
--- a/etl-api/tests/support/test_app.rs
+++ b/etl-api/tests/support/test_app.rs
@@ -1,6 +1,8 @@
#![allow(dead_code)]
+use etl_api::db;
use etl_api::k8s::K8sClient;
+use etl_api::routes::connect_to_source_database_with_defaults;
use etl_api::routes::destinations::{CreateDestinationRequest, UpdateDestinationRequest};
use etl_api::routes::destinations_pipelines::{
CreateDestinationPipelineRequest, UpdateDestinationPipelineRequest,
@@ -15,11 +17,7 @@ use etl_api::routes::tenants::{
CreateOrUpdateTenantRequest, CreateTenantRequest, UpdateTenantRequest,
};
use etl_api::routes::tenants_sources::CreateTenantSourceRequest;
-use etl_api::{
- config::ApiConfig,
- configs::encryption::{self, generate_random_key},
- startup::run,
-};
+use etl_api::{config::ApiConfig, configs::encryption, startup::run};
use etl_config::shared::PgConnectionConfig;
use etl_config::{Environment, load_config};
use etl_postgres::sqlx::test_utils::drop_pg_database;
@@ -39,6 +37,8 @@ pub struct TestApp {
pub api_client: reqwest::Client,
pub api_key: String,
config: ApiConfig,
+ pool: sqlx::PgPool,
+ encryption_key: encryption::EncryptionKey,
server_handle: tokio::task::JoinHandle<std::io::Result<()>>,
}
@@ -494,6 +494,100 @@ impl TestApp {
.await
.expect("failed to execute request")
}
+
+ pub async fn create_publication(
+ &self,
+ tenant_id: &str,
+ source_id: i64,
+ publication: &etl_api::routes::sources::publications::CreatePublicationRequest,
+ ) -> reqwest::Response {
+ self.post_authenticated(format!(
+ "{}/v1/sources/{}/publications",
+ &self.address, source_id
+ ))
+ .header("tenant_id", tenant_id)
+ .json(publication)
+ .send()
+ .await
+ .expect("failed to execute request")
+ }
+
+ pub async fn update_publication(
+ &self,
+ tenant_id: &str,
+ source_id: i64,
+ publication_name: &str,
+ publication: &etl_api::routes::sources::publications::UpdatePublicationRequest,
+ ) -> reqwest::Response {
+ self.post_authenticated(format!(
+ "{}/v1/sources/{}/publications/{}",
+ &self.address, source_id, publication_name
+ ))
+ .header("tenant_id", tenant_id)
+ .json(publication)
+ .send()
+ .await
+ .expect("failed to execute request")
+ }
+
+ pub async fn read_publication(
+ &self,
+ tenant_id: &str,
+ source_id: i64,
+ publication_name: &str,
+ ) -> reqwest::Response {
+ self.get_authenticated(format!(
+ "{}/v1/sources/{}/publications/{}",
+ &self.address, source_id, publication_name
+ ))
+ .header("tenant_id", tenant_id)
+ .send()
+ .await
+ .expect("failed to execute request")
+ }
+
+ pub async fn read_all_publications(
+ &self,
+ tenant_id: &str,
+ source_id: i64,
+ ) -> reqwest::Response {
+ self.get_authenticated(format!(
+ "{}/v1/sources/{}/publications",
+ &self.address, source_id
+ ))
+ .header("tenant_id", tenant_id)
+ .send()
+ .await
+ .expect("failed to execute request")
+ }
+
+ pub async fn delete_publication(
+ &self,
+ tenant_id: &str,
+ source_id: i64,
+ publication_name: &str,
+ ) -> reqwest::Response {
+ self.delete_authenticated(format!(
+ "{}/v1/sources/{}/publications/{}",
+ &self.address, source_id, publication_name
+ ))
+ .header("tenant_id", tenant_id)
+ .send()
+ .await
+ .expect("failed to execute request")
+ }
+
+ pub async fn get_source_pool(&self, tenant_id: &str, source_id: i64) -> sqlx::PgPool {
+ let source =
+ db::sources::read_source(&self.pool, tenant_id, source_id, &self.encryption_key)
+ .await
+ .expect("failed to read source")
+ .expect("source not found");
+
+ connect_to_source_database_with_defaults(&source.config.into_connection_config())
+ .await
+ .expect("failed to connect to source database")
+ }
}
impl Drop for TestApp {
@@ -525,8 +619,25 @@ pub async fn spawn_test_app() -> TestApp {
let api_db_pool = create_etl_api_database(&config.database).await;
- let key = generate_random_key::<32>().expect("failed to generate random key");
- let encryption_key = encryption::EncryptionKey { id: 0, key };
+ // Generate a single random key and create two RandomizedNonceKey instances
+ // (RandomizedNonceKey doesn't implement Clone, but we can create two from the same bytes)
+ let mut key_bytes = [0u8; 32];
+ aws_lc_rs::rand::fill(&mut key_bytes).expect("failed to generate random bytes");
+ let test_key =
+ aws_lc_rs::aead::RandomizedNonceKey::new(&aws_lc_rs::aead::AES_256_GCM, &key_bytes)
+ .expect("failed to create test encryption key");
+ let server_key =
+ aws_lc_rs::aead::RandomizedNonceKey::new(&aws_lc_rs::aead::AES_256_GCM, &key_bytes)
+ .expect("failed to create server encryption key");
+
+ let encryption_key = encryption::EncryptionKey {
+ id: 0,
+ key: test_key,
+ };
+ let server_encryption_key = encryption::EncryptionKey {
+ id: 0,
+ key: server_key,
+ };
// We choose a random API key from the ones configured to show that rotation works.
let api_key_index = random_range(0..config.api_keys.len());
@@ -537,8 +648,8 @@ pub async fn spawn_test_app() -> TestApp {
let server = run(
config.clone(),
listener,
- api_db_pool,
- encryption_key,
+ api_db_pool.clone(),
+ server_encryption_key,
k8s_client,
None,
)
@@ -552,6 +663,8 @@ pub async fn spawn_test_app() -> TestApp {
api_client: reqwest::Client::new(),
api_key,
config,
+ pool: api_db_pool,
+ encryption_key,
server_handle,
}
}