Skip to content

Commit 637896c

Browse files
authored
ref(api): Update the update image endpoint behavior (supabase#336)
1 parent 94011da commit 637896c

File tree

5 files changed

+69
-134
lines changed

5 files changed

+69
-134
lines changed

etl-api/src/routes/pipelines.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ pub enum PipelineError {
9595
#[error("A pipeline already exists for this source and destination combination")]
9696
DuplicatePipeline,
9797

98-
#[error("The specified image with id {0} was not found")]
99-
ImageNotFoundById(i64),
98+
#[error("The specified image id {0} does not match the default image id")]
99+
ImageIdNotDefault(i64),
100100

101101
#[error("There was an error while looking up table information in the source database: {0}")]
102102
TableLookup(#[from] TableLookupError),
@@ -152,13 +152,14 @@ impl ResponseError for PipelineError {
152152
| PipelineError::TableLookup(_)
153153
| PipelineError::InvalidTableReplicationState(_)
154154
| PipelineError::MissingTableReplicationState => StatusCode::INTERNAL_SERVER_ERROR,
155-
PipelineError::NotRollbackable(_) => StatusCode::BAD_REQUEST,
156155
PipelineError::PipelineNotFound(_)
157-
| PipelineError::ImageNotFoundById(_)
158-
| PipelineError::EtlStateNotInitialized => StatusCode::NOT_FOUND,
159-
PipelineError::TenantId(_)
160-
| PipelineError::SourceNotFound(_)
161-
| PipelineError::DestinationNotFound(_) => StatusCode::BAD_REQUEST,
156+
| PipelineError::EtlStateNotInitialized
157+
| PipelineError::ImageIdNotDefault(_)
158+
| PipelineError::DestinationNotFound(_)
159+
| PipelineError::SourceNotFound(_) => StatusCode::NOT_FOUND,
160+
PipelineError::TenantId(_) | PipelineError::NotRollbackable(_) => {
161+
StatusCode::BAD_REQUEST
162+
}
162163
PipelineError::DuplicatePipeline => StatusCode::CONFLICT,
163164
}
164165
}
@@ -237,9 +238,9 @@ pub struct ReadPipelinesResponse {
237238
}
238239

239240
#[derive(Debug, Serialize, Deserialize, ToSchema)]
240-
pub struct UpdatePipelineImageRequest {
241-
#[schema(example = 1)]
242-
pub image_id: Option<i64>,
241+
pub struct UpdatePipelineVersionRequest {
242+
#[schema(example = 1, required = true)]
243+
pub version_id: i64,
243244
}
244245

245246
#[derive(Debug, Serialize, Deserialize, ToSchema)]
@@ -1044,29 +1045,29 @@ pub async fn rollback_table_state(
10441045
}
10451046

10461047
#[utoipa::path(
1047-
summary = "Update pipeline image",
1048-
description = "Updates the pipeline's container image while preserving its state.",
1049-
request_body = UpdatePipelineImageRequest,
1048+
summary = "Update pipeline version",
1049+
description = "Updates the pipeline's version while preserving its state.",
1050+
request_body = UpdatePipelineVersionRequest,
10501051
params(
10511052
("pipeline_id" = i64, Path, description = "Unique ID of the pipeline"),
10521053
("tenant_id" = String, Header, description = "Tenant ID used to scope the request")
10531054
),
10541055
responses(
1055-
(status = 200, description = "Pipeline image updated successfully"),
1056+
(status = 200, description = "Pipeline version updated successfully"),
10561057
(status = 400, description = "Bad request or pipeline not running", body = ErrorMessage),
1057-
(status = 404, description = "Pipeline or image not found", body = ErrorMessage),
1058+
(status = 404, description = "Pipeline or version not found", body = ErrorMessage),
10581059
(status = 500, description = "Internal server error", body = ErrorMessage)
10591060
),
10601061
tag = "Pipelines"
10611062
)]
1062-
#[post("/pipelines/{pipeline_id}/update-image")]
1063-
pub async fn update_pipeline_image(
1063+
#[post("/pipelines/{pipeline_id}/update-version")]
1064+
pub async fn update_pipeline_version(
10641065
req: HttpRequest,
10651066
pool: Data<PgPool>,
10661067
encryption_key: Data<EncryptionKey>,
10671068
k8s_client: Data<dyn K8sClient>,
10681069
pipeline_id: Path<i64>,
1069-
update_request: Json<UpdatePipelineImageRequest>,
1070+
update_request: Json<UpdatePipelineVersionRequest>,
10701071
) -> Result<impl Responder, PipelineError> {
10711072
let tenant_id = extract_tenant_id(&req)?;
10721073
let pipeline_id = pipeline_id.into_inner();
@@ -1077,14 +1078,18 @@ pub async fn update_pipeline_image(
10771078
let (pipeline, replicator, current_image, source, destination) =
10781079
read_all_required_data(&mut txn, tenant_id, pipeline_id, &encryption_key).await?;
10791080

1080-
let target_image = match update_request.image_id {
1081-
Some(image_id) => db::images::read_image(txn.deref_mut(), image_id)
1082-
.await?
1083-
.ok_or(PipelineError::ImageNotFoundById(image_id))?,
1084-
None => db::images::read_default_image(txn.deref_mut())
1085-
.await?
1086-
.ok_or(PipelineError::NoDefaultImageFound)?,
1087-
};
1081+
// Only allow updating to the current default image. The client must provide the version id and
1082+
// it must match the default version id. If it does not, we consider this a race condition and we
1083+
// fail the update.
1084+
let default_image = db::images::read_default_image(txn.deref_mut())
1085+
.await?
1086+
.ok_or(PipelineError::NoDefaultImageFound)?;
1087+
1088+
if update_request.version_id != default_image.id {
1089+
return Err(PipelineError::ImageIdNotDefault(update_request.version_id));
1090+
}
1091+
1092+
let target_image = default_image;
10881093

10891094
// If the image ids are different, we change the database entry.
10901095
if target_image.id != current_image.id {

etl-api/src/startup.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ use crate::{
4141
CreatePipelineRequest, CreatePipelineResponse, GetPipelineReplicationStatusResponse,
4242
GetPipelineStatusResponse, GetPipelineVersionResponse, ReadPipelineResponse,
4343
ReadPipelinesResponse, SimpleTableReplicationState, TableReplicationStatus,
44-
UpdatePipelineImageRequest, UpdatePipelineRequest, create_pipeline, delete_pipeline,
44+
UpdatePipelineRequest, UpdatePipelineVersionRequest, create_pipeline, delete_pipeline,
4545
get_pipeline_replication_status, get_pipeline_status, get_pipeline_version,
4646
read_all_pipelines, read_pipeline, rollback_table_state, start_pipeline,
4747
stop_all_pipelines, stop_pipeline, update_pipeline, update_pipeline_config,
48-
update_pipeline_image,
48+
update_pipeline_version,
4949
},
5050
sources::{
5151
CreateSourceRequest, CreateSourceResponse, ReadSourceResponse, ReadSourcesResponse,
@@ -183,7 +183,7 @@ pub async fn run(
183183
ReadPipelineResponse,
184184
ReadPipelinesResponse,
185185
GetPipelineVersionResponse,
186-
UpdatePipelineImageRequest,
186+
UpdatePipelineVersionRequest,
187187
GetPipelineStatusResponse,
188188
GetPipelineReplicationStatusResponse,
189189
TableReplicationStatus,
@@ -236,7 +236,7 @@ pub async fn run(
236236
crate::routes::pipelines::get_pipeline_version,
237237
crate::routes::pipelines::get_pipeline_replication_status,
238238
crate::routes::pipelines::rollback_table_state,
239-
crate::routes::pipelines::update_pipeline_image,
239+
crate::routes::pipelines::update_pipeline_version,
240240
crate::routes::tenants::create_tenant,
241241
crate::routes::tenants::create_or_update_tenant,
242242
crate::routes::tenants::read_tenant,
@@ -321,7 +321,7 @@ pub async fn run(
321321
.service(get_pipeline_version)
322322
.service(get_pipeline_replication_status)
323323
.service(rollback_table_state)
324-
.service(update_pipeline_image)
324+
.service(update_pipeline_version)
325325
.service(update_pipeline_config)
326326
//tables
327327
.service(read_table_names)

etl-api/tests/pipelines.rs

Lines changed: 24 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use etl_api::routes::pipelines::{
33
GetPipelineVersionResponse, ReadPipelineResponse, ReadPipelinesResponse,
44
RollbackTableStateRequest, RollbackTableStateResponse, RollbackType,
55
SimpleTableReplicationState, UpdatePipelineConfigRequest, UpdatePipelineConfigResponse,
6-
UpdatePipelineImageRequest, UpdatePipelineRequest,
6+
UpdatePipelineRequest, UpdatePipelineVersionRequest,
77
};
88
use etl_config::shared::{BatchConfig, PgConnectionConfig};
99
use etl_postgres::sqlx::test_utils::drop_pg_database;
@@ -250,7 +250,7 @@ async fn pipeline_with_another_tenants_source_cannot_be_created() {
250250
let response = app.create_pipeline(tenant1_id, &pipeline).await;
251251

252252
// Assert
253-
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
253+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
254254
}
255255

256256
#[tokio::test(flavor = "multi_thread")]
@@ -283,7 +283,7 @@ async fn pipeline_with_another_tenants_destination_cannot_be_created() {
283283
let response = app.create_pipeline(tenant1_id, &pipeline).await;
284284

285285
// Assert
286-
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
286+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
287287
}
288288

289289
#[tokio::test(flavor = "multi_thread")]
@@ -432,7 +432,7 @@ async fn pipeline_with_another_tenants_source_cannot_be_updated() {
432432
.await;
433433

434434
// Assert
435-
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
435+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
436436
}
437437

438438
#[tokio::test(flavor = "multi_thread")]
@@ -480,7 +480,7 @@ async fn pipeline_with_another_tenants_destination_cannot_be_updated() {
480480
.await;
481481

482482
// Assert
483-
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
483+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
484484
}
485485

486486
#[tokio::test(flavor = "multi_thread")]
@@ -666,11 +666,11 @@ async fn updating_pipeline_to_duplicate_source_destination_combination_fails() {
666666
}
667667

668668
#[tokio::test(flavor = "multi_thread")]
669-
async fn pipeline_image_can_be_updated_with_specific_image() {
669+
async fn pipeline_version_can_be_updated() {
670670
init_test_tracing();
671671
// Arrange
672672
let app = spawn_test_app().await;
673-
create_default_image(&app).await;
673+
let default_image_id = create_default_image(&app).await;
674674
let tenant_id = &create_tenant(&app).await;
675675
let source_id = create_source(&app, tenant_id).await;
676676
let destination_id = create_destination(&app, tenant_id).await;
@@ -685,65 +685,36 @@ async fn pipeline_image_can_be_updated_with_specific_image() {
685685
.await;
686686

687687
// Act
688-
let update_request = UpdatePipelineImageRequest {
689-
image_id: Some(1), // Use the default image ID
688+
let update_request = UpdatePipelineVersionRequest {
689+
version_id: default_image_id,
690690
};
691691
let response = app
692-
.update_pipeline_image(tenant_id, pipeline_id, &update_request)
692+
.update_pipeline_version(tenant_id, pipeline_id, &update_request)
693693
.await;
694694

695695
// Assert
696696
assert!(response.status().is_success());
697697
}
698698

699699
#[tokio::test(flavor = "multi_thread")]
700-
async fn pipeline_image_can_be_updated_to_default_image() {
701-
init_test_tracing();
702-
// Arrange
703-
let app = spawn_test_app().await;
704-
create_default_image(&app).await;
705-
let tenant_id = &create_tenant(&app).await;
706-
let source_id = create_source(&app, tenant_id).await;
707-
let destination_id = create_destination(&app, tenant_id).await;
708-
709-
let pipeline_id = create_pipeline_with_config(
710-
&app,
711-
tenant_id,
712-
source_id,
713-
destination_id,
714-
new_pipeline_config(),
715-
)
716-
.await;
717-
718-
// Act - update to default image (no image_id specified)
719-
let update_request = UpdatePipelineImageRequest { image_id: None };
720-
let response = app
721-
.update_pipeline_image(tenant_id, pipeline_id, &update_request)
722-
.await;
723-
724-
// Assert
725-
assert!(response.status().is_success());
726-
}
727-
728-
#[tokio::test(flavor = "multi_thread")]
729-
async fn update_image_fails_for_non_existing_pipeline() {
700+
async fn update_version_fails_for_non_existing_pipeline() {
730701
init_test_tracing();
731702
// Arrange
732703
let app = spawn_test_app().await;
733704
let tenant_id = &create_tenant(&app).await;
734705

735706
// Act
736-
let update_request = UpdatePipelineImageRequest { image_id: None };
707+
let update_request = UpdatePipelineVersionRequest { version_id: 1 };
737708
let response = app
738-
.update_pipeline_image(tenant_id, 42, &update_request)
709+
.update_pipeline_version(tenant_id, 42, &update_request)
739710
.await;
740711

741712
// Assert
742713
assert_eq!(response.status(), StatusCode::NOT_FOUND);
743714
}
744715

745716
#[tokio::test(flavor = "multi_thread")]
746-
async fn update_image_fails_for_non_existing_image() {
717+
async fn update_version_fails_when_version_is_not_default() {
747718
init_test_tracing();
748719
// Arrange
749720
let app = spawn_test_app().await;
@@ -761,63 +732,19 @@ async fn update_image_fails_for_non_existing_image() {
761732
)
762733
.await;
763734

764-
// Act
765-
let update_request = UpdatePipelineImageRequest {
766-
image_id: Some(999), // Non-existing image ID
767-
};
768-
let response = app
769-
.update_pipeline_image(tenant_id, pipeline_id, &update_request)
770-
.await;
735+
// Create a non-default image
736+
let non_default_image_id =
737+
create_image_with_name(&app, "another/image".to_string(), false).await;
771738

772-
// Assert
773-
assert_eq!(response.status(), StatusCode::NOT_FOUND);
774-
}
775-
776-
#[tokio::test(flavor = "multi_thread")]
777-
async fn update_image_fails_for_pipeline_from_another_tenant() {
778-
init_test_tracing();
779-
// Arrange
780-
let app = spawn_test_app().await;
781-
create_default_image(&app).await;
782-
let tenant1_id = &create_tenant(&app).await;
783-
784-
let source1_id = create_source(&app, tenant1_id).await;
785-
let destination1_id = create_destination(&app, tenant1_id).await;
786-
787-
let pipeline_id = create_pipeline_with_config(
788-
&app,
789-
tenant1_id,
790-
source1_id,
791-
destination1_id,
792-
new_pipeline_config(),
793-
)
794-
.await;
795-
796-
// Act - Try to update image using wrong tenant credentials
797-
let update_request = UpdatePipelineImageRequest { image_id: None };
798-
let response = app
799-
.update_pipeline_image("wrong-tenant-id", pipeline_id, &update_request)
800-
.await;
801-
802-
// Assert
803-
assert_eq!(response.status(), StatusCode::NOT_FOUND);
804-
}
805-
806-
#[tokio::test(flavor = "multi_thread")]
807-
async fn update_image_fails_when_no_default_image_exists() {
808-
init_test_tracing();
809-
// Arrange
810-
let app = spawn_test_app().await;
811-
// Don't create default image
812-
let tenant_id = &create_tenant(&app).await;
813-
814-
// Act - Try to update to default image when none exists
815-
let update_request = UpdatePipelineImageRequest { image_id: None };
739+
// Act - attempt update with a non-default image id
740+
let update_request = UpdatePipelineVersionRequest {
741+
version_id: non_default_image_id,
742+
};
816743
let response = app
817-
.update_pipeline_image(tenant_id, 1, &update_request)
744+
.update_pipeline_version(tenant_id, pipeline_id, &update_request)
818745
.await;
819746

820-
// Assert
747+
// Assert - mismatching image id should be rejected
821748
assert_eq!(response.status(), StatusCode::NOT_FOUND);
822749
}
823750

@@ -906,6 +833,7 @@ async fn update_config_fails_for_pipeline_from_another_tenant() {
906833
init_test_tracing();
907834
// Arrange
908835
let app = spawn_test_app().await;
836+
create_default_image(&app).await;
909837
let tenant1_id = &create_tenant(&app).await;
910838

911839
let source1_id = create_source(&app, tenant1_id).await;

etl-api/tests/support/mocks.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub async fn create_image_with_name(app: &TestApp, name: String, is_default: boo
2424
.json()
2525
.await
2626
.expect("failed to deserialize response");
27+
2728
response.id
2829
}
2930

@@ -258,18 +259,19 @@ pub mod pipelines {
258259
destination_id: i64,
259260
config: FullApiPipelineConfig,
260261
) -> i64 {
261-
// Ensure there is a default image available.
262-
super::create_default_image(app).await;
263262
let pipeline = CreatePipelineRequest {
264263
source_id,
265264
destination_id,
266265
config,
267266
};
268267
let response = app.create_pipeline(tenant_id, &pipeline).await;
268+
assert!(response.status().is_success(), "failed to created pipeline");
269+
269270
let response: CreatePipelineResponse = response
270271
.json()
271272
.await
272273
.expect("failed to deserialize response");
274+
273275
response.id
274276
}
275277
}

0 commit comments

Comments
 (0)