Skip to content

Commit 8fe8892

Browse files
authored
feat(cubestore): Queue - allow path lookup, if there was miss on external_id (#10543)
1 parent a49403f commit 8fe8892

File tree

2 files changed

+79
-22
lines changed

2 files changed

+79
-22
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
294294
),
295295
t("queue_custom_orphaned", queue_custom_orphaned),
296296
t("queue_result_by_external_id", queue_result_by_external_id),
297+
t(
298+
"queue_result_by_id_external_id_mismatch",
299+
queue_result_by_id_external_id_mismatch,
300+
),
297301
t("limit_pushdown_group", limit_pushdown_group),
298302
t("limit_pushdown_group_order", limit_pushdown_group_order),
299303
t(
@@ -356,6 +360,7 @@ lazy_static::lazy_static! {
356360
"queue_ack_then_result_v2_with_external_id",
357361
"queue_custom_orphaned",
358362
"queue_result_by_external_id",
363+
"queue_result_by_id_external_id_mismatch",
359364
"queue_full_workflow_v1",
360365
"queue_full_workflow_v2",
361366
"queue_full_workflow_v2_with_external_id",
@@ -11254,6 +11259,57 @@ async fn queue_full_workflow_v2_with_external_id(
1125411259
Ok(())
1125511260
}
1125611261

11262+
async fn queue_result_by_id_external_id_mismatch(
11263+
service: Box<dyn SqlClient>,
11264+
) -> Result<(), CubeError> {
11265+
let add_response = service
11266+
.exec_query(
11267+
r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-match' "STANDALONE#queue:mismatch_test" "payload_mismatch";"#,
11268+
)
11269+
.await?;
11270+
let id = assert_queue_add_and_get_id(&add_response)?;
11271+
11272+
let ack_result = service
11273+
.exec_query(&format!(r#"QUEUE ACK {} "result:mismatch""#, id))
11274+
.await?;
11275+
assert_eq!(
11276+
ack_result.get_rows(),
11277+
&vec![Row::new(vec![TableValue::Boolean(true)])]
11278+
);
11279+
11280+
let result = service
11281+
.exec_query(&format!(r#"QUEUE RESULT EXTERNAL_ID "ext-match" {}"#, id))
11282+
.await?;
11283+
assert_queue_result_columns(&result);
11284+
assert_eq!(
11285+
result.get_rows(),
11286+
&vec![queue_result_row("result:mismatch", &id, Some("ext-match"))]
11287+
);
11288+
11289+
// External_id allows many reads
11290+
let result = service
11291+
.exec_query(&format!(r#"QUEUE RESULT EXTERNAL_ID "ext-match" {}"#, id))
11292+
.await?;
11293+
assert_queue_result_columns(&result);
11294+
assert_eq!(
11295+
result.get_rows(),
11296+
&vec![queue_result_row("result:mismatch", &id, Some("ext-match"))]
11297+
);
11298+
11299+
let err = service
11300+
.exec_query(&format!(r#"QUEUE RESULT EXTERNAL_ID "wrong-ext" {}"#, id))
11301+
.await;
11302+
assert!(err.is_err(), "Expected error for external_id mismatch");
11303+
let err_msg = err.unwrap_err().message;
11304+
assert!(
11305+
err_msg.contains("external_id mismatch"),
11306+
"Error should mention external_id mismatch, got: {}",
11307+
err_msg
11308+
);
11309+
11310+
Ok(())
11311+
}
11312+
1125711313
async fn sys_cachestore_info(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
1125811314
service.migration_run_next_query();
1125911315
service.exec_query("SYS CACHESTORE INFO").await?;

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -595,44 +595,45 @@ impl RocksCacheStore {
595595
};
596596

597597
let id = queue_result.get_id();
598-
let external_id = queue_result.get_row().get_external_id().clone();
598+
let row_external_id = queue_result.get_row().get_external_id().clone();
599+
600+
if let Some(ref external_id) = external_id {
601+
if row_external_id.as_ref() != Some(external_id) {
602+
return Err(CubeError::user(format!(
603+
"Queue result (id = {}) external_id mismatch: expected {}, got {:?}",
604+
id, external_id, row_external_id
605+
)));
606+
}
607+
}
599608

600609
return Ok(Some(QueueResultResponse::Success {
601610
value: Some(queue_result.into_row().value),
602611
id,
603-
external_id,
612+
external_id: row_external_id,
604613
}));
605614
};
606615

607616
// try external_id first (if provided), then fall back to path lookup
617+
// external_id can be different for path, because path is re-used across different requests across time
608618
if let Some(ref external_id) = external_id {
609-
let Some(queue_result) =
619+
if let Some(queue_result) =
610620
result_schema.get_row_by_external_id(external_id.clone())?
611-
else {
612-
return Ok(None);
613-
};
614-
615-
let id = queue_result.get_id();
616-
let external_id = queue_result.get_row().get_external_id().clone();
621+
{
622+
let id = queue_result.get_id();
623+
let external_id = queue_result.get_row().get_external_id().clone();
617624

618-
return Ok(Some(QueueResultResponse::Success {
619-
value: Some(queue_result.into_row().value),
620-
id,
621-
external_id,
622-
}));
623-
};
625+
return Ok(Some(QueueResultResponse::Success {
626+
value: Some(queue_result.into_row().value),
627+
id,
628+
external_id,
629+
}));
630+
}
631+
}
624632

625633
let Some(queue_result) = result_schema.get_row_by_key(key)? else {
626634
return Ok(None);
627635
};
628636

629-
// When external_id filter is active, only return if it matches
630-
if let Some(ref external_id) = external_id {
631-
if queue_result.get_row().get_external_id().as_ref() != Some(external_id) {
632-
return Ok(None);
633-
}
634-
}
635-
636637
if queue_result.get_row().is_deleted() {
637638
Ok(None)
638639
} else {

0 commit comments

Comments
 (0)