Commit 70e68ae

feat(cubestore): Queue - improve orphaned detection + optimizations (#6051)
1 parent 3b1b1ab commit 70e68ae

File tree

3 files changed: +183 -31 lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs
rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
rust/cubestore/cubestore/src/cachestore/queue_item.rs

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 149 additions & 0 deletions
@@ -229,6 +229,9 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
         t("cache_prefix_keys", cache_prefix_keys),
         t("queue_full_workflow", queue_full_workflow),
         t("queue_ack_then_result", queue_ack_then_result),
+        t("queue_orphaned_timeout", queue_orphaned_timeout),
+        t("queue_heartbeat", queue_heartbeat),
+        t("queue_merge_extra", queue_merge_extra),
         t(
             "queue_multiple_result_blocking",
             queue_multiple_result_blocking,
@@ -6667,6 +6670,152 @@ async fn queue_ack_then_result(service: Box<dyn SqlClient>) {
     assert_eq!(result.get_rows().len(), 0);
 }
 
+async fn queue_orphaned_timeout(service: Box<dyn SqlClient>) {
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
+        .await
+        .unwrap();
+
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:2" "payload2";"#)
+        .await
+        .unwrap();
+
+    let res = service
+        .exec_query(r#"QUEUE TO_CANCEL 1000 1000 "STANDALONE#queue";"#)
+        .await
+        .unwrap();
+    assert_eq!(res.len(), 0);
+
+    // only active jobs can be orphaned
+    // RETRIEVE updates the heartbeat
+    {
+        service
+            .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:1""#)
+            .await
+            .unwrap();
+
+        service
+            .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:2""#)
+            .await
+            .unwrap();
+    }
+
+    tokio::time::sleep(Duration::from_millis(1000)).await;
+
+    service
+        .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:2";"#)
+        .await
+        .unwrap();
+
+    let res = service
+        .exec_query(r#"QUEUE TO_CANCEL 1000 1000 "STANDALONE#queue""#)
+        .await
+        .unwrap();
+    assert_eq!(
+        res.get_columns(),
+        &vec![Column::new("id".to_string(), ColumnType::String, 0),]
+    );
+    assert_eq!(
+        res.get_rows(),
+        &vec![Row::new(vec![TableValue::String("1".to_string()),]),]
+    );
+
+    // wait for the heartbeat of queue:2 to expire
+    tokio::time::sleep(Duration::from_millis(1000)).await;
+
+    let res = service
+        .exec_query(r#"QUEUE TO_CANCEL 1000 1000 "STANDALONE#queue""#)
+        .await
+        .unwrap();
+    assert_eq!(res.len(), 2);
+}
+
+async fn queue_heartbeat(service: Box<dyn SqlClient>) {
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
+        .await
+        .unwrap();
+
+    let res = service
+        .exec_query(r#"SELECT heartbeat FROM system.queue WHERE prefix = 'STANDALONE#queue'"#)
+        .await
+        .unwrap();
+    assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Null,]),]);
+
+    service
+        .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:1";"#)
+        .await
+        .unwrap();
+
+    let res = service
+        .exec_query(r#"SELECT heartbeat FROM system.queue WHERE prefix = 'STANDALONE#queue'"#)
+        .await
+        .unwrap();
+
+    let row = res.get_rows().first().unwrap();
+    match row.values().first().unwrap() {
+        TableValue::Timestamp(_) => {}
+        other => panic!("heartbeat must be a timestamp type, actual: {:?}", other),
+    }
+}
+
+async fn queue_merge_extra(service: Box<dyn SqlClient>) {
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
+        .await
+        .unwrap();
+
+    // extra must be empty after creation
+    {
+        let res = service
+            .exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#)
+            .await
+            .unwrap();
+        assert_eq!(
+            res.get_columns(),
+            &vec![
+                Column::new("payload".to_string(), ColumnType::String, 0),
+                Column::new("extra".to_string(), ColumnType::String, 1),
+            ]
+        );
+        assert_eq!(
+            res.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload1".to_string()),
+                TableValue::Null
+            ]),]
+        );
+    }
+
+    service
+        .exec_query(r#"QUEUE MERGE_EXTRA "STANDALONE#queue:1" '{"first": true}';"#)
+        .await
+        .unwrap();
+
+    // extra should contain the "first" field after MERGE_EXTRA
+    {
+        let res = service
+            .exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#)
+            .await
+            .unwrap();
+        assert_eq!(
+            res.get_columns(),
+            &vec![
+                Column::new("payload".to_string(), ColumnType::String, 0),
+                Column::new("extra".to_string(), ColumnType::String, 1),
+            ]
+        );
+        assert_eq!(
+            res.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload1".to_string()),
+                TableValue::String("{\"first\": true}".to_string())
+            ]),]
+        );
+    }
+}
+
 async fn queue_multiple_result_blocking(service: Box<dyn SqlClient>) {
     service
         .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:12345" "payload1";"#)

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 32 additions & 26 deletions
@@ -30,6 +30,7 @@ use crate::cachestore::compaction::CompactionPreloadedState;
 use crate::cachestore::listener::RocksCacheStoreListener;
 use chrono::Utc;
 use itertools::Itertools;
+use log::trace;
 use serde_derive::{Deserialize, Serialize};
 use std::path::Path;
 use std::sync::{Arc, Mutex};
@@ -553,11 +554,13 @@ impl CacheStore for RocksCacheStore {
 
                     if item.get_row().get_status() == &QueueItemStatus::Active {
                         if let Some(orphaned_timeout) = orphaned_timeout {
-                            if let Some(heartbeat) = item.get_row().get_heartbeat() {
-                                let elapsed = now - heartbeat.clone();
-                                if elapsed.num_milliseconds() > orphaned_timeout as i64 {
-                                    return true;
-                                }
+                            let elapsed = if let Some(heartbeat) = item.get_row().get_heartbeat() {
+                                now - heartbeat.clone()
+                            } else {
+                                now - item.get_row().get_created().clone()
+                            };
+                            if elapsed.num_milliseconds() > orphaned_timeout as i64 {
+                                return true;
                             }
                         }
                     }
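
Note on the hunk above: an Active item that never reported a heartbeat previously could never be flagged as orphaned; the change falls back to the item's creation time when no heartbeat exists. A minimal, self-contained sketch of that rule (the Item struct and field names are simplified stand-ins, not Cube Store's QueueItem):

use chrono::{DateTime, Duration, Utc};

struct Item {
    created: DateTime<Utc>,
    heartbeat: Option<DateTime<Utc>>,
}

/// Returns true when the item has not reported progress within `orphaned_timeout_ms`,
/// measured from the last heartbeat if one exists, otherwise from creation time.
fn is_orphaned(item: &Item, now: DateTime<Utc>, orphaned_timeout_ms: u32) -> bool {
    let elapsed: Duration = match item.heartbeat {
        Some(heartbeat) => now - heartbeat,
        None => now - item.created,
    };
    elapsed.num_milliseconds() > orphaned_timeout_ms as i64
}

fn main() {
    let now = Utc::now();
    let never_heartbeated = Item {
        created: now - Duration::milliseconds(5_000),
        heartbeat: None,
    };
    // Without the created-time fallback, an Active item that never sent a
    // heartbeat would never be considered orphaned; with it, it is.
    assert!(is_orphaned(&never_heartbeated, now, 1_000));
}
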
@@ -579,17 +582,14 @@
         self.store
             .read_operation(move |db_ref| {
                 let queue_schema = QueueItemRocksTable::new(db_ref.clone());
-                let index_key = QueueItemIndexKey::ByPrefix(prefix);
-                let items =
-                    queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?;
 
                 let items = if let Some(status_filter) = status_filter {
-                    items
-                        .into_iter()
-                        .filter(|item| item.get_row().status == status_filter)
-                        .collect()
+                    let index_key = QueueItemIndexKey::ByPrefixAndStatus(prefix, status_filter);
+                    queue_schema
+                        .get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)?
                 } else {
-                    items
+                    let index_key = QueueItemIndexKey::ByPrefix(prefix);
+                    queue_schema.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?
                 };
 
                 if priority_sort {
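
Note on the hunk above: the status-filtered listing no longer reads every row under the prefix and filters in memory; it goes straight to the ByPrefixAndStatus index. A rough sketch of the difference, using a plain BTreeMap range scan as a stand-in for the RocksDB secondary index (the key layout and names here are illustrative only):

use std::collections::BTreeMap;

fn main() {
    // Stand-in for a secondary index keyed by (prefix, status, id);
    // 0 = Pending, 1 = Active in this toy encoding.
    let mut index: BTreeMap<(String, u8, u64), String> = BTreeMap::new();
    index.insert(("queue".to_string(), 0, 1), "pending item".to_string());
    index.insert(("queue".to_string(), 1, 2), "active item".to_string());
    index.insert(("other".to_string(), 1, 3), "other prefix".to_string());

    // Old approach: load every row under the prefix, then filter by status in memory.
    let scanned: Vec<&String> = index
        .iter()
        .filter(|(key, _)| key.0 == "queue" && key.1 == 1)
        .map(|(_, value)| value)
        .collect();

    // New approach: range-scan only the (prefix, status) slice of the index,
    // so non-matching rows are never materialized.
    let lo = ("queue".to_string(), 1u8, u64::MIN);
    let hi = ("queue".to_string(), 1u8, u64::MAX);
    let ranged: Vec<&String> = index.range(lo..=hi).map(|(_, value)| value).collect();

    assert_eq!(scanned, ranged);
}
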
@@ -632,13 +632,17 @@
                     .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
 
                 if let Some(id_row) = id_row_opt {
-                    queue_schema.update_with_fn(
-                        id_row.id,
-                        |item| item.update_heartbeat(),
-                        batch_pipe,
-                    )?;
+                    let mut new = id_row.get_row().clone();
+                    new.update_heartbeat();
+
+                    queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?;
                     Ok(())
                 } else {
+                    trace!(
+                        "Unable to update heartbeat for queue item with path: {}",
+                        key
+                    );
+
                     Ok(())
                 }
             })
@@ -668,18 +672,20 @@
                         return Ok(None);
                     }
 
-                    let new = queue_schema.update_with_fn(
-                        id_row.id,
-                        |item| {
-                            let mut new = item.clone();
-                            new.status = QueueItemStatus::Active;
+                    let mut new = id_row.get_row().clone();
+                    new.status = QueueItemStatus::Active;
+                    // It's important to insert a heartbeat here, because otherwise
+                    // the created datetime will be used for orphaned filtering
+                    new.update_heartbeat();
 
-                            new
-                        },
+                    let res = queue_schema.update(
+                        id_row.get_id(),
+                        new,
+                        id_row.get_row(),
                         batch_pipe,
                     )?;
 
-                    Ok(Some(new))
+                    Ok(Some(res))
                 } else {
                     Ok(None)
                 }

rust/cubestore/cubestore/src/cachestore/queue_item.rs

Lines changed: 2 additions & 5 deletions
@@ -190,11 +190,8 @@ impl QueueItem {
         QueueItemStatus::Pending
     }
 
-    pub fn update_heartbeat(&self) -> Self {
-        let mut new = self.clone();
-        new.heartbeat = Some(Utc::now());
-
-        new
+    pub fn update_heartbeat(&mut self) {
+        self.heartbeat = Some(Utc::now());
     }
 
     pub fn merge_extra(&self, payload: String) -> Result<Self, CubeError> {
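
Note on the hunk above: update_heartbeat changes from a clone-and-return helper to in-place mutation, which is what lets the cache_rocksstore.rs hunks clone the stored row once and pass both old and new versions to update. A small stand-alone sketch of the caller-side pattern (QueueItemLike is a hypothetical stand-in, not the real QueueItem):

use chrono::{DateTime, Utc};

#[derive(Clone)]
struct QueueItemLike {
    heartbeat: Option<DateTime<Utc>>,
}

impl QueueItemLike {
    // New signature: stamp the heartbeat in place instead of cloning the whole item.
    fn update_heartbeat(&mut self) {
        self.heartbeat = Some(Utc::now());
    }
}

fn main() {
    let stored = QueueItemLike { heartbeat: None };

    // Clone the stored row once, stamp it, then hand old/new to the update path.
    let mut new = stored.clone();
    new.update_heartbeat();

    assert!(new.heartbeat.is_some());
    assert!(stored.heartbeat.is_none());
}
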
