Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions etl-api/src/db/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,58 @@ where
Ok(pipelines)
}

/// Finds all pipelines of a tenant that replicate from the given source through
/// the specified publication.
///
/// The publication is matched against the `publication_name` key stored inside
/// the pipeline's JSON `config` column. Returns an empty vector when no pipeline
/// matches.
///
/// # Errors
///
/// Returns [`PipelinesDbError`] when the query fails or when a stored pipeline
/// config cannot be deserialized into [`StoredPipelineConfig`].
pub async fn find_pipelines_by_publication<'c, E>(
    executor: E,
    tenant_id: &str,
    source_id: i64,
    publication_name: &str,
) -> Result<Vec<Pipeline>, PipelinesDbError>
where
    E: PgExecutor<'c>,
{
    let records = sqlx::query!(
        r#"
        select p.id,
            p.tenant_id,
            source_id,
            s.name as source_name,
            destination_id,
            d.name as destination_name,
            replicator_id,
            p.config
        from app.pipelines p
        join app.sources s on p.source_id = s.id
        join app.destinations d on p.destination_id = d.id
        where p.tenant_id = $1
            and p.source_id = $2
            and p.config->>'publication_name' = $3
        "#,
        tenant_id,
        source_id,
        publication_name,
    )
    .fetch_all(executor)
    .await?;

    let mut pipelines = Vec::with_capacity(records.len());
    for record in records {
        // Move the config value out of the record instead of cloning it; the
        // remaining fields are still accessible after the partial move.
        let config = deserialize_from_value::<StoredPipelineConfig>(record.config)?;

        pipelines.push(Pipeline {
            id: record.id,
            tenant_id: record.tenant_id,
            source_id: record.source_id,
            source_name: record.source_name,
            destination_id: record.destination_id,
            destination_name: record.destination_name,
            replicator_id: record.replicator_id,
            config,
        });
    }

    Ok(pipelines)
}

pub async fn update_pipeline_config(
txn: &mut PgTransaction<'_>,
tenant_id: &str,
Expand Down
18 changes: 18 additions & 0 deletions etl-api/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,21 @@ pub async fn read_all_publications(pool: &PgPool) -> Result<Vec<Publication>, Pu

Ok(publications)
}

/// Reports whether a publication named `publication_name` exists on the
/// database behind `pool`, by probing the `pg_publication` system catalog.
///
/// # Errors
///
/// Returns [`PublicationsDbError`] if the catalog query fails.
pub async fn publication_exists(
    pool: &PgPool,
    publication_name: &str,
) -> Result<bool, PublicationsDbError> {
    sqlx::query_scalar!(
        r#"
        select exists(
            select 1 from pg_publication where pubname = $1
        ) as "exists!"
        "#,
        publication_name
    )
    .fetch_one(pool)
    .await
    .map_err(Into::into)
}
81 changes: 66 additions & 15 deletions etl-api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::db;
use crate::db::destinations::{DestinationsDbError, destination_exists};
use crate::db::images::ImagesDbError;
use crate::db::pipelines::{MAX_PIPELINES_PER_TENANT, PipelinesDbError, read_pipeline_components};
use crate::db::publications::PublicationsDbError;
use crate::db::replicators::ReplicatorsDbError;
use crate::db::sources::{SourcesDbError, source_exists};
use crate::k8s::core::{
Expand Down Expand Up @@ -51,6 +52,9 @@ pub enum PipelineError {
#[error("No default image was found")]
NoDefaultImageFound,

#[error("The publication '{0}' was not found on the source database")]
PublicationNotFound(String),

#[error(transparent)]
TenantId(#[from] TenantIdError),

Expand Down Expand Up @@ -78,6 +82,9 @@ pub enum PipelineError {
#[error(transparent)]
DestinationsDb(#[from] DestinationsDbError),

#[error(transparent)]
PublicationsDb(#[from] PublicationsDbError),

#[error(transparent)]
PipelinesDb(PipelinesDbError),

Expand Down Expand Up @@ -128,6 +135,7 @@ impl PipelineError {
// Do not expose internal database details in error messages
PipelineError::SourcesDb(SourcesDbError::Database(_))
| PipelineError::DestinationsDb(DestinationsDbError::Database(_))
| PipelineError::PublicationsDb(PublicationsDbError::Database(_))
| PipelineError::PipelinesDb(PipelinesDbError::Database(_))
| PipelineError::ReplicatorsDb(ReplicatorsDbError::Database(_))
| PipelineError::ImagesDb(ImagesDbError::Database(_))
Expand All @@ -147,6 +155,7 @@ impl ResponseError for PipelineError {
| PipelineError::NoDefaultImageFound
| PipelineError::SourcesDb(_)
| PipelineError::DestinationsDb(_)
| PipelineError::PublicationsDb(_)
| PipelineError::PipelinesDb(_)
| PipelineError::ReplicatorsDb(_)
| PipelineError::ImagesDb(_)
Expand All @@ -162,9 +171,9 @@ impl ResponseError for PipelineError {
| PipelineError::ImageIdNotDefault(_)
| PipelineError::DestinationNotFound(_)
| PipelineError::SourceNotFound(_) => StatusCode::NOT_FOUND,
PipelineError::TenantId(_) | PipelineError::NotRollbackable(_) => {
StatusCode::BAD_REQUEST
}
PipelineError::TenantId(_)
| PipelineError::NotRollbackable(_)
| PipelineError::PublicationNotFound(_) => StatusCode::BAD_REQUEST,
PipelineError::DuplicatePipeline => StatusCode::CONFLICT,
PipelineError::PipelineLimitReached { .. } => StatusCode::UNPROCESSABLE_ENTITY,
}
Expand Down Expand Up @@ -460,19 +469,23 @@ pub struct GetPipelineVersionResponse {
pub async fn create_pipeline(
req: HttpRequest,
pool: Data<PgPool>,
encryption_key: Data<EncryptionKey>,
pipeline: Json<CreatePipelineRequest>,
) -> Result<impl Responder, PipelineError> {
let tenant_id = extract_tenant_id(&req)?;
let pipeline = pipeline.into_inner();

let mut txn = pool.begin().await?;
if !source_exists(txn.deref_mut(), tenant_id, pipeline.source_id).await? {
return Err(PipelineError::SourceNotFound(pipeline.source_id));
}

if !destination_exists(txn.deref_mut(), tenant_id, pipeline.destination_id).await? {
return Err(PipelineError::DestinationNotFound(pipeline.destination_id));
}
validate_pipeline_inputs(
&mut txn,
tenant_id,
pipeline.source_id,
pipeline.destination_id,
&pipeline.config.publication_name,
&encryption_key,
)
.await?;

let pipeline_count =
db::pipelines::count_pipelines_for_tenant(txn.deref_mut(), tenant_id).await?;
Expand Down Expand Up @@ -566,6 +579,7 @@ pub async fn read_pipeline(
pub async fn update_pipeline(
req: HttpRequest,
pool: Data<PgPool>,
encryption_key: Data<EncryptionKey>,
pipeline_id: Path<i64>,
pipeline: Json<UpdatePipelineRequest>,
) -> Result<impl Responder, PipelineError> {
Expand All @@ -574,13 +588,16 @@ pub async fn update_pipeline(
let pipeline = pipeline.into_inner();

let mut txn = pool.begin().await?;
if !source_exists(txn.deref_mut(), tenant_id, pipeline.source_id).await? {
return Err(PipelineError::SourceNotFound(pipeline.source_id));
}

if !destination_exists(txn.deref_mut(), tenant_id, pipeline.destination_id).await? {
return Err(PipelineError::DestinationNotFound(pipeline.destination_id));
}
validate_pipeline_inputs(
&mut txn,
tenant_id,
pipeline.source_id,
pipeline.destination_id,
&pipeline.config.publication_name,
&encryption_key,
)
.await?;

db::pipelines::update_pipeline(
txn.deref_mut(),
Expand Down Expand Up @@ -1225,3 +1242,37 @@ pub async fn update_pipeline_config(

Ok(Json(response))
}

/// Validates pipeline inputs: checks source/destination existence and verifies
/// that the configured publication exists on the tenant's source database.
///
/// # Errors
///
/// - [`PipelineError::SourceNotFound`] when the source does not exist for the tenant.
/// - [`PipelineError::DestinationNotFound`] when the destination does not exist.
/// - [`PipelineError::PublicationNotFound`] when the publication is absent from
///   the source database.
async fn validate_pipeline_inputs(
    txn: &mut sqlx::PgTransaction<'_>,
    tenant_id: &str,
    source_id: i64,
    destination_id: i64,
    publication_name: &str,
    encryption_key: &EncryptionKey,
) -> Result<(), PipelineError> {
    // `read_source` already proves existence (it returns `Option`), so a
    // separate `source_exists` query would be a redundant DB roundtrip.
    // Checking the source first preserves `SourceNotFound` precedence.
    let source = db::sources::read_source(txn.deref_mut(), tenant_id, source_id, encryption_key)
        .await?
        .ok_or(PipelineError::SourceNotFound(source_id))?;

    if !destination_exists(txn.deref_mut(), tenant_id, destination_id).await? {
        return Err(PipelineError::DestinationNotFound(destination_id));
    }

    // Connect to the tenant's own source database to check the publication
    // there — it lives in that database's catalog, not in the app database.
    let source_pool =
        connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?;

    if !db::publications::publication_exists(&source_pool, publication_name).await? {
        return Err(PipelineError::PublicationNotFound(
            publication_name.to_string(),
        ));
    }

    Ok(())
}
Loading