Skip to content

Commit 18e3432

Browse files
authored
feat(api): Limit maximum number of pipelines to 3 (#371)
1 parent 02a5172 commit 18e3432

File tree

6 files changed

+143
-2
lines changed

6 files changed

+143
-2
lines changed

etl-api/.sqlx/query-0f02ed5830e25fe721763c067db835db846fb41a1047310d3c8577906f469c5f.json

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

etl-api/src/db/pipelines.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use crate::db::replicators::{ReplicatorsDbError, create_replicator};
1515
use crate::db::sources::Source;
1616
use crate::routes::connect_to_source_database_with_defaults;
1717

18+
/// Maximum number of pipelines allowed per tenant.
19+
pub const MAX_PIPELINES_PER_TENANT: i64 = 3;
20+
1821
pub struct Pipeline {
1922
pub id: i64,
2023
pub tenant_id: String,
@@ -47,6 +50,27 @@ pub enum PipelinesDbError {
4750
SlotError(#[from] slots::SlotError),
4851
}
4952

53+
pub async fn count_pipelines_for_tenant<'c, E>(
54+
executor: E,
55+
tenant_id: &str,
56+
) -> Result<i64, PipelinesDbError>
57+
where
58+
E: PgExecutor<'c>,
59+
{
60+
let record = sqlx::query!(
61+
r#"
62+
select count(*) as "count!"
63+
from app.pipelines
64+
where tenant_id = $1
65+
"#,
66+
tenant_id
67+
)
68+
.fetch_one(executor)
69+
.await?;
70+
71+
Ok(record.count)
72+
}
73+
5074
pub async fn create_pipeline(
5175
txn: &mut PgTransaction<'_>,
5276
tenant_id: &str,

etl-api/src/routes/destinations_pipelines.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use crate::db;
1818
use crate::db::destinations::{DestinationsDbError, destination_exists};
1919
use crate::db::destinations_pipelines::DestinationPipelinesDbError;
2020
use crate::db::images::ImagesDbError;
21-
use crate::db::pipelines::{PipelinesDbError, read_pipeline};
21+
use crate::db::pipelines::{
22+
MAX_PIPELINES_PER_TENANT, PipelinesDbError, count_pipelines_for_tenant, read_pipeline,
23+
};
2224
use crate::db::sources::{SourcesDbError, source_exists};
2325

2426
#[derive(Debug, Error)]
@@ -47,6 +49,9 @@ enum DestinationPipelineError {
4749
#[error("A pipeline already exists for this source and destination combination")]
4850
DuplicatePipeline,
4951

52+
#[error("The maximum number of pipelines ({limit}) has been reached for this project")]
53+
PipelineLimitReached { limit: i64 },
54+
5055
#[error(transparent)]
5156
DestinationPipelinesDb(DestinationPipelinesDbError),
5257

@@ -116,6 +121,9 @@ impl ResponseError for DestinationPipelineError {
116121
StatusCode::BAD_REQUEST
117122
}
118123
DestinationPipelineError::DuplicatePipeline => StatusCode::CONFLICT,
124+
DestinationPipelineError::PipelineLimitReached { .. } => {
125+
StatusCode::UNPROCESSABLE_ENTITY
126+
}
119127
}
120128
}
121129

@@ -202,6 +210,13 @@ pub async fn create_destination_and_pipeline(
202210
));
203211
}
204212

213+
let pipeline_count = count_pipelines_for_tenant(txn.deref_mut(), tenant_id).await?;
214+
if pipeline_count >= MAX_PIPELINES_PER_TENANT {
215+
return Err(DestinationPipelineError::PipelineLimitReached {
216+
limit: MAX_PIPELINES_PER_TENANT,
217+
});
218+
}
219+
205220
let image = db::images::read_default_image(&**pool)
206221
.await?
207222
.ok_or(DestinationPipelineError::NoDefaultImageFound)?;

etl-api/src/routes/pipelines.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::configs::source::StoredSourceConfig;
2626
use crate::db;
2727
use crate::db::destinations::{Destination, DestinationsDbError, destination_exists};
2828
use crate::db::images::{Image, ImagesDbError};
29-
use crate::db::pipelines::{Pipeline, PipelinesDbError};
29+
use crate::db::pipelines::{MAX_PIPELINES_PER_TENANT, Pipeline, PipelinesDbError};
3030
use crate::db::replicators::{Replicator, ReplicatorsDbError};
3131
use crate::db::sources::{Source, SourcesDbError, source_exists};
3232
use crate::k8s::http::{
@@ -103,6 +103,9 @@ pub enum PipelineError {
103103
#[error("The specified image id {0} does not match the default image id")]
104104
ImageIdNotDefault(i64),
105105

106+
#[error("The maximum number of pipelines ({limit}) has been reached for this project")]
107+
PipelineLimitReached { limit: i64 },
108+
106109
#[error("There was an error while looking up table information in the source database: {0}")]
107110
TableLookup(#[from] TableLookupError),
108111

@@ -170,6 +173,7 @@ impl ResponseError for PipelineError {
170173
StatusCode::BAD_REQUEST
171174
}
172175
PipelineError::DuplicatePipeline => StatusCode::CONFLICT,
176+
PipelineError::PipelineLimitReached { .. } => StatusCode::UNPROCESSABLE_ENTITY,
173177
}
174178
}
175179

@@ -428,6 +432,14 @@ pub async fn create_pipeline(
428432
return Err(PipelineError::DestinationNotFound(pipeline.destination_id));
429433
}
430434

435+
let pipeline_count =
436+
db::pipelines::count_pipelines_for_tenant(txn.deref_mut(), tenant_id).await?;
437+
if pipeline_count >= MAX_PIPELINES_PER_TENANT {
438+
return Err(PipelineError::PipelineLimitReached {
439+
limit: MAX_PIPELINES_PER_TENANT,
440+
});
441+
}
442+
431443
let image = db::images::read_default_image(txn.deref_mut())
432444
.await?
433445
.ok_or(PipelineError::NoDefaultImageFound)?;

etl-api/tests/destinations_pipelines.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,42 @@ async fn destination_and_pipeline_can_be_created() {
7878
insta::assert_debug_snapshot!(response.config);
7979
}
8080

81+
#[tokio::test(flavor = "multi_thread")]
82+
async fn tenant_cannot_create_more_than_three_destinations_pipelines() {
83+
init_test_tracing();
84+
// Arrange
85+
let app = spawn_test_app().await;
86+
let tenant_id = &create_tenant(&app).await;
87+
let source_id = create_source(&app, tenant_id).await;
88+
create_default_image(&app).await;
89+
90+
for idx in 0..3 {
91+
let destination_pipeline = CreateDestinationPipelineRequest {
92+
destination_name: format!("BigQuery Destination {idx}"),
93+
destination_config: new_destination_config(),
94+
source_id,
95+
pipeline_config: new_pipeline_config(),
96+
};
97+
let response = app
98+
.create_destination_pipeline(tenant_id, &destination_pipeline)
99+
.await;
100+
assert!(response.status().is_success());
101+
}
102+
103+
let destination_pipeline = CreateDestinationPipelineRequest {
104+
destination_name: "BigQuery Destination 3".to_string(),
105+
destination_config: new_destination_config(),
106+
source_id,
107+
pipeline_config: new_pipeline_config(),
108+
};
109+
let response = app
110+
.create_destination_pipeline(tenant_id, &destination_pipeline)
111+
.await;
112+
113+
// Assert
114+
assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
115+
}
116+
81117
#[tokio::test(flavor = "multi_thread")]
82118
async fn destination_and_pipeline_with_another_tenants_source_cannot_be_created() {
83119
init_test_tracing();

etl-api/tests/pipelines.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,38 @@ async fn pipeline_can_be_created() {
221221
assert_eq!(response.id, 1);
222222
}
223223

224+
#[tokio::test(flavor = "multi_thread")]
225+
async fn tenant_cannot_create_more_than_three_pipelines() {
226+
init_test_tracing();
227+
// Arrange
228+
let app = spawn_test_app().await;
229+
create_default_image(&app).await;
230+
let tenant_id = &create_tenant(&app).await;
231+
232+
for _ in 0..3 {
233+
let source_id = create_source(&app, tenant_id).await;
234+
let destination_id = create_destination(&app, tenant_id).await;
235+
let pipeline = CreatePipelineRequest {
236+
source_id,
237+
destination_id,
238+
config: new_pipeline_config(),
239+
};
240+
let response = app.create_pipeline(tenant_id, &pipeline).await;
241+
assert!(response.status().is_success());
242+
}
243+
244+
let source_id = create_source(&app, tenant_id).await;
245+
let destination_id = create_destination(&app, tenant_id).await;
246+
let pipeline = CreatePipelineRequest {
247+
source_id,
248+
destination_id,
249+
config: new_pipeline_config(),
250+
};
251+
let response = app.create_pipeline(tenant_id, &pipeline).await;
252+
253+
assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
254+
}
255+
224256
#[tokio::test(flavor = "multi_thread")]
225257
async fn pipeline_with_another_tenants_source_cannot_be_created() {
226258
init_test_tracing();

0 commit comments

Comments
 (0)