Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"etl-postgres",
"etl-replicator",
"etl-telemetry",
"etl-validation",
]

[workspace.package]
Expand All @@ -27,6 +28,7 @@ etl-destinations = { path = "etl-destinations", default-features = false }
etl-postgres = { path = "etl-postgres", default-features = false }
etl-replicator = { path = "etl-replicator", default-features = false }
etl-telemetry = { path = "etl-telemetry", default-features = false }
etl-validation = { path = "etl-validation", default-features = false }

actix-web = { version = "4.11.0", default-features = false }
actix-web-httpauth = { version = "0.8.2", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions etl-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ name = "etl-api"
etl-config = { workspace = true, features = ["utoipa"] }
etl-postgres = { workspace = true, features = ["replication"] }
etl-telemetry = { workspace = true }
etl-validation = { workspace = true }

actix-web = { workspace = true, features = ["macros", "http2"] }
actix-web-httpauth = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions etl-api/src/configs/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ impl StoredDestinationConfig {
}
}

impl From<FullApiDestinationConfig> for DestinationConfig {
fn from(value: FullApiDestinationConfig) -> Self {
let stored_config: StoredDestinationConfig = value.into();
stored_config.into_etl_config()
}
}

impl From<FullApiDestinationConfig> for StoredDestinationConfig {
fn from(value: FullApiDestinationConfig) -> Self {
match value {
Expand Down
40 changes: 34 additions & 6 deletions etl-api/src/routes/destinations.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use crate::configs::destination::FullApiDestinationConfig;
use crate::configs::encryption::EncryptionKey;
use crate::db;
use crate::db::destinations::DestinationsDbError;
use crate::routes::{ErrorMessage, TenantIdError, TestConnectionResponse, extract_tenant_id};
use actix_web::{
HttpRequest, HttpResponse, Responder, ResponseError, delete, get,
http::{StatusCode, header::ContentType},
post,
web::{Data, Json, Path},
};
use etl_config::shared::DestinationConfig;
use etl_validation::DestinationValidator;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use thiserror::Error;
use utoipa::ToSchema;

use crate::configs::destination::FullApiDestinationConfig;
use crate::configs::encryption::EncryptionKey;
use crate::db;
use crate::db::destinations::DestinationsDbError;
use crate::routes::{ErrorMessage, TenantIdError, extract_tenant_id};

#[derive(Debug, Error)]
pub enum DestinationError {
#[error("The destination with id {0} was not found")]
Expand Down Expand Up @@ -283,3 +284,30 @@ pub async fn read_all_destinations(

Ok(Json(response))
}

#[utoipa::path(
summary = "Test a destination connection (pre-creation)",
description = "Tests a destination connection without creating a resource. Validates connectivity, permissions, and configuration for BigQuery or Iceberg destinations. Useful for validating credentials before creating a destination.",
request_body = FullApiDestinationConfig,
responses(
(status = 200, description = "Connection test successful", body = TestConnectionResponse),
(status = 400, description = "Bad request or connection test failed", body = ErrorMessage),
(status = 500, description = "Internal server error", body = ErrorMessage),
),
tag = "Destinations"
)]
#[post("/destinations/test-connection")]
pub async fn test_destination_connection(
config: Json<FullApiDestinationConfig>,
) -> Result<impl Responder, DestinationError> {
let destination_config: DestinationConfig = config.into_inner().into();

destination_config.validate().await.map_err(|e| {
DestinationsDbError::Database(sqlx::Error::Configuration(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
e,
))))
Comment on lines +305 to +309

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Destination validation errors mapped to 500 internal error

Destination connection validation errors are converted into DestinationsDbError::Database(sqlx::Error::Configuration(..)), which DestinationError renders as HTTP 500 with the generic "internal server error" message. The route’s OpenAPI docs include a 400 response for connection test failures, and ValidationError provides specific, user‑actionable reasons (e.g., bad BigQuery key, unreachable Iceberg catalog). Currently all such user errors surface as 500s without details, making clients unable to tell misconfiguration from server faults.

Useful? React with 👍 / 👎.

})?;

Ok(Json(TestConnectionResponse { valid: true }))
}
6 changes: 6 additions & 0 deletions etl-api/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ const MIN_POOL_CONNECTIONS: u32 = 1;
/// Maximum number of connections for the source Postgres connection pool.
const MAX_POOL_CONNECTIONS: u32 = 1;

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct TestConnectionResponse {
#[schema(example = true)]
pub valid: bool,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ErrorMessage {
#[schema(example = "an error occurred in the api")]
Expand Down
32 changes: 30 additions & 2 deletions etl-api/src/routes/sources.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::configs::encryption::EncryptionKey;
use crate::configs::source::{FullApiSourceConfig, StrippedApiSourceConfig};
use crate::configs::source::{FullApiSourceConfig, StoredSourceConfig, StrippedApiSourceConfig};
use crate::db;
use crate::db::sources::SourcesDbError;
use crate::routes::{ErrorMessage, TenantIdError, extract_tenant_id};
use crate::routes::{ErrorMessage, TenantIdError, TestConnectionResponse, extract_tenant_id};
use actix_web::{
HttpRequest, HttpResponse, Responder, ResponseError, delete, get,
http::{StatusCode, header::ContentType},
post,
web::{Data, Json, Path},
};
use etl_validation::SourceValidator;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use thiserror::Error;
Expand Down Expand Up @@ -282,3 +283,30 @@ pub async fn read_all_sources(

Ok(Json(response))
}

#[utoipa::path(
summary = "Test a source connection (pre-creation)",
description = "Tests a PostgreSQL source connection without creating a resource. Validates connectivity, version, WAL level, permissions, and replication slot availability. Useful for validating credentials before creating a source.",
request_body = FullApiSourceConfig,
responses(
(status = 200, description = "Connection test successful", body = TestConnectionResponse),
(status = 400, description = "Bad request or connection test failed", body = ErrorMessage),
(status = 500, description = "Internal server error", body = ErrorMessage),
),
tag = "Sources"
)]
#[post("/sources/test-connection")]
pub async fn test_source_connection(
config: Json<FullApiSourceConfig>,
) -> Result<impl Responder, SourceError> {
let config = config.into_inner();
let stored_config: StoredSourceConfig = config.into();
let pg_config = stored_config.into_connection_config();

pg_config
.validate()
.await
.map_err(|e| SourcesDbError::Database(sqlx::Error::Configuration(Box::new(e))))?;
Comment on lines +306 to +309

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Return source validation failures as client errors

Validation failures from pg_config.validate() are wrapped into SourcesDbError::Database(...) here, which SourceError::status_code returns as HTTP 500 with the generic "internal server error" body. The OpenAPI annotation for this endpoint advertises 400 for failed connection tests, and the new ValidationError types distinguish misconfiguration from server faults. With the current mapping, bad credentials, non‑logical wal_level, or missing replication slots will be reported as 500s with no actionable message, preventing clients from telling user errors apart from real server failures.

Useful? React with 👍 / 👎.


Ok(Json(TestConnectionResponse { valid: true }))
}
15 changes: 11 additions & 4 deletions etl-api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use crate::{
configs::encryption,
db::publications::Publication,
routes::{
TestConnectionResponse,
destinations::{
CreateDestinationRequest, CreateDestinationResponse, ReadDestinationResponse,
ReadDestinationsResponse, UpdateDestinationRequest, create_destination,
delete_destination, read_all_destinations, read_destination, update_destination,
delete_destination, read_all_destinations, read_destination,
test_destination_connection, update_destination,
},
destinations_pipelines::{
CreateDestinationPipelineRequest, CreateDestinationPipelineResponse,
Expand Down Expand Up @@ -60,7 +62,7 @@ use crate::{
},
read_all_sources, read_source,
tables::read_table_names,
update_source,
test_source_connection, update_source,
},
tenants::{
CreateOrUpdateTenantRequest, CreateOrUpdateTenantResponse, CreateTenantRequest,
Expand Down Expand Up @@ -242,6 +244,7 @@ pub async fn run(
ReadTenantsResponse,
CreateSourceRequest,
CreateSourceResponse,
TestConnectionResponse,
UpdateSourceRequest,
ReadSourceResponse,
ReadSourcesResponse,
Expand Down Expand Up @@ -293,6 +296,7 @@ pub async fn run(
crate::routes::sources::update_source,
crate::routes::sources::delete_source,
crate::routes::sources::read_all_sources,
crate::routes::sources::test_source_connection,
crate::routes::sources::publications::create_publication,
crate::routes::sources::publications::read_publication,
crate::routes::sources::publications::update_publication,
Expand All @@ -304,6 +308,7 @@ pub async fn run(
crate::routes::destinations::update_destination,
crate::routes::destinations::delete_destination,
crate::routes::destinations::read_all_destinations,
crate::routes::destinations::test_destination_connection,
crate::routes::tenants_sources::create_tenant_and_source,
crate::routes::destinations_pipelines::create_destination_and_pipeline,
crate::routes::destinations_pipelines::update_destination_and_pipeline,
Expand Down Expand Up @@ -343,16 +348,18 @@ pub async fn run(
.service(read_all_tenants)
//sources
.service(create_source)
.service(read_all_sources)
.service(test_source_connection)
.service(read_source)
.service(update_source)
.service(delete_source)
.service(read_all_sources)
//destinations
.service(create_destination)
.service(read_all_destinations)
.service(test_destination_connection)
.service(read_destination)
.service(update_destination)
.service(delete_destination)
.service(read_all_destinations)
//pipelines
.service(create_pipeline)
.service(read_pipeline)
Expand Down
39 changes: 35 additions & 4 deletions etl-api/tests/support/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,28 @@ impl TestApp {
.expect("failed to execute request")
}

pub async fn test_source_connection(
&self,
config: &etl_api::configs::source::FullApiSourceConfig,
) -> reqwest::Response {
self.post_authenticated(format!("{}/v1/sources/test-connection", &self.address))
.json(config)
.send()
.await
.expect("Failed to execute request.")
}

pub async fn test_destination_connection(
&self,
config: &etl_api::configs::destination::FullApiDestinationConfig,
) -> reqwest::Response {
self.post_authenticated(format!("{}/v1/destinations/test-connection", &self.address))
.json(config)
.send()
.await
.expect("Failed to execute request.")
}

pub async fn create_pipeline(
&self,
tenant_id: &str,
Expand Down Expand Up @@ -501,12 +523,21 @@ impl Drop for TestApp {
// First, abort the server task to ensure it's terminated.
self.server_handle.abort();

// Clone the config to avoid move issues
let db_config = self.config.database.clone();

// To use `block_in_place,` we need a multithreaded runtime since when a blocking
// task is issued, the runtime will offload existing tasks to another worker.
tokio::task::block_in_place(move || {
Handle::current()
.block_on(async move { drop_pg_database(&self.config.database).await });
});
// Wrap in catch_unwind to ensure panics during cleanup don't propagate
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tokio::task::block_in_place(|| {
Handle::current().block_on(async {
// Give server time to shut down gracefully
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
drop_pg_database(&db_config).await;
});
});
}));
}
}

Expand Down
Loading