diff --git a/Cargo.toml b/Cargo.toml index 775f17177..34de98c95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "etl-postgres", "etl-replicator", "etl-telemetry", + "etl-validation", ] [workspace.package] @@ -27,6 +28,7 @@ etl-destinations = { path = "etl-destinations", default-features = false } etl-postgres = { path = "etl-postgres", default-features = false } etl-replicator = { path = "etl-replicator", default-features = false } etl-telemetry = { path = "etl-telemetry", default-features = false } +etl-validation = { path = "etl-validation", default-features = false } actix-web = { version = "4.11.0", default-features = false } actix-web-httpauth = { version = "0.8.2", default-features = false } diff --git a/etl-api/Cargo.toml b/etl-api/Cargo.toml index 10a55a139..aec1763e8 100644 --- a/etl-api/Cargo.toml +++ b/etl-api/Cargo.toml @@ -18,6 +18,7 @@ name = "etl-api" etl-config = { workspace = true, features = ["utoipa"] } etl-postgres = { workspace = true, features = ["replication"] } etl-telemetry = { workspace = true } +etl-validation = { workspace = true } actix-web = { workspace = true, features = ["macros", "http2"] } actix-web-httpauth = { workspace = true } diff --git a/etl-api/src/configs/destination.rs b/etl-api/src/configs/destination.rs index 521cd30e5..d70fcaa0a 100644 --- a/etl-api/src/configs/destination.rs +++ b/etl-api/src/configs/destination.rs @@ -171,6 +171,13 @@ impl StoredDestinationConfig { } } +impl From<FullApiDestinationConfig> for DestinationConfig { + fn from(value: FullApiDestinationConfig) -> Self { + let stored_config: StoredDestinationConfig = value.into(); + stored_config.into_etl_config() + } +} + impl From<FullApiDestinationConfig> for StoredDestinationConfig { fn from(value: FullApiDestinationConfig) -> Self { match value { diff --git a/etl-api/src/routes/destinations.rs b/etl-api/src/routes/destinations.rs index c3f00c6a8..439a87e18 100644 --- a/etl-api/src/routes/destinations.rs +++ b/etl-api/src/routes/destinations.rs @@ -1,20 +1,21 @@ +use
crate::configs::destination::FullApiDestinationConfig; +use crate::configs::encryption::EncryptionKey; +use crate::db; +use crate::db::destinations::DestinationsDbError; +use crate::routes::{ErrorMessage, TenantIdError, TestConnectionResponse, extract_tenant_id}; use actix_web::{ HttpRequest, HttpResponse, Responder, ResponseError, delete, get, http::{StatusCode, header::ContentType}, post, web::{Data, Json, Path}, }; +use etl_config::shared::DestinationConfig; +use etl_validation::DestinationValidator; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use thiserror::Error; use utoipa::ToSchema; -use crate::configs::destination::FullApiDestinationConfig; -use crate::configs::encryption::EncryptionKey; -use crate::db; -use crate::db::destinations::DestinationsDbError; -use crate::routes::{ErrorMessage, TenantIdError, extract_tenant_id}; - #[derive(Debug, Error)] pub enum DestinationError { #[error("The destination with id {0} was not found")] @@ -283,3 +284,30 @@ pub async fn read_all_destinations( Ok(Json(response)) } + +#[utoipa::path( + summary = "Test a destination connection (pre-creation)", + description = "Tests a destination connection without creating a resource. Validates connectivity, permissions, and configuration for BigQuery or Iceberg destinations. 
Useful for validating credentials before creating a destination.", + request_body = FullApiDestinationConfig, + responses( + (status = 200, description = "Connection test successful", body = TestConnectionResponse), + (status = 400, description = "Bad request or connection test failed", body = ErrorMessage), + (status = 500, description = "Internal server error", body = ErrorMessage), + ), + tag = "Destinations" +)] +#[post("/destinations/test-connection")] +pub async fn test_destination_connection( + config: Json<FullApiDestinationConfig>, +) -> Result<impl Responder, DestinationError> { + let destination_config: DestinationConfig = config.into_inner().into(); + + destination_config.validate().await.map_err(|e| { + DestinationsDbError::Database(sqlx::Error::Configuration(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + e, + )))) + })?; + + Ok(Json(TestConnectionResponse { valid: true })) +} diff --git a/etl-api/src/routes/mod.rs b/etl-api/src/routes/mod.rs index d6b2ca272..ca3c70cf9 100644 --- a/etl-api/src/routes/mod.rs +++ b/etl-api/src/routes/mod.rs @@ -21,6 +21,12 @@ const MIN_POOL_CONNECTIONS: u32 = 1; /// Maximum number of connections for the source Postgres connection pool.
const MAX_POOL_CONNECTIONS: u32 = 1; +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct TestConnectionResponse { + #[schema(example = true)] + pub valid: bool, +} + #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct ErrorMessage { #[schema(example = "an error occurred in the api")] diff --git a/etl-api/src/routes/sources.rs b/etl-api/src/routes/sources.rs index dfc44eea8..7f0af6fe6 100644 --- a/etl-api/src/routes/sources.rs +++ b/etl-api/src/routes/sources.rs @@ -1,14 +1,15 @@ use crate::configs::encryption::EncryptionKey; -use crate::configs::source::{FullApiSourceConfig, StrippedApiSourceConfig}; +use crate::configs::source::{FullApiSourceConfig, StoredSourceConfig, StrippedApiSourceConfig}; use crate::db; use crate::db::sources::SourcesDbError; -use crate::routes::{ErrorMessage, TenantIdError, extract_tenant_id}; +use crate::routes::{ErrorMessage, TenantIdError, TestConnectionResponse, extract_tenant_id}; use actix_web::{ HttpRequest, HttpResponse, Responder, ResponseError, delete, get, http::{StatusCode, header::ContentType}, post, web::{Data, Json, Path}, }; +use etl_validation::SourceValidator; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use thiserror::Error; @@ -282,3 +283,30 @@ pub async fn read_all_sources( Ok(Json(response)) } + +#[utoipa::path( + summary = "Test a source connection (pre-creation)", + description = "Tests a PostgreSQL source connection without creating a resource. Validates connectivity, version, WAL level, permissions, and replication slot availability. 
Useful for validating credentials before creating a source.", + request_body = FullApiSourceConfig, + responses( + (status = 200, description = "Connection test successful", body = TestConnectionResponse), + (status = 400, description = "Bad request or connection test failed", body = ErrorMessage), + (status = 500, description = "Internal server error", body = ErrorMessage), + ), + tag = "Sources" +)] +#[post("/sources/test-connection")] +pub async fn test_source_connection( + config: Json<FullApiSourceConfig>, +) -> Result<impl Responder, SourceError> { + let config = config.into_inner(); + let stored_config: StoredSourceConfig = config.into(); + let pg_config = stored_config.into_connection_config(); + + pg_config + .validate() + .await + .map_err(|e| SourcesDbError::Database(sqlx::Error::Configuration(Box::new(e))))?; + + Ok(Json(TestConnectionResponse { valid: true })) +} diff --git a/etl-api/src/startup.rs b/etl-api/src/startup.rs index bf06e5d0d..318e40719 100644 --- a/etl-api/src/startup.rs +++ b/etl-api/src/startup.rs @@ -24,10 +24,12 @@ use crate::{ configs::encryption, db::publications::Publication, routes::{ + TestConnectionResponse, destinations::{ CreateDestinationRequest, CreateDestinationResponse, ReadDestinationResponse, ReadDestinationsResponse, UpdateDestinationRequest, create_destination, - delete_destination, read_all_destinations, read_destination, update_destination, + delete_destination, read_all_destinations, read_destination, + test_destination_connection, update_destination, }, destinations_pipelines::{ CreateDestinationPipelineRequest, CreateDestinationPipelineResponse, @@ -60,7 +62,7 @@ use crate::{ }, read_all_sources, read_source, tables::read_table_names, - update_source, + test_source_connection, update_source, }, tenants::{ CreateOrUpdateTenantRequest, CreateOrUpdateTenantResponse, CreateTenantRequest, @@ -242,6 +244,7 @@ pub async fn run( ReadTenantsResponse, CreateSourceRequest, CreateSourceResponse, + TestConnectionResponse, UpdateSourceRequest, ReadSourceResponse,
ReadSourcesResponse, @@ -293,6 +296,7 @@ pub async fn run( crate::routes::sources::update_source, crate::routes::sources::delete_source, crate::routes::sources::read_all_sources, + crate::routes::sources::test_source_connection, crate::routes::sources::publications::create_publication, crate::routes::sources::publications::read_publication, crate::routes::sources::publications::update_publication, @@ -304,6 +308,7 @@ pub async fn run( crate::routes::destinations::update_destination, crate::routes::destinations::delete_destination, crate::routes::destinations::read_all_destinations, + crate::routes::destinations::test_destination_connection, crate::routes::tenants_sources::create_tenant_and_source, crate::routes::destinations_pipelines::create_destination_and_pipeline, crate::routes::destinations_pipelines::update_destination_and_pipeline, @@ -343,16 +348,18 @@ pub async fn run( .service(read_all_tenants) //sources .service(create_source) + .service(read_all_sources) + .service(test_source_connection) .service(read_source) .service(update_source) .service(delete_source) - .service(read_all_sources) //destinations .service(create_destination) + .service(read_all_destinations) + .service(test_destination_connection) .service(read_destination) .service(update_destination) .service(delete_destination) - .service(read_all_destinations) //pipelines .service(create_pipeline) .service(read_pipeline) diff --git a/etl-api/tests/support/test_app.rs b/etl-api/tests/support/test_app.rs index f27502eb9..786bf717f 100644 --- a/etl-api/tests/support/test_app.rs +++ b/etl-api/tests/support/test_app.rs @@ -248,6 +248,28 @@ impl TestApp { .expect("failed to execute request") } + pub async fn test_source_connection( + &self, + config: &etl_api::configs::source::FullApiSourceConfig, + ) -> reqwest::Response { + self.post_authenticated(format!("{}/v1/sources/test-connection", &self.address)) + .json(config) + .send() + .await + .expect("Failed to execute request.") + } + + pub async fn 
test_destination_connection( + &self, + config: &etl_api::configs::destination::FullApiDestinationConfig, + ) -> reqwest::Response { + self.post_authenticated(format!("{}/v1/destinations/test-connection", &self.address)) + .json(config) + .send() + .await + .expect("Failed to execute request.") + } + pub async fn create_pipeline( &self, tenant_id: &str, @@ -501,12 +523,21 @@ impl Drop for TestApp { // First, abort the server task to ensure it's terminated. self.server_handle.abort(); + // Clone the config to avoid move issues + let db_config = self.config.database.clone(); + // To use `block_in_place,` we need a multithreaded runtime since when a blocking // task is issued, the runtime will offload existing tasks to another worker. - tokio::task::block_in_place(move || { - Handle::current() - .block_on(async move { drop_pg_database(&self.config.database).await }); - }); + // Wrap in catch_unwind to ensure panics during cleanup don't propagate + let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + tokio::task::block_in_place(|| { + Handle::current().block_on(async { + // Give server time to shut down gracefully + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + drop_pg_database(&db_config).await; + }); + }); + })); } } diff --git a/etl-api/tests/test_connections.rs b/etl-api/tests/test_connections.rs new file mode 100644 index 000000000..99efe03bc --- /dev/null +++ b/etl-api/tests/test_connections.rs @@ -0,0 +1,273 @@ +use etl_api::configs::destination::{FullApiDestinationConfig, FullApiIcebergConfig}; +use etl_api::configs::source::FullApiSourceConfig; +use etl_config::SerializableSecretString; +use etl_telemetry::tracing::init_test_tracing; +use reqwest::StatusCode; + +use crate::support::test_app::spawn_test_app; + +mod support; + +// ============================================================================ +// Source Connection Tests +// ============================================================================ + 
+#[tokio::test(flavor = "multi_thread")] +async fn test_source_connection_with_invalid_credentials_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiSourceConfig { + host: "localhost".to_string(), + port: 5430, + username: "postgres".to_string(), + password: Some(SerializableSecretString::from("wrong_password".to_string())), + name: "postgres".to_string(), + }; + + // Act + let response = app.test_source_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_source_connection_with_nonexistent_database_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiSourceConfig { + host: "localhost".to_string(), + port: 5430, + username: "postgres".to_string(), + password: Some(SerializableSecretString::from("postgres".to_string())), + name: "nonexistent_database_12345".to_string(), + }; + + // Act + let response = app.test_source_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_source_connection_with_wrong_port_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiSourceConfig { + host: "localhost".to_string(), + port: 9999, // Wrong port + username: "postgres".to_string(), + password: Some(SerializableSecretString::from("postgres".to_string())), + name: "postgres".to_string(), + }; + + // Act + let response = app.test_source_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_source_connection_with_wrong_host_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiSourceConfig { + host: 
"nonexistent.host.example.com".to_string(), + port: 5432, + username: "postgres".to_string(), + password: Some(SerializableSecretString::from("postgres".to_string())), + name: "postgres".to_string(), + }; + + // Act + let response = app.test_source_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +// ============================================================================ +// Destination Connection Tests - BigQuery +// ============================================================================ + +#[tokio::test(flavor = "multi_thread")] +async fn test_destination_connection_bigquery_with_invalid_service_account_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiDestinationConfig::BigQuery { + project_id: "test-project".to_string(), + dataset_id: "test_dataset".to_string(), + service_account_key: SerializableSecretString::from("invalid-json".to_string()), + max_staleness_mins: None, + max_concurrent_streams: None, + }; + + // Act + let response = app.test_destination_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_destination_connection_bigquery_with_malformed_json_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiDestinationConfig::BigQuery { + project_id: "test-project".to_string(), + dataset_id: "test_dataset".to_string(), + service_account_key: SerializableSecretString::from("{not valid json".to_string()), + max_staleness_mins: None, + max_concurrent_streams: None, + }; + + // Act + let response = app.test_destination_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +// ============================================================================ +// Destination Connection Tests - Iceberg REST +// 
============================================================================ + +#[tokio::test(flavor = "multi_thread")] +async fn test_destination_connection_iceberg_rest_with_invalid_uri_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiDestinationConfig::Iceberg { + config: FullApiIcebergConfig::Rest { + catalog_uri: "http://localhost:9999/nonexistent".to_string(), // Invalid port/URI + warehouse_name: "test_warehouse".to_string(), + namespace: None, + s3_endpoint: "http://localhost:9010".to_string(), + s3_access_key_id: SerializableSecretString::from("test-key".to_string()), + s3_secret_access_key: SerializableSecretString::from("test-secret".to_string()), + }, + }; + + // Act + let response = app.test_destination_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_destination_connection_iceberg_rest_with_invalid_host_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiDestinationConfig::Iceberg { + config: FullApiIcebergConfig::Rest { + catalog_uri: "http://nonexistent.host.example.com/catalog".to_string(), + warehouse_name: "test_warehouse".to_string(), + namespace: None, + s3_endpoint: "http://localhost:9010".to_string(), + s3_access_key_id: SerializableSecretString::from("test-key".to_string()), + s3_secret_access_key: SerializableSecretString::from("test-secret".to_string()), + }, + }; + + // Act + let response = app.test_destination_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +// ============================================================================ +// Destination Connection Tests - Iceberg Supabase +// ============================================================================ + +#[tokio::test(flavor = "multi_thread")] +async fn 
test_destination_connection_iceberg_supabase_with_fake_credentials_fails() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiDestinationConfig::Iceberg { + config: FullApiIcebergConfig::Supabase { + project_ref: "fake-project-ref".to_string(), + warehouse_name: "test_warehouse".to_string(), + namespace: None, + catalog_token: SerializableSecretString::from("fake-token".to_string()), + s3_access_key_id: SerializableSecretString::from("fake-key".to_string()), + s3_secret_access_key: SerializableSecretString::from("fake-secret".to_string()), + s3_region: "us-east-1".to_string(), + }, + }; + + // Act + let response = app.test_destination_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +// ============================================================================ +// Memory Destination Tests +// ============================================================================ + +#[tokio::test(flavor = "multi_thread")] +async fn test_destination_connection_memory_succeeds() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiDestinationConfig::Memory; + + // Act + let response = app.test_destination_connection(&config).await; + + // Assert + // Memory destinations don't require connection testing and always succeed + assert_eq!(response.status(), StatusCode::OK); +} + +// ============================================================================ +// Response Format Tests +// ============================================================================ + +#[tokio::test(flavor = "multi_thread")] +async fn test_connection_response_has_correct_format_on_error() { + init_test_tracing(); + + // Arrange + let app = spawn_test_app().await; + let config = FullApiSourceConfig { + host: "localhost".to_string(), + port: 9999, // Wrong port + username: "postgres".to_string(), + password: 
Some(SerializableSecretString::from("postgres".to_string())), + name: "postgres".to_string(), + }; + + // Act + let response = app.test_source_connection(&config).await; + + // Assert + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + + // Verify response body contains error information + let body = response.text().await.expect("Failed to read response body"); + assert!(body.contains("error") || body.contains("Error")); +} diff --git a/etl-postgres/src/sqlx/test_utils.rs b/etl-postgres/src/sqlx/test_utils.rs index d26c20923..0051887c1 100644 --- a/etl-postgres/src/sqlx/test_utils.rs +++ b/etl-postgres/src/sqlx/test_utils.rs @@ -29,16 +29,21 @@ pub async fn create_pg_database(config: &PgConnectionConfig) -> PgPool { /// Connects to Postgres server, forcefully terminates active connections /// to the target database, and drops it if it exists. Used for test cleanup. /// -/// # Panics -/// Panics if any database operation fails. +/// This function will not panic on errors - it logs them and continues. +/// This ensures test cleanup doesn't fail when databases are already gone +/// or connections can't be established. pub async fn drop_pg_database(config: &PgConnectionConfig) { // Connect to the default database. - let mut connection = PgConnection::connect_with(&config.without_db()) - .await - .expect("Failed to connect to Postgres"); + let mut connection = match PgConnection::connect_with(&config.without_db()).await { + Ok(conn) => conn, + Err(e) => { + eprintln!("Warning: Failed to connect to Postgres for cleanup: {}", e); + return; + } + }; // Forcefully terminate any remaining connections to the database. 
- connection + if let Err(e) = connection .execute(&*format!( r#" select pg_terminate_backend(pg_stat_activity.pid) @@ -48,11 +53,18 @@ pub async fn drop_pg_database(config: &PgConnectionConfig) { config.name )) .await - .expect("Failed to terminate database connections"); + { + eprintln!( + "Warning: Failed to terminate connections for database {}: {}", + config.name, e + ); + } // Drop the database. - connection + if let Err(e) = connection .execute(&*format!(r#"drop database if exists "{}";"#, config.name)) .await - .expect("Failed to destroy database"); + { + eprintln!("Warning: Failed to drop database {}: {}", config.name, e); + } } diff --git a/etl-validation/Cargo.toml b/etl-validation/Cargo.toml new file mode 100644 index 000000000..a0d60660e --- /dev/null +++ b/etl-validation/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "etl-validation" +version = "0.1.0" +edition.workspace = true +license.workspace = true +rust-version.workspace = true +repository.workspace = true +homepage.workspace = true + +[dependencies] +etl-config = { workspace = true } +async-trait = { workspace = true } +thiserror = { workspace = true } +sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } +tokio = { workspace = true, features = ["time"] } +secrecy = { workspace = true } +gcp-bigquery-client = { workspace = true, features = ["aws-lc-rs"] } +iceberg = { workspace = true } +iceberg-catalog-rest = { workspace = true } +serde_json = { workspace = true } + +[dev-dependencies] +etl-postgres = { workspace = true, features = ["sqlx", "test-utils"] } +etl-telemetry = { workspace = true } +tokio = { workspace = true, features = ["full"] } +uuid = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } diff --git a/etl-validation/src/destination/bigquery.rs b/etl-validation/src/destination/bigquery.rs new file mode 100644 index 000000000..ee2764539 --- /dev/null +++ b/etl-validation/src/destination/bigquery.rs @@ -0,0 +1,47 @@ +use 
crate::error::ValidationError; +use gcp_bigquery_client::Client; +use gcp_bigquery_client::yup_oauth2::parse_service_account_key; +use secrecy::{ExposeSecret, SecretString}; + +pub(super) async fn validate( + project_id: &str, + dataset_id: &str, + service_account_key: &SecretString, +) -> Result<(), ValidationError> { + let sa_key = parse_service_account_key(service_account_key.expose_secret()).map_err(|e| { + ValidationError::AuthenticationFailed(format!("Invalid service account key format: {}", e)) + })?; + + let client = Client::from_service_account_key(sa_key, false) + .await + .map_err(|e| { + ValidationError::AuthenticationFailed(format!( + "Failed to create BigQuery client: {}", + e + )) + })?; + + let _dataset = client + .dataset() + .get(project_id, dataset_id) + .await + .map_err(|e| { + ValidationError::ConnectionFailed(format!( + "Cannot access dataset {}.{}: {}", + project_id, dataset_id, e + )) + })?; + + client + .table() + .list(project_id, dataset_id, Default::default()) + .await + .map_err(|e| { + ValidationError::PermissionDenied(format!( + "Service account lacks permissions to access dataset: {}", + e + )) + })?; + + Ok(()) +} diff --git a/etl-validation/src/destination/iceberg.rs b/etl-validation/src/destination/iceberg.rs new file mode 100644 index 000000000..44336d4d6 --- /dev/null +++ b/etl-validation/src/destination/iceberg.rs @@ -0,0 +1,55 @@ +use crate::error::ValidationError; +use iceberg::{Catalog, CatalogBuilder}; +use iceberg_catalog_rest::{ + REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder, +}; +use std::collections::HashMap; + +pub(super) async fn validate_rest( + catalog_uri: &str, + warehouse_name: &str, + properties: &HashMap<String, String>, +) -> Result<(), ValidationError> { + let mut props = properties.clone(); + props.insert(REST_CATALOG_PROP_URI.to_string(), catalog_uri.to_string()); + props.insert( + REST_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_name.to_string(), + ); + + let builder =
RestCatalogBuilder::default(); + let catalog = builder.load("RestCatalog", props).await.map_err(|e| { + ValidationError::ConnectionFailed(format!("Failed to create Iceberg REST catalog: {}", e)) + })?; + + catalog.list_namespaces(None).await.map_err(|e| { + ValidationError::ConnectionFailed(format!( + "Failed to list namespaces in Iceberg catalog: {}", + e + )) + })?; + + Ok(()) +} + +pub(super) async fn validate_supabase( + project_ref: &str, + warehouse_name: &str, + catalog_token: &str, + s3_access_key_id: &str, + s3_secret_access_key: &str, + s3_region: &str, +) -> Result<(), ValidationError> { + let catalog_uri = format!("https://{}.supabase.co/storage/v1/s3", project_ref); + + let mut properties = HashMap::new(); + properties.insert("s3.access-key-id".to_string(), s3_access_key_id.to_string()); + properties.insert( + "s3.secret-access-key".to_string(), + s3_secret_access_key.to_string(), + ); + properties.insert("s3.region".to_string(), s3_region.to_string()); + properties.insert("token".to_string(), catalog_token.to_string()); + + validate_rest(&catalog_uri, warehouse_name, &properties).await +} diff --git a/etl-validation/src/destination/mod.rs b/etl-validation/src/destination/mod.rs new file mode 100644 index 000000000..247f8036f --- /dev/null +++ b/etl-validation/src/destination/mod.rs @@ -0,0 +1,80 @@ +mod bigquery; +mod iceberg; + +use crate::error::ValidationError; +use async_trait::async_trait; +use etl_config::shared::{DestinationConfig, IcebergConfig}; +use secrecy::ExposeSecret; +use std::collections::HashMap; + +#[async_trait] +pub trait DestinationValidator { + async fn validate(&self) -> Result<(), ValidationError>; +} + +#[async_trait] +impl DestinationValidator for DestinationConfig { + async fn validate(&self) -> Result<(), ValidationError> { + match self { + DestinationConfig::Memory => Ok(()), + + DestinationConfig::BigQuery { + project_id, + dataset_id, + service_account_key, + .. 
+ } => bigquery::validate(project_id, dataset_id, service_account_key).await, + + DestinationConfig::Iceberg { config } => config.validate().await, + } + } +} + +#[async_trait] +impl DestinationValidator for IcebergConfig { + async fn validate(&self) -> Result<(), ValidationError> { + match self { + IcebergConfig::Rest { + catalog_uri, + warehouse_name, + s3_access_key_id, + s3_secret_access_key, + s3_endpoint, + .. + } => { + let mut properties = HashMap::new(); + properties.insert( + "s3.access-key-id".to_string(), + s3_access_key_id.expose_secret().to_string(), + ); + properties.insert( + "s3.secret-access-key".to_string(), + s3_secret_access_key.expose_secret().to_string(), + ); + properties.insert("s3.endpoint".to_string(), s3_endpoint.to_string()); + + iceberg::validate_rest(catalog_uri, warehouse_name, &properties).await + } + + IcebergConfig::Supabase { + project_ref, + warehouse_name, + catalog_token, + s3_access_key_id, + s3_secret_access_key, + s3_region, + .. + } => { + iceberg::validate_supabase( + project_ref, + warehouse_name, + catalog_token.expose_secret(), + s3_access_key_id.expose_secret(), + s3_secret_access_key.expose_secret(), + s3_region, + ) + .await + } + } + } +} diff --git a/etl-validation/src/error.rs b/etl-validation/src/error.rs new file mode 100644 index 000000000..e161bd585 --- /dev/null +++ b/etl-validation/src/error.rs @@ -0,0 +1,19 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ValidationError { + #[error("Connection failed: {0}")] + ConnectionFailed(String), + + #[error("Invalid configuration: {0}")] + InvalidConfig(String), + + #[error("Permission denied: {0}")] + PermissionDenied(String), + + #[error("Authentication failed: {0}")] + AuthenticationFailed(String), + + #[error("Validation error: {0}")] + Other(String), +} diff --git a/etl-validation/src/lib.rs b/etl-validation/src/lib.rs new file mode 100644 index 000000000..5f8e63c8f --- /dev/null +++ b/etl-validation/src/lib.rs @@ -0,0 +1,6 @@ +pub mod 
destination; +pub mod error; +pub mod source; +pub use destination::DestinationValidator; +pub use error::ValidationError; +pub use source::SourceValidator; diff --git a/etl-validation/src/source.rs b/etl-validation/src/source.rs new file mode 100644 index 000000000..84e33268d --- /dev/null +++ b/etl-validation/src/source.rs @@ -0,0 +1,97 @@ +use async_trait::async_trait; +use etl_config::shared::{IntoConnectOptions, PgConnectionConfig}; +use sqlx::postgres::PgPoolOptions; +use std::time::Duration; + +use crate::error::ValidationError; + +#[async_trait] +pub trait SourceValidator { + async fn validate(&self) -> Result<(), ValidationError>; +} + +#[async_trait] +impl SourceValidator for PgConnectionConfig { + async fn validate(&self) -> Result<(), ValidationError> { + let pool = PgPoolOptions::new() + .max_connections(1) + .acquire_timeout(Duration::from_secs(10)) + .connect_with(self.with_db()) + .await + .map_err(|e| ValidationError::ConnectionFailed(e.to_string()))?; + + let version: String = sqlx::query_scalar("SELECT version()") + .fetch_one(&pool) + .await + .map_err(|e| ValidationError::ConnectionFailed(e.to_string()))?; + + let version_major = + parse_postgres_version(&version).map_err(ValidationError::InvalidConfig)?; + + if ![14, 15, 16, 17].contains(&version_major) { + return Err(ValidationError::InvalidConfig(format!( + "PostgreSQL version {} is not supported. 
Supported versions: 14, 15, 16, 17", + version_major + ))); + } + + let wal_level: String = sqlx::query_scalar("SHOW wal_level") + .fetch_one(&pool) + .await + .map_err(|e| ValidationError::ConnectionFailed(e.to_string()))?; + + if wal_level != "logical" { + return Err(ValidationError::InvalidConfig(format!( + "wal_level is '{}', must be 'logical' for replication", + wal_level + ))); + } + + let has_replication: bool = + sqlx::query_scalar("SELECT rolreplication FROM pg_roles WHERE rolname = current_user") + .fetch_one(&pool) + .await + .map_err(|e| ValidationError::ConnectionFailed(e.to_string()))?; + + if !has_replication { + return Err(ValidationError::PermissionDenied( + "User lacks REPLICATION privilege".to_string(), + )); + } + + let available_slots: i64 = sqlx::query_scalar( + "SELECT + (SELECT setting::int FROM pg_settings WHERE name = 'max_replication_slots') - + (SELECT COUNT(*) FROM pg_replication_slots)", + ) + .fetch_one(&pool) + .await + .map_err(|e| ValidationError::ConnectionFailed(e.to_string()))?; + + if available_slots <= 0 { + return Err(ValidationError::InvalidConfig( + "No replication slots available. 
Increase max_replication_slots setting" + .to_string(), + )); + } + + Ok(()) + } +} + +fn parse_postgres_version(version_string: &str) -> Result<u32, String> { + let parts: Vec<&str> = version_string.split_whitespace().collect(); + if parts.len() < 2 { + return Err(format!("Invalid version string: {}", version_string)); + } + + let version_part = parts[1]; + let major_version_str = version_part + .split('.') + .next() + .ok_or_else(|| format!("Could not extract major version from: {}", version_part))?; + + major_version_str + .parse::<u32>() + .map_err(|e| format!("Failed to parse version number: {}", e)) +} diff --git a/etl-validation/tests/iceberg_validation.rs b/etl-validation/tests/iceberg_validation.rs new file mode 100644 index 000000000..c6037f98d --- /dev/null +++ b/etl-validation/tests/iceberg_validation.rs @@ -0,0 +1,190 @@ +use etl_config::shared::IcebergConfig; +use etl_validation::{DestinationValidator, ValidationError}; +use secrecy::SecretString; +use std::error::Error; + +const LAKEKEEPER_URL: &str = "http://localhost:8182"; +const PROJECT_ID_HEADER: &str = "x-project-id"; +const PROJECT_ID: &str = "00000000-0000-0000-0000-000000000000"; + +fn test_lakekeeper_config() -> IcebergConfig { + let catalog_uri = "http://localhost:8182/catalog".to_string(); + let warehouse_name = format!("test_warehouse_{}", uuid::Uuid::new_v4()); + + IcebergConfig::Rest { + catalog_uri, + warehouse_name, + namespace: None, + s3_access_key_id: SecretString::from("minio-admin"), + s3_secret_access_key: SecretString::from("minio-admin-password"), + s3_endpoint: "http://localhost:9010".to_string(), + } +} + +async fn create_lakekeeper_warehouse(warehouse_name: String) -> Result<uuid::Uuid, Box<dyn Error>> { + let client = reqwest::Client::new(); + let url = format!("{}/management/v1/warehouse", LAKEKEEPER_URL); + + let key_prefix = uuid::Uuid::new_v4(); + let body = format!( + r#"{{ + "warehouse-name": "{}", + "project-id": "00000000-0000-0000-0000-000000000000", + "storage-profile": {{ + "type": "s3", + "bucket":
"dev-and-test", + "key-prefix": "{}", + "endpoint": "http://minio:9000", + "region": "local-01", + "path-style-access": true, + "flavor": "minio", + "sts-enabled": true + }}, + "storage-credential": {{ + "type": "s3", + "credential-type": "access-key", + "aws-access-key-id": "minio-admin", + "aws-secret-access-key": "minio-admin-password" + }} + }}"#, + warehouse_name, key_prefix + ); + + let response = client + .post(url) + .header(PROJECT_ID_HEADER, PROJECT_ID) + .header("Content-Type", "application/json") + .body(body) + .send() + .await?; + + let json: serde_json::Value = response.json().await?; + let warehouse_id = json["warehouse-id"] + .as_str() + .ok_or_else(|| format!("Missing warehouse-id in response: {:?}", json))? + .parse()?; + + Ok(warehouse_id) +} + +async fn delete_lakekeeper_warehouse(warehouse_id: uuid::Uuid) -> Result<(), Box> { + let client = reqwest::Client::new(); + let url = format!( + "{}/management/v1/warehouse/{}", + LAKEKEEPER_URL, warehouse_id + ); + + const MAX_RETRIES: u8 = 10; + for _ in 0..MAX_RETRIES { + let response = client.delete(&url).send().await?; + + if response.status().is_success() { + return Ok(()); + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_valid_iceberg_rest_catalog_connection() { + let config = test_lakekeeper_config(); + + // Extract warehouse name for creation + let warehouse_name = match &config { + IcebergConfig::Rest { warehouse_name, .. 
} => warehouse_name.clone(), + _ => panic!("Expected Rest config"), + }; + + // Create warehouse in Lakekeeper + let warehouse_id = match create_lakekeeper_warehouse(warehouse_name).await { + Ok(id) => id, + Err(e) => { + if e.to_string().contains("Connection refused") || e.to_string().contains("connect") { + println!("WARNING: Lakekeeper not running at localhost:8182, skipping test"); + println!( + "Start services with: docker compose -f ./scripts/docker-compose.yaml up -d" + ); + return; + } + panic!("Failed to create warehouse: {}", e); + } + }; + + // Validate should succeed + let result = config.validate().await; + + // Clean up warehouse + let _ = delete_lakekeeper_warehouse(warehouse_id).await; + + match result { + Ok(_) => { + println!("Iceberg validation succeeded"); + } + Err(err) => { + panic!("Validation failed: {:?}", err); + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_invalid_iceberg_catalog_uri() { + let config = IcebergConfig::Rest { + catalog_uri: "http://localhost:9999/nonexistent".to_string(), // Invalid port + warehouse_name: "test-warehouse".to_string(), + namespace: None, + s3_access_key_id: SecretString::from("minio-admin"), + s3_secret_access_key: SecretString::from("minio-admin-password"), + s3_endpoint: "http://localhost:9010".to_string(), + }; + + // Validation should fail + let result = config.validate().await; + assert!( + result.is_err(), + "Validation should fail for invalid catalog URI" + ); + + match result.unwrap_err() { + ValidationError::ConnectionFailed(msg) => { + assert!( + msg.contains("Failed to create Iceberg REST catalog") + || msg.contains("Failed to list namespaces") + || msg.contains("Connection refused"), + "Expected connection error, got: {}", + msg + ); + } + err => panic!("Expected ConnectionFailed error, got: {:?}", err), + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_iceberg_supabase_config_structure() { + // Test that Supabase config can be created (without actual 
validation) + let config = IcebergConfig::Supabase { + project_ref: "test-project".to_string(), + warehouse_name: "test-warehouse".to_string(), + namespace: None, + catalog_token: SecretString::from("test-token"), + s3_access_key_id: SecretString::from("test-key"), + s3_secret_access_key: SecretString::from("test-secret"), + s3_region: "us-east-1".to_string(), + }; + + // This will fail to connect (since it's not real Supabase), but verifies config structure + let result = config.validate().await; + assert!( + result.is_err(), + "Validation should fail for fake Supabase config" + ); + + match result.unwrap_err() { + ValidationError::ConnectionFailed(_) => { + // Expected - can't connect to fake Supabase + println!("Supabase config structure is valid (connection failed as expected)"); + } + err => panic!("Expected ConnectionFailed error, got: {:?}", err), + } +} diff --git a/etl-validation/tests/source_validation.rs b/etl-validation/tests/source_validation.rs new file mode 100644 index 000000000..ad3a731b7 --- /dev/null +++ b/etl-validation/tests/source_validation.rs @@ -0,0 +1,74 @@ +use etl_config::shared::{PgConnectionConfig, TlsConfig}; +use etl_postgres::sqlx::test_utils::{create_pg_database, drop_pg_database}; +use etl_validation::{SourceValidator, ValidationError}; + +fn test_pg_config() -> PgConnectionConfig { + PgConnectionConfig { + host: "localhost".to_string(), + port: 5430, + username: "postgres".to_string(), + password: Some("postgres".to_string().into()), + name: format!("test_validator_{}", uuid::Uuid::new_v4()), + tls: TlsConfig { + trusted_root_certs: String::new(), + enabled: false, + }, + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_valid_postgres_connection() { + let config = test_pg_config(); + let _pool = create_pg_database(&config).await; + + let result = config.validate().await; + assert!(result.is_ok(), "Validation should succeed: {:?}", result); + + drop_pg_database(&config).await; +} + +#[tokio::test(flavor = 
"multi_thread")] +async fn test_postgres_connection_failure() { + let mut config = test_pg_config(); + config.password = Some("wrong_password".to_string().into()); + + let result = config.validate().await; + assert!( + result.is_err(), + "Validation should fail with wrong password" + ); + + match result.unwrap_err() { + ValidationError::ConnectionFailed(msg) => { + assert!( + msg.contains("password authentication failed") || msg.contains("error"), + "Expected connection error, got: {}", + msg + ); + } + err => panic!("Expected ConnectionFailed error, got: {:?}", err), + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_postgres_nonexistent_database() { + let mut config = test_pg_config(); + config.name = format!("nonexistent_db_{}", uuid::Uuid::new_v4()); + + let result = config.validate().await; + assert!( + result.is_err(), + "Validation should fail for nonexistent database" + ); + + match result.unwrap_err() { + ValidationError::ConnectionFailed(msg) => { + assert!( + msg.contains("database") || msg.contains("does not exist") || msg.contains("error"), + "Expected database error, got: {}", + msg + ); + } + err => panic!("Expected ConnectionFailed error, got: {:?}", err), + } +}