Skip to content

Commit 9f23924

Browse files
authored
feat(cubestore): QUEUE - support extended flag for retrieve (#6248)
1 parent 3607c67 commit 9f23924

File tree

9 files changed

+253
-72
lines changed

9 files changed

+253
-72
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ export class QueryQueue {
783783
const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId);
784784

785785
if (retrieveResult) {
786-
[insertedCount, _removedCount, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
786+
[insertedCount /** todo(ovr): Remove */, _removedCount/** todo(ovr): Remove */, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
787787
}
788788

789789
const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1;

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
228228
t("cache_set_nx", cache_set_nx),
229229
t("cache_prefix_keys", cache_prefix_keys),
230230
t("queue_full_workflow", queue_full_workflow),
231+
t("queue_retrieve_extended", queue_retrieve_extended),
231232
t("queue_ack_then_result", queue_ack_then_result),
232233
t("queue_orphaned_timeout", queue_orphaned_timeout),
233234
t("queue_heartbeat", queue_heartbeat),
@@ -8014,13 +8015,17 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
80148015
&vec![
80158016
Column::new("payload".to_string(), ColumnType::String, 0),
80168017
Column::new("extra".to_string(), ColumnType::String, 1),
8018+
Column::new("pending".to_string(), ColumnType::Int, 2),
8019+
Column::new("active".to_string(), ColumnType::String, 3),
80178020
]
80188021
);
80198022
assert_eq!(
80208023
retrieve_response.get_rows(),
80218024
&vec![Row::new(vec![
80228025
TableValue::String("payload3".to_string()),
80238026
TableValue::Null,
8027+
TableValue::Int(4),
8028+
TableValue::String("3".to_string()),
80248029
]),]
80258030
);
80268031
}
@@ -8036,6 +8041,8 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
80368041
&vec![
80378042
Column::new("payload".to_string(), ColumnType::String, 0),
80388043
Column::new("extra".to_string(), ColumnType::String, 1),
8044+
Column::new("pending".to_string(), ColumnType::Int, 2),
8045+
Column::new("active".to_string(), ColumnType::String, 3),
80398046
]
80408047
);
80418048
assert_eq!(retrieve_response.get_rows().len(), 0);
@@ -8134,6 +8141,99 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
81348141
}
81358142
}
81368143

8144+
async fn queue_retrieve_extended(service: Box<dyn SqlClient>) {
8145+
service
8146+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
8147+
.await
8148+
.unwrap();
8149+
8150+
service
8151+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:2" "payload2";"#)
8152+
.await
8153+
.unwrap();
8154+
8155+
service
8156+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:3" "payload3";"#)
8157+
.await
8158+
.unwrap();
8159+
8160+
{
8161+
let retrieve_response = service
8162+
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:1""#)
8163+
.await
8164+
.unwrap();
8165+
assert_eq!(
8166+
retrieve_response.get_columns(),
8167+
&vec![
8168+
Column::new("payload".to_string(), ColumnType::String, 0),
8169+
Column::new("extra".to_string(), ColumnType::String, 1),
8170+
Column::new("pending".to_string(), ColumnType::Int, 2),
8171+
Column::new("active".to_string(), ColumnType::String, 3),
8172+
]
8173+
);
8174+
assert_eq!(
8175+
retrieve_response.get_rows(),
8176+
&vec![Row::new(vec![
8177+
TableValue::String("payload1".to_string()),
8178+
TableValue::Null,
8179+
TableValue::Int(2),
8180+
TableValue::String("1".to_string()),
8181+
]),]
8182+
);
8183+
}
8184+
8185+
{
8186+
// concurrency limit
8187+
let retrieve_response = service
8188+
.exec_query(r#"QUEUE RETRIEVE EXTENDED CONCURRENCY 1 "STANDALONE#queue:2""#)
8189+
.await
8190+
.unwrap();
8191+
assert_eq!(
8192+
retrieve_response.get_columns(),
8193+
&vec![
8194+
Column::new("payload".to_string(), ColumnType::String, 0),
8195+
Column::new("extra".to_string(), ColumnType::String, 1),
8196+
Column::new("pending".to_string(), ColumnType::Int, 2),
8197+
Column::new("active".to_string(), ColumnType::String, 3),
8198+
]
8199+
);
8200+
assert_eq!(
8201+
retrieve_response.get_rows(),
8202+
&vec![Row::new(vec![
8203+
TableValue::Null,
8204+
TableValue::Null,
8205+
TableValue::Int(2),
8206+
TableValue::String("1".to_string()),
8207+
]),]
8208+
);
8209+
}
8210+
8211+
{
8212+
let retrieve_response = service
8213+
.exec_query(r#"QUEUE RETRIEVE EXTENDED CONCURRENCY 2 "STANDALONE#queue:2""#)
8214+
.await
8215+
.unwrap();
8216+
assert_eq!(
8217+
retrieve_response.get_columns(),
8218+
&vec![
8219+
Column::new("payload".to_string(), ColumnType::String, 0),
8220+
Column::new("extra".to_string(), ColumnType::String, 1),
8221+
Column::new("pending".to_string(), ColumnType::Int, 2),
8222+
Column::new("active".to_string(), ColumnType::String, 3),
8223+
]
8224+
);
8225+
assert_eq!(
8226+
retrieve_response.get_rows(),
8227+
&vec![Row::new(vec![
8228+
TableValue::String("payload2".to_string()),
8229+
TableValue::Null,
8230+
TableValue::Int(1),
8231+
TableValue::String("1,2".to_string()),
8232+
]),]
8233+
);
8234+
}
8235+
}
8236+
81378237
async fn queue_ack_then_result(service: Box<dyn SqlClient>) {
81388238
service
81398239
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:5555" "payload1";"#)

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::cachestore::cache_item::{
33
};
44
use crate::cachestore::queue_item::{
55
QueueItem, QueueItemIndexKey, QueueItemRocksIndex, QueueItemRocksTable, QueueItemStatus,
6-
QueueResultAckEvent, QueueResultAckEventResult,
6+
QueueResultAckEvent, QueueResultAckEventResult, QueueRetrieveResponse,
77
};
88
use crate::cachestore::queue_result::{
99
QueueResultIndexKey, QueueResultRocksIndex, QueueResultRocksTable,
@@ -266,17 +266,6 @@ impl RocksCacheStore {
266266
.await
267267
}
268268

269-
fn queue_count_by_prefix_and_status(
270-
db_ref: DbTableRef,
271-
prefix: &Option<String>,
272-
status: QueueItemStatus,
273-
) -> Result<u64, CubeError> {
274-
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
275-
let index_key =
276-
QueueItemIndexKey::ByPrefixAndStatus(prefix.clone().unwrap_or("".to_string()), status);
277-
queue_schema.count_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)
278-
}
279-
280269
fn filter_to_cancel(
281270
now: DateTime<Utc>,
282271
items: Vec<IdRow<QueueItem>>,
@@ -386,7 +375,7 @@ pub trait CacheStore: DIService + Send + Sync {
386375
&self,
387376
key: String,
388377
allow_concurrency: u32,
389-
) -> Result<Option<IdRow<QueueItem>>, CubeError>;
378+
) -> Result<QueueRetrieveResponse, CubeError>;
390379
async fn queue_ack(&self, key: String, result: Option<String>) -> Result<(), CubeError>;
391380
async fn queue_result(&self, key: String) -> Result<Option<QueueResultResponse>, CubeError>;
392381
async fn queue_result_blocking(
@@ -538,16 +527,17 @@ impl CacheStore for RocksCacheStore {
538527
self.store
539528
.write_operation(move |db_ref, batch_pipe| {
540529
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
530+
let pending = queue_schema.count_rows_by_index(
531+
&QueueItemIndexKey::ByPrefixAndStatus(
532+
item.get_prefix().clone().unwrap_or("".to_string()),
533+
QueueItemStatus::Pending,
534+
),
535+
&QueueItemRocksIndex::ByPrefixAndStatus,
536+
)?;
537+
541538
let index_key = QueueItemIndexKey::ByPath(item.get_path());
542539
let id_row_opt = queue_schema
543540
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
544-
545-
let pending = Self::queue_count_by_prefix_and_status(
546-
db_ref,
547-
item.get_prefix(),
548-
QueueItemStatus::Pending,
549-
)?;
550-
551541
let added = if id_row_opt.is_none() {
552542
queue_schema.insert(item, batch_pipe)?;
553543

@@ -697,44 +687,61 @@ impl CacheStore for RocksCacheStore {
697687
&self,
698688
key: String,
699689
allow_concurrency: u32,
700-
) -> Result<Option<IdRow<QueueItem>>, CubeError> {
690+
) -> Result<QueueRetrieveResponse, CubeError> {
701691
self.store
702692
.write_operation(move |db_ref, batch_pipe| {
703693
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
704-
let index_key = QueueItemIndexKey::ByPath(key.clone());
705-
let id_row_opt = queue_schema
706-
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
694+
let prefix = QueueItem::parse_path(key.clone())
695+
.0
696+
.unwrap_or("".to_string());
707697

708-
if let Some(id_row) = id_row_opt {
709-
if id_row.get_row().get_status() == &QueueItemStatus::Pending {
710-
let current_active = Self::queue_count_by_prefix_and_status(
711-
db_ref,
712-
id_row.get_row().get_prefix(),
713-
QueueItemStatus::Active,
714-
)?;
715-
if current_active >= (allow_concurrency as u64) {
716-
return Ok(None);
717-
}
698+
let mut pending = queue_schema.count_rows_by_index(
699+
&QueueItemIndexKey::ByPrefixAndStatus(prefix.clone(), QueueItemStatus::Pending),
700+
&QueueItemRocksIndex::ByPrefixAndStatus,
701+
)?;
718702

719-
let mut new = id_row.get_row().clone();
720-
new.status = QueueItemStatus::Active;
721-
// It's an important to insert heartbeat, because
722-
// without that created datetime will be used for orphaned filtering
723-
new.update_heartbeat();
703+
let mut active: Vec<String> = queue_schema
704+
.get_rows_by_index(
705+
&QueueItemIndexKey::ByPrefixAndStatus(prefix, QueueItemStatus::Active),
706+
&QueueItemRocksIndex::ByPrefixAndStatus,
707+
)?
708+
.into_iter()
709+
.map(|item| item.into_row().key)
710+
.collect();
711+
if active.len() >= (allow_concurrency as usize) {
712+
return Ok(QueueRetrieveResponse::NotFound { pending, active });
713+
}
724714

725-
let res = queue_schema.update(
726-
id_row.get_id(),
727-
new,
728-
id_row.get_row(),
729-
batch_pipe,
730-
)?;
715+
let id_row = queue_schema.get_single_opt_row_by_index(
716+
&QueueItemIndexKey::ByPath(key.clone()),
717+
&QueueItemRocksIndex::ByPath,
718+
)?;
719+
let id_row = if let Some(id_row) = id_row {
720+
id_row
721+
} else {
722+
return Ok(QueueRetrieveResponse::NotFound { pending, active });
723+
};
731724

732-
Ok(Some(res))
733-
} else {
734-
Ok(None)
735-
}
725+
if id_row.get_row().get_status() == &QueueItemStatus::Pending {
726+
let mut new = id_row.get_row().clone();
727+
new.status = QueueItemStatus::Active;
728+
// It's an important to insert heartbeat, because
729+
// without that created datetime will be used for orphaned filtering
730+
new.update_heartbeat();
731+
732+
let res =
733+
queue_schema.update(id_row.get_id(), new, id_row.get_row(), batch_pipe)?;
734+
735+
active.push(res.get_row().get_key().clone());
736+
pending -= 1;
737+
738+
Ok(QueueRetrieveResponse::Success {
739+
item: res.into_row(),
740+
pending,
741+
active,
742+
})
736743
} else {
737-
Ok(None)
744+
Ok(QueueRetrieveResponse::LockFailed { pending, active })
738745
}
739746
})
740747
.await
@@ -964,7 +971,7 @@ impl CacheStore for ClusterCacheStoreClient {
964971
&self,
965972
_key: String,
966973
_allow_concurrency: u32,
967-
) -> Result<Option<IdRow<QueueItem>>, CubeError> {
974+
) -> Result<QueueRetrieveResponse, CubeError> {
968975
panic!("CacheStore cannot be used on the worker node! queue_retrieve was used.")
969976
}
970977

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::cachestore::cache_rocksstore::QueueAddResponse;
2+
use crate::cachestore::queue_item::QueueRetrieveResponse;
23
use crate::cachestore::{
34
CacheItem, CacheStore, QueueItem, QueueItemStatus, QueueResultResponse, RocksCacheStore,
45
};
@@ -252,7 +253,7 @@ impl CacheStore for LazyRocksCacheStore {
252253
&self,
253254
key: String,
254255
allow_concurrency: u32,
255-
) -> Result<Option<IdRow<QueueItem>>, CubeError> {
256+
) -> Result<QueueRetrieveResponse, CubeError> {
256257
self.init()
257258
.await?
258259
.queue_retrieve(key, allow_concurrency)

rust/cubestore/cubestore/src/cachestore/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ pub use cache_rocksstore::{
1313
QueueResultResponse, RocksCacheStore,
1414
};
1515
pub use lazy::LazyRocksCacheStore;
16-
pub use queue_item::{QueueItem, QueueItemStatus, QueueResultAckEvent};
16+
pub use queue_item::{QueueItem, QueueItemStatus, QueueResultAckEvent, QueueRetrieveResponse};
1717
pub use queue_result::QueueResult;
1818
pub use scheduler::CacheStoreSchedulerImpl;

0 commit comments

Comments
 (0)