Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion etl-api/src/routes/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::configs::source::{FullApiSourceConfig, StoredSourceConfig, StrippedAp
use crate::db::sources::SourcesDbError;
use crate::k8s::TrustedRootCertsCache;
use crate::routes::{ErrorMessage, TenantIdError, extract_tenant_id};
use crate::validation::ValidationError;
use crate::validation::{FailureType, ValidationError, ValidationFailure};
use crate::{db, routes::common, routes::utils};
use actix_web::{
HttpRequest, HttpResponse, Responder, ResponseError, delete, get,
Expand Down Expand Up @@ -135,6 +135,37 @@ pub struct ReadSourcesResponse {
pub sources: Vec<ReadSourceResponse>,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ValidateSourceRequest {
#[schema(required = true)]
pub config: FullApiSourceConfig,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ValidationFailureResponse {
#[schema(example = "source Role Mismatch")]
pub name: String,
#[schema(example = "Source role 'postgres' does not match trusted username 'etl_user'")]
pub reason: String,
#[schema(example = "critical")]
pub failure_type: FailureType,
}

impl From<ValidationFailure> for ValidationFailureResponse {
fn from(failure: ValidationFailure) -> Self {
Self {
name: failure.name,
reason: failure.reason,
failure_type: failure.failure_type,
}
}
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ValidateSourceResponse {
pub validation_failures: Vec<ValidationFailureResponse>,
}

#[utoipa::path(
summary = "Create a source",
description = "Creates a source for the specified tenant.",
Expand Down Expand Up @@ -183,6 +214,43 @@ pub async fn create_source(
Ok(Json(response))
}

#[utoipa::path(
summary = "Validate source configuration",
description = "Validates source access using the source validation checks configured for the API.",
request_body = ValidateSourceRequest,
params(
("tenant_id" = String, Header, description = "Tenant ID used to scope the request")
),
responses(
(status = 200, description = "Validation completed", body = ValidateSourceResponse),
(status = 400, description = "Bad request", body = ErrorMessage),
(status = 500, description = "Internal server error", body = ErrorMessage)
),
tag = "Sources"
)]
#[post("/sources/validate")]
pub async fn validate_source(
req: HttpRequest,
api_config: Data<ApiConfig>,
trusted_root_certs_cache: Data<TrustedRootCertsCache>,
request: Json<ValidateSourceRequest>,
) -> Result<impl Responder, SourceError> {
let _tenant_id = extract_tenant_id(&req)?;
let request = request.into_inner();

let failures = common::validate_source_config(
request.config.into(),
api_config.as_ref(),
trusted_root_certs_cache.as_ref(),
)
.await?;
let response = ValidateSourceResponse {
validation_failures: failures.into_iter().map(Into::into).collect(),
};

Ok(Json(response))
}

#[utoipa::path(
summary = "Retrieve a source",
description = "Returns a source by ID. Sensitive fields are omitted from the configuration.",
Expand Down
9 changes: 7 additions & 2 deletions etl-api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ use crate::{
},
sources::{
CreateSourceRequest, CreateSourceResponse, ReadSourceResponse, ReadSourcesResponse,
UpdateSourceRequest, create_source, delete_source,
UpdateSourceRequest, ValidateSourceRequest, ValidateSourceResponse, create_source,
delete_source,
publications::{
CreatePublicationRequest, UpdatePublicationRequest, create_publication,
delete_publication, read_all_publications, read_publication, update_publication,
},
read_all_sources, read_source,
tables::read_table_names,
update_source,
update_source, validate_source,
},
tenants::{
CreateOrUpdateTenantRequest, CreateOrUpdateTenantResponse, CreateTenantRequest,
Expand Down Expand Up @@ -265,6 +266,8 @@ pub async fn run(
UpdateSourceRequest,
ReadSourceResponse,
ReadSourcesResponse,
ValidateSourceRequest,
ValidateSourceResponse,
CreatePublicationRequest,
UpdatePublicationRequest,
Publication,
Expand Down Expand Up @@ -316,6 +319,7 @@ pub async fn run(
crate::routes::sources::update_source,
crate::routes::sources::delete_source,
crate::routes::sources::read_all_sources,
crate::routes::sources::validate_source,
crate::routes::sources::publications::create_publication,
crate::routes::sources::publications::read_publication,
crate::routes::sources::publications::update_publication,
Expand Down Expand Up @@ -372,6 +376,7 @@ pub async fn run(
.service(update_source)
.service(delete_source)
.service(read_all_sources)
.service(validate_source)
//destinations
.service(validate_destination)
.service(create_destination)
Expand Down
56 changes: 55 additions & 1 deletion etl-api/tests/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use etl_api::configs::source::FullApiSourceConfig;
use etl_api::routes::pipelines::{CreatePipelineRequest, CreatePipelineResponse};
use etl_api::routes::sources::{
CreateSourceRequest, CreateSourceResponse, ReadSourceResponse, ReadSourcesResponse,
UpdateSourceRequest,
UpdateSourceRequest, ValidateSourceRequest, ValidateSourceResponse,
};
use etl_config::SerializableSecretString;
use etl_config::shared::PgConnectionConfig;
Expand Down Expand Up @@ -309,6 +309,30 @@ async fn source_creation_with_matching_trusted_username_succeeds() {
drop_trusted_source_database(trusted_source).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn source_validation_with_matching_trusted_username_succeeds() {
init_test_tracing();

let trusted_source = create_trusted_source_database().await;
let app =
spawn_test_app_with_trusted_username(Some(trusted_source.trusted_username.clone())).await;
let tenant_id = &create_tenant(&app).await;
let request = ValidateSourceRequest {
config: source_config_from_db_config(&trusted_source.trusted_config),
};

let response = app.validate_source(tenant_id, &request).await;

assert!(response.status().is_success());
let response: ValidateSourceResponse = response
.json()
.await
.expect("failed to deserialize response");
assert!(response.validation_failures.is_empty());

drop_trusted_source_database(trusted_source).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn source_creation_with_non_matching_trusted_username_fails() {
init_test_tracing();
Expand Down Expand Up @@ -343,6 +367,36 @@ async fn source_creation_with_non_matching_trusted_username_fails() {
);
}

#[tokio::test(flavor = "multi_thread")]
async fn source_validation_with_non_matching_trusted_username_returns_failure() {
init_test_tracing();

let mut source_db_config = get_test_db_config();
source_db_config.name = format!("test_source_db_{}", Uuid::new_v4());

let different_username = "different_user".to_string();
let app = spawn_test_app_with_trusted_username(Some(different_username)).await;
let tenant_id = &create_tenant(&app).await;

let _source_pool = create_pg_database(&source_db_config).await;

let request = ValidateSourceRequest {
config: source_config_from_db_config(&source_db_config),
};

let response = app.validate_source(tenant_id, &request).await;

assert!(response.status().is_success());
let response: ValidateSourceResponse = response
.json()
.await
.expect("failed to deserialize response");
assert!(
!response.validation_failures.is_empty(),
"Expected validation failures for non-matching trusted username"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn source_update_with_matching_trusted_username_succeeds() {
init_test_tracing();
Expand Down
15 changes: 14 additions & 1 deletion etl-api/tests/support/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use etl_api::routes::pipelines::{
CreatePipelineRequest, RollbackTablesRequest, UpdatePipelineRequest,
UpdatePipelineVersionRequest,
};
use etl_api::routes::sources::{CreateSourceRequest, UpdateSourceRequest};
use etl_api::routes::sources::{CreateSourceRequest, UpdateSourceRequest, ValidateSourceRequest};
use etl_api::routes::tenants::{
CreateOrUpdateTenantRequest, CreateTenantRequest, UpdateTenantRequest,
};
Expand Down Expand Up @@ -183,6 +183,19 @@ impl TestApp {
.expect("failed to execute request")
}

pub async fn validate_source(
&self,
tenant_id: &str,
source: &ValidateSourceRequest,
) -> reqwest::Response {
self.post_authenticated(format!("{}/v1/sources/validate", &self.address))
.header("tenant_id", tenant_id)
.json(source)
.send()
.await
.expect("Failed to execute request.")
}

pub async fn create_destination(
&self,
tenant_id: &str,
Expand Down
Loading