Skip to content

Commit 75c1b20

Browse files
authored
feat(api): Add endpoint for returning pipeline version (#327)
1 parent c3961d1 commit 75c1b20

File tree

5 files changed

+290
-20
lines changed

5 files changed

+290
-20
lines changed

etl-api/src/routes/pipelines.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::k8s_client::{RESTARTED_AT_ANNOTATION_KEY, TRUSTED_ROOT_CERT_KEY_NAME}
3131
use crate::routes::{
3232
ErrorMessage, TenantIdError, connect_to_source_database_with_defaults, extract_tenant_id,
3333
};
34+
use crate::utils::parse_docker_image_tag;
3435

3536
#[derive(Debug, Error)]
3637
pub enum PipelineError {
@@ -368,6 +369,23 @@ pub enum PipelineStatus {
368369
Failed,
369370
}
370371

372+
#[derive(Debug, Serialize, Deserialize, ToSchema)]
373+
pub struct PipelineVersion {
374+
#[schema(example = 1)]
375+
pub id: i64,
376+
#[schema(example = "1.2.3")]
377+
pub name: String,
378+
}
379+
380+
#[derive(Debug, Serialize, Deserialize, ToSchema)]
381+
pub struct GetPipelineVersionResponse {
382+
#[schema(example = 1)]
383+
pub pipeline_id: i64,
384+
pub version: PipelineVersion,
385+
#[serde(skip_serializing_if = "Option::is_none")]
386+
pub new_version: Option<PipelineVersion>,
387+
}
388+
371389
#[utoipa::path(
372390
summary = "Create a pipeline",
373391
description = "Creates a pipeline linking a source to a destination.",
@@ -710,6 +728,66 @@ pub async fn stop_all_pipelines(
710728
Ok(HttpResponse::Ok().finish())
711729
}
712730

731+
#[utoipa::path(
732+
summary = "Get pipeline version",
733+
description = "Returns the current version for the pipeline and an optional new default version.",
734+
params(
735+
("pipeline_id" = i64, Path, description = "Unique ID of the pipeline"),
736+
("tenant_id" = String, Header, description = "Tenant ID used to scope the request")
737+
),
738+
responses(
739+
(status = 200, description = "Pipeline version retrieved successfully", body = GetPipelineVersionResponse),
740+
(status = 404, description = "Pipeline not found", body = ErrorMessage),
741+
(status = 500, description = "Internal server error", body = ErrorMessage)
742+
),
743+
tag = "Pipelines"
744+
)]
745+
#[get("/pipelines/{pipeline_id}/version")]
746+
pub async fn get_pipeline_version(
747+
req: HttpRequest,
748+
pool: Data<PgPool>,
749+
pipeline_id: Path<i64>,
750+
) -> Result<impl Responder, PipelineError> {
751+
let tenant_id = extract_tenant_id(&req)?;
752+
let pipeline_id = pipeline_id.into_inner();
753+
754+
let mut txn = pool.begin().await?;
755+
756+
let replicator =
757+
db::replicators::read_replicator_by_pipeline_id(txn.deref_mut(), tenant_id, pipeline_id)
758+
.await?
759+
.ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?;
760+
761+
let current_image = db::images::read_image_by_replicator_id(txn.deref_mut(), replicator.id)
762+
.await?
763+
.ok_or(PipelineError::ImageNotFound(replicator.id))?;
764+
765+
let default_image = db::images::read_default_image(txn.deref_mut()).await?;
766+
767+
txn.commit().await?;
768+
769+
let current_version = PipelineVersion {
770+
id: current_image.id,
771+
name: parse_docker_image_tag(&current_image.name),
772+
};
773+
774+
let new_version = match default_image {
775+
Some(default_image) if default_image.id != current_image.id => Some(PipelineVersion {
776+
id: default_image.id,
777+
name: parse_docker_image_tag(&default_image.name),
778+
}),
779+
_ => None,
780+
};
781+
782+
let response = GetPipelineVersionResponse {
783+
pipeline_id,
784+
version: current_version,
785+
new_version,
786+
};
787+
788+
Ok(Json(response))
789+
}
790+
713791
#[utoipa::path(
714792
summary = "Check pipeline status",
715793
description = "Returns the current status of the pipeline's replicator.",

etl-api/src/startup.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@ use crate::{
3939
metrics::metrics,
4040
pipelines::{
4141
CreatePipelineRequest, CreatePipelineResponse, GetPipelineReplicationStatusResponse,
42-
GetPipelineStatusResponse, ReadPipelineResponse, ReadPipelinesResponse,
43-
SimpleTableReplicationState, TableReplicationStatus, UpdatePipelineImageRequest,
44-
UpdatePipelineRequest, create_pipeline, delete_pipeline,
45-
get_pipeline_replication_status, get_pipeline_status, read_all_pipelines,
46-
read_pipeline, rollback_table_state, start_pipeline, stop_all_pipelines, stop_pipeline,
47-
update_pipeline, update_pipeline_config, update_pipeline_image,
42+
GetPipelineStatusResponse, GetPipelineVersionResponse, ReadPipelineResponse,
43+
ReadPipelinesResponse, SimpleTableReplicationState, TableReplicationStatus,
44+
UpdatePipelineImageRequest, UpdatePipelineRequest, create_pipeline, delete_pipeline,
45+
get_pipeline_replication_status, get_pipeline_status, get_pipeline_version,
46+
read_all_pipelines, read_pipeline, rollback_table_state, start_pipeline,
47+
stop_all_pipelines, stop_pipeline, update_pipeline, update_pipeline_config,
48+
update_pipeline_image,
4849
},
4950
sources::{
5051
CreateSourceRequest, CreateSourceResponse, ReadSourceResponse, ReadSourcesResponse,
@@ -181,6 +182,7 @@ pub async fn run(
181182
UpdatePipelineRequest,
182183
ReadPipelineResponse,
183184
ReadPipelinesResponse,
185+
GetPipelineVersionResponse,
184186
UpdatePipelineImageRequest,
185187
GetPipelineStatusResponse,
186188
GetPipelineReplicationStatusResponse,
@@ -231,6 +233,7 @@ pub async fn run(
231233
crate::routes::pipelines::delete_pipeline,
232234
crate::routes::pipelines::read_all_pipelines,
233235
crate::routes::pipelines::get_pipeline_status,
236+
crate::routes::pipelines::get_pipeline_version,
234237
crate::routes::pipelines::get_pipeline_replication_status,
235238
crate::routes::pipelines::rollback_table_state,
236239
crate::routes::pipelines::update_pipeline_image,
@@ -315,6 +318,7 @@ pub async fn run(
315318
.service(stop_pipeline)
316319
.service(stop_all_pipelines)
317320
.service(get_pipeline_status)
321+
.service(get_pipeline_version)
318322
.service(get_pipeline_replication_status)
319323
.service(rollback_table_state)
320324
.service(update_pipeline_image)

etl-api/src/utils.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,88 @@ pub fn generate_random_alpha_str(len: usize) -> String {
1111
.map(|_| chars[rng.random_range(0..chars.len())])
1212
.collect()
1313
}
14+
15+
/// Parses a Docker image reference to extract the tag to be used as a version name.
16+
///
17+
/// Expected formats: `HOST[:PORT]/NAMESPACE/REPOSITORY[:TAG][@DIGEST]`.
18+
/// - If a tag is present, returns it (ignoring any trailing digest part).
19+
/// - If no tag is present and also no digest, defaults to `latest`.
20+
/// - If parsing fails or only a digest is present, returns `unavailable`.
21+
pub fn parse_docker_image_tag(image: &str) -> String {
22+
// Work on the last path segment only
23+
let last_slash = image.rfind('/').map(|i| i + 1).unwrap_or(0);
24+
let segment = &image[last_slash..];
25+
26+
// Identify optional digest marker within the segment
27+
let at_pos = segment.find('@');
28+
29+
// Search for ':' in the segment, but if a digest '@' exists, ignore ':' that occur after it
30+
let colon_pos_in_segment = match at_pos {
31+
Some(at_idx) => segment[..at_idx].find(':'),
32+
None => segment.find(':'),
33+
};
34+
35+
if let Some(col_idx) = colon_pos_in_segment {
36+
// Extract tag between ':' and optional '@'
37+
let after_colon = &segment[col_idx + 1..];
38+
let tag = match at_pos {
39+
Some(at_idx) => &segment[col_idx + 1..at_idx],
40+
None => after_colon,
41+
};
42+
43+
if tag.is_empty() {
44+
return "unavailable".to_string();
45+
}
46+
47+
return tag.to_string();
48+
}
49+
50+
// No tag in the segment. If there's a digest in the segment, we can't infer a tag.
51+
if at_pos.is_some() {
52+
return "unavailable".to_string();
53+
}
54+
55+
// No tag and no digest in the segment -> default docker tag is latest
56+
"latest".to_string()
57+
}
58+
59+
#[cfg(test)]
60+
mod tests {
61+
use crate::utils::parse_docker_image_tag;
62+
63+
#[test]
64+
fn parse_with_tag() {
65+
assert_eq!(parse_docker_image_tag("supabase/replicator:1.2.3"), "1.2.3");
66+
assert_eq!(
67+
parse_docker_image_tag("example.com:5000/team/my-app:2.0"),
68+
"2.0"
69+
);
70+
assert_eq!(
71+
parse_docker_image_tag("ghcr.io/dockersamples/example-app:pr-311"),
72+
"pr-311"
73+
);
74+
}
75+
76+
#[test]
77+
fn parse_with_tag_and_digest() {
78+
assert_eq!(
79+
parse_docker_image_tag("example.com:5000/team/my-app:2.0@sha256:abcdef0123456789"),
80+
"2.0"
81+
);
82+
}
83+
84+
#[test]
85+
fn parse_without_tag_defaults_to_latest() {
86+
assert_eq!(parse_docker_image_tag("alpine"), "latest");
87+
assert_eq!(parse_docker_image_tag("library/alpine"), "latest");
88+
assert_eq!(parse_docker_image_tag("docker.io/library/alpine"), "latest");
89+
}
90+
91+
#[test]
92+
fn parse_with_only_digest_unavailable() {
93+
assert_eq!(
94+
parse_docker_image_tag("repo/name@sha256:abcdef0123456789"),
95+
"unavailable"
96+
);
97+
}
98+
}

etl-api/tests/pipelines.rs

Lines changed: 102 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,9 @@
1-
use crate::support::database::{
2-
create_test_source_database, run_etl_migrations_on_source_database,
3-
};
4-
use crate::{
5-
support::mocks::create_default_image,
6-
support::mocks::destinations::create_destination,
7-
support::mocks::sources::create_source,
8-
support::mocks::tenants::{create_tenant, create_tenant_with_id_and_name},
9-
support::test_app::{TestApp, spawn_test_app},
10-
};
111
use etl_api::routes::pipelines::{
122
CreatePipelineRequest, CreatePipelineResponse, GetPipelineReplicationStatusResponse,
13-
ReadPipelineResponse, ReadPipelinesResponse, RollbackTableStateRequest,
14-
RollbackTableStateResponse, RollbackType, SimpleTableReplicationState,
15-
UpdatePipelineConfigRequest, UpdatePipelineConfigResponse, UpdatePipelineImageRequest,
16-
UpdatePipelineRequest,
3+
GetPipelineVersionResponse, ReadPipelineResponse, ReadPipelinesResponse,
4+
RollbackTableStateRequest, RollbackTableStateResponse, RollbackType,
5+
SimpleTableReplicationState, UpdatePipelineConfigRequest, UpdatePipelineConfigResponse,
6+
UpdatePipelineImageRequest, UpdatePipelineRequest,
177
};
188
use etl_config::shared::{BatchConfig, PgConnectionConfig};
199
use etl_postgres::sqlx::test_utils::drop_pg_database;
@@ -22,11 +12,22 @@ use reqwest::StatusCode;
2212
use sqlx::PgPool;
2313
use sqlx::postgres::types::Oid;
2414

15+
use crate::support::database::{
16+
create_test_source_database, run_etl_migrations_on_source_database,
17+
};
18+
use crate::support::mocks::create_image_with_name;
2519
use crate::support::mocks::pipelines::{
2620
ConfigUpdateType, create_pipeline_with_config, new_pipeline_config,
2721
partially_updated_optional_pipeline_config, updated_optional_pipeline_config,
2822
updated_pipeline_config,
2923
};
24+
use crate::{
25+
support::mocks::create_default_image,
26+
support::mocks::destinations::create_destination,
27+
support::mocks::sources::create_source,
28+
support::mocks::tenants::{create_tenant, create_tenant_with_id_and_name},
29+
support::test_app::{TestApp, spawn_test_app},
30+
};
3031

3132
mod support;
3233

@@ -1280,3 +1281,90 @@ async fn deleting_pipeline_removes_table_schemas_from_source_database() {
12801281

12811282
drop_pg_database(&source_db_config).await;
12821283
}
1284+
1285+
#[tokio::test(flavor = "multi_thread")]
1286+
async fn pipeline_version_returns_current_version_and_no_new_version_when_default_matches() {
1287+
init_test_tracing();
1288+
// Arrange
1289+
let app = spawn_test_app().await;
1290+
let tenant_id = create_tenant(&app).await;
1291+
let source_id = create_source(&app, &tenant_id).await;
1292+
let destination_id = create_destination(&app, &tenant_id).await;
1293+
1294+
// Create a default image without a tag -> should parse to "latest".
1295+
create_image_with_name(&app, "some/image".to_string(), true).await;
1296+
1297+
let pipeline_id = {
1298+
let req = CreatePipelineRequest {
1299+
source_id,
1300+
destination_id,
1301+
config: new_pipeline_config(),
1302+
};
1303+
let resp = app.create_pipeline(&tenant_id, &req).await;
1304+
let resp: CreatePipelineResponse =
1305+
resp.json().await.expect("failed to deserialize response");
1306+
resp.id
1307+
};
1308+
1309+
// Act
1310+
let response = app.get_pipeline_version(&tenant_id, pipeline_id).await;
1311+
1312+
// Assert
1313+
assert!(response.status().is_success());
1314+
let version: GetPipelineVersionResponse = response
1315+
.json()
1316+
.await
1317+
.expect("failed to deserialize response");
1318+
assert_eq!(version.version.name, "latest");
1319+
assert!(version.new_version.is_none());
1320+
}
1321+
1322+
#[tokio::test(flavor = "multi_thread")]
1323+
async fn pipeline_version_includes_new_default_version_when_available() {
1324+
init_test_tracing();
1325+
// Arrange
1326+
let app = spawn_test_app().await;
1327+
let tenant_id = create_tenant(&app).await;
1328+
let source_id = create_source(&app, &tenant_id).await;
1329+
let destination_id = create_destination(&app, &tenant_id).await;
1330+
1331+
// Initial default image for pipeline creation
1332+
let old_default_image_id =
1333+
create_image_with_name(&app, "supabase/replicator:1.2.3".to_string(), true).await;
1334+
1335+
let pipeline_id = {
1336+
let req = CreatePipelineRequest {
1337+
source_id,
1338+
destination_id,
1339+
config: new_pipeline_config(),
1340+
};
1341+
let resp = app.create_pipeline(&tenant_id, &req).await;
1342+
let resp: CreatePipelineResponse =
1343+
resp.json().await.expect("failed to deserialize response");
1344+
resp.id
1345+
};
1346+
1347+
// Create a new default image (should flip default)
1348+
let default_image_id =
1349+
create_image_with_name(&app, "supabase/replicator:1.3.0".to_string(), true).await;
1350+
1351+
// Act
1352+
let response = app.get_pipeline_version(&tenant_id, pipeline_id).await;
1353+
1354+
// Assert
1355+
assert!(response.status().is_success());
1356+
let version: GetPipelineVersionResponse = response
1357+
.json()
1358+
.await
1359+
.expect("failed to deserialize response");
1360+
1361+
let current_version = version.version;
1362+
assert_eq!(current_version.id, old_default_image_id);
1363+
assert_eq!(current_version.name, "1.2.3");
1364+
1365+
let new_version = version
1366+
.new_version
1367+
.expect("expected new_version to be present");
1368+
assert_eq!(new_version.id, default_image_id);
1369+
assert_eq!(new_version.name, "1.3.0");
1370+
}

etl-api/tests/support/test_app.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,21 @@ impl TestApp {
463463
.expect("failed to execute request")
464464
}
465465

466+
pub async fn get_pipeline_version(
467+
&self,
468+
tenant_id: &str,
469+
pipeline_id: i64,
470+
) -> reqwest::Response {
471+
self.get_authenticated(format!(
472+
"{}/v1/pipelines/{}/version",
473+
&self.address, pipeline_id
474+
))
475+
.header("tenant_id", tenant_id)
476+
.send()
477+
.await
478+
.expect("failed to execute request")
479+
}
480+
466481
pub async fn rollback_table_state(
467482
&self,
468483
tenant_id: &str,

0 commit comments

Comments
 (0)