
Commit 7109039

feat(cubestore): Initial queue support (#5541)

Parent: 0d1a3d8

18 files changed, +1924 -127 lines

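The commit introduces a set of QUEUE commands in CubeStore's SQL interface; the integration test added below exercises the whole lifecycle. As a quick orientation, here is a condensed sketch of that lifecycle, assuming a test-style SqlClient handle from cubestore-sql-tests; the statements are the ones the test issues, and the key names ("STANDALONE#queue:3" and so on) are only the examples it uses.

    // Condensed sketch of the queue lifecycle covered by queue_full_workflow below
    // (assumes the cubestore-sql-tests SqlClient; not a standalone program).
    async fn queue_lifecycle_sketch(client: Box<dyn SqlClient>) {
        // Enqueue a job under a key with a priority; QUEUE PENDING lists jobs in
        // descending priority order.
        client
            .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#)
            .await
            .unwrap();

        // Inspect the queue: jobs waiting to be processed vs. jobs currently held.
        client.exec_query(r#"QUEUE PENDING "STANDALONE#queue""#).await.unwrap();
        client.exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#).await.unwrap();

        // Take a job for processing under a concurrency limit; it moves to the active list.
        client
            .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#)
            .await
            .unwrap();

        // Publish the job's result; this also unblocks any QUEUE RESULT_BLOCKING waiter.
        client
            .exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#)
            .await
            .unwrap();

        // Fetch a job's payload by key, or drop the job from the queue entirely.
        client.exec_query(r#"QUEUE GET "STANDALONE#queue:2""#).await.unwrap();
        client.exec_query(r#"QUEUE CANCEL "STANDALONE#queue:2""#).await.unwrap();
    }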

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 185 additions & 0 deletions
@@ -23,6 +23,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use tokio::io::{AsyncWriteExt, BufWriter};
+use tokio::join;
 
 pub type TestFn = Box<
     dyn Fn(Box<dyn SqlClient>) -> Pin<Box<dyn Future<Output = ()> + Send>>
@@ -226,6 +227,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
         t("cache_compaction", cache_compaction),
         t("cache_set_nx", cache_set_nx),
         t("cache_prefix_keys", cache_prefix_keys),
+        t("queue_full_workflow", queue_full_workflow),
     ];
 
     fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -6411,6 +6413,189 @@ async fn cache_prefix_keys(service: Box<dyn SqlClient>) {
     );
 }
 
+async fn queue_full_workflow(service: Box<dyn SqlClient>) {
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
+        .await
+        .unwrap();
+
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:2" "payload2";"#)
+        .await
+        .unwrap();
+
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#)
+        .await
+        .unwrap();
+
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload3";"#)
+        .await
+        .unwrap();
+
+    {
+        let pending_response = service
+            .exec_query(r#"QUEUE PENDING "STANDALONE#queue""#)
+            .await
+            .unwrap();
+        assert_eq!(
+            pending_response.get_columns(),
+            &vec![
+                Column::new("id".to_string(), ColumnType::String, 0),
+                Column::new("status".to_string(), ColumnType::String, 1),
+                Column::new("extra".to_string(), ColumnType::String, 2),
+            ]
+        );
+        assert_eq!(
+            pending_response.get_rows(),
+            &vec![
+                Row::new(vec![
+                    TableValue::String("3".to_string()),
+                    TableValue::String("pending".to_string()),
+                    TableValue::Null
+                ]),
+                Row::new(vec![
+                    TableValue::String("4".to_string()),
+                    TableValue::String("pending".to_string()),
+                    TableValue::Null
+                ]),
+                Row::new(vec![
+                    TableValue::String("2".to_string()),
+                    TableValue::String("pending".to_string()),
+                    TableValue::Null
+                ]),
+                Row::new(vec![
+                    TableValue::String("1".to_string()),
+                    TableValue::String("pending".to_string()),
+                    TableValue::Null
+                ]),
+            ]
+        );
+    }
+
+    {
+        let active_response = service
+            .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
+            .await
+            .unwrap();
+        assert_eq!(active_response.get_rows().len(), 0);
+    }
+
+    {
+        let retrieve_response = service
+            .exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#)
+            .await
+            .unwrap();
+        assert_eq!(
+            retrieve_response.get_columns(),
+            &vec![
+                Column::new("payload".to_string(), ColumnType::String, 0),
+                Column::new("extra".to_string(), ColumnType::String, 1),
+            ]
+        );
+        assert_eq!(
+            retrieve_response.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload3".to_string()),
+                TableValue::Null,
+            ]),]
+        );
+    }
+
+    {
+        let active_response = service
+            .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
+            .await
+            .unwrap();
+        assert_eq!(
+            active_response.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("3".to_string()),
+                TableValue::String("active".to_string()),
+                TableValue::Null
+            ]),]
+        );
+    }
+
+    let service = Arc::new(service);
+
+    {
+        let service_to_move = service.clone();
+        let blocking = async move {
+            service_to_move
+                .exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3""#)
+                .await
+                .unwrap()
+        };
+
+        let service_to_move = service.clone();
+        let ack = async move {
+            tokio::time::sleep(Duration::from_millis(1000)).await;
+
+            service_to_move
+                .exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#)
+                .await
+                .unwrap()
+        };
+
+        let (blocking_res, _ack_res) = join!(blocking, ack);
+        assert_eq!(
+            blocking_res.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("result:3".to_string()),
+                TableValue::String("success".to_string())
+            ]),]
+        );
+    }
+
+    // previous job was finished
+    {
+        let active_response = service
+            .exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)
+            .await
+            .unwrap();
+        assert_eq!(active_response.get_rows().len(), 0);
+    }
+
+    // get
+    {
+        let get_response = service
+            .exec_query(r#"QUEUE GET "STANDALONE#queue:2""#)
+            .await
+            .unwrap();
+        assert_eq!(
+            get_response.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload2".to_string()),
+                TableValue::Null
+            ]),]
+        );
+    }
+
+    // cancel job
+    {
+        let cancel_response = service
+            .exec_query(r#"QUEUE CANCEL "STANDALONE#queue:2""#)
+            .await
+            .unwrap();
+        assert_eq!(
+            cancel_response.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload2".to_string()),
+                TableValue::Null
+            ]),]
+        );
+
+        // assertion that job was removed
+        let get_response = service
+            .exec_query(r#"QUEUE GET "STANDALONE#queue:2""#)
+            .await
+            .unwrap();
+        assert_eq!(get_response.get_rows().len(), 0);
+    }
+}
+
 pub fn to_rows(d: &DataFrame) -> Vec<Vec<TableValue>> {
     return d
         .get_rows()
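The most involved part of the test above is the result handoff: QUEUE RESULT_BLOCKING blocks for up to the given timeout (5000 ms here) until another connection finishes the job with QUEUE ACK, which the test drives with tokio::join!. A minimal sketch of that pairing, under the same assumptions as the test (the Arc-wrapped SqlClient it builds and its key names; the test additionally delays the ack by one second):

    // Sketch of the blocking-result handoff exercised above; `service` is assumed to be
    // the Arc<Box<dyn SqlClient>> the test builds with Arc::new(service).
    async fn result_handoff_sketch(service: std::sync::Arc<Box<dyn SqlClient>>) {
        let waiter = {
            let service = service.clone();
            async move {
                // Waits up to 5000 ms for the job keyed "STANDALONE#queue:3" to be acked.
                service
                    .exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:3""#)
                    .await
                    .unwrap()
            }
        };

        let worker = {
            let service = service.clone();
            async move {
                // Publishes the result, waking the waiter with ("result:3", "success").
                service
                    .exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#)
                    .await
                    .unwrap()
            }
        };

        // Run both sides concurrently on the same task, as the test does.
        let (result, _) = tokio::join!(waiter, worker);
        assert_eq!(result.get_rows().len(), 1);
    }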

rust/cubestore/cubestore/src/cachestore/cache_item.rs

Lines changed: 2 additions & 2 deletions
@@ -125,8 +125,8 @@ impl RocksSecondaryIndex<CacheItem, CacheItemIndexKey> for CacheItemRocksIndex {
         true
     }
 
-    fn get_expire<'a>(&self, row: &'a CacheItem) -> &'a Option<DateTime<Utc>> {
-        row.get_expire()
+    fn get_expire(&self, row: &CacheItem) -> Option<DateTime<Utc>> {
+        row.get_expire().clone()
     }
 
     fn version(&self) -> u32 {
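The only other change shown here adjusts the expiry hook on the secondary index: get_expire now returns an owned Option<DateTime<Utc>> instead of a reference tied to the row's lifetime, which among other things frees an implementation to return a value it computes rather than one it can borrow from the row. A standalone sketch of the two styles, with simplified stand-in types rather than the actual RocksSecondaryIndex trait:

    // Standalone illustration of the signature change above (hypothetical types,
    // not the CubeStore trait).
    use chrono::{DateTime, Duration, Utc};

    struct Item {
        inserted_at: DateTime<Utc>,
        ttl_secs: Option<i64>,
    }

    trait ExpiringIndex {
        // Old style: the expiry had to live inside `row` so it could be borrowed:
        //     fn get_expire<'a>(&self, row: &'a Item) -> &'a Option<DateTime<Utc>>;
        // New style: the expiry is returned by value and may be derived on the fly.
        fn get_expire(&self, row: &Item) -> Option<DateTime<Utc>>;
    }

    struct TtlIndex;

    impl ExpiringIndex for TtlIndex {
        fn get_expire(&self, row: &Item) -> Option<DateTime<Utc>> {
            // Derive the expiry from insertion time plus TTL instead of storing it.
            row.ttl_secs
                .map(|secs| row.inserted_at + Duration::seconds(secs))
        }
    }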
