
Commit bf8df29

feat(cubestore): Queue - allow to do merge_extra, heartbeat by processingId (#6534)
1 parent 2a4de4b commit bf8df29
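
This commit lets QUEUE HEARTBEAT and QUEUE MERGE_EXTRA address a queue item either by its path (previously the only option) or by its numeric processing id. A minimal sketch of both forms against a SqlClient, mirroring the new tests below (the by-path heartbeat statement is assumed from the pre-existing queue_heartbeat test, which this commit renames to queue_heartbeat_by_path):

async fn heartbeat_both_ways(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
    // Existing form: address the queue item by its full path.
    service
        .exec_query(r#"QUEUE HEARTBEAT "STANDALONE#queue:1";"#)
        .await?;
    // New form (this commit): address it by its numeric processing id.
    service.exec_query(r#"QUEUE HEARTBEAT 1;"#).await?;
    service
        .exec_query(r#"QUEUE MERGE_EXTRA 1 '{"first": true}';"#)
        .await?;
    Ok(())
}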

6 files changed: +210 -23 lines

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 90 additions & 6 deletions

@@ -233,12 +233,14 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
         t("cache_compaction", cache_compaction),
         t("cache_set_nx", cache_set_nx),
         t("cache_prefix_keys", cache_prefix_keys),
-        t("queue_full_workflow", queue_full_workflow),
+        t("queue_full_workflow_v1", queue_full_workflow_v1),
         t("queue_retrieve_extended", queue_retrieve_extended),
         t("queue_ack_then_result", queue_ack_then_result),
         t("queue_orphaned_timeout", queue_orphaned_timeout),
-        t("queue_heartbeat", queue_heartbeat),
-        t("queue_merge_extra", queue_merge_extra),
+        t("queue_heartbeat_by_id", queue_heartbeat_by_id),
+        t("queue_heartbeat_by_path", queue_heartbeat_by_path),
+        t("queue_merge_extra_by_path", queue_merge_extra_by_path),
+        t("queue_merge_extra_by_id", queue_merge_extra_by_id),
         t(
             "queue_multiple_result_blocking",
             queue_multiple_result_blocking,

@@ -8073,7 +8075,7 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
     );
 }

-async fn queue_full_workflow(service: Box<dyn SqlClient>) {
+async fn queue_full_workflow_v1(service: Box<dyn SqlClient>) {
     let add_response = service
         .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
         .await

@@ -8539,7 +8541,7 @@ async fn queue_orphaned_timeout(service: Box<dyn SqlClient>) {
     assert_eq!(res.len(), 2);
 }

-async fn queue_heartbeat(service: Box<dyn SqlClient>) {
+async fn queue_heartbeat_by_path(service: Box<dyn SqlClient>) {
     service
         .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
         .await

@@ -8568,7 +8570,33 @@ async fn queue_heartbeat(service: Box<dyn SqlClient>) {
     }
 }

-async fn queue_merge_extra(service: Box<dyn SqlClient>) {
+async fn queue_heartbeat_by_id(service: Box<dyn SqlClient>) {
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
+        .await
+        .unwrap();
+
+    let res = service
+        .exec_query(r#"SELECT heartbeat FROM system.queue WHERE prefix = 'STANDALONE#queue'"#)
+        .await
+        .unwrap();
+    assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Null,]),]);
+
+    service.exec_query(r#"QUEUE HEARTBEAT 1;"#).await.unwrap();
+
+    let res = service
+        .exec_query(r#"SELECT heartbeat FROM system.queue WHERE prefix = 'STANDALONE#queue'"#)
+        .await
+        .unwrap();
+
+    let row = res.get_rows().first().unwrap();
+    match row.values().first().unwrap() {
+        TableValue::Timestamp(_) => {}
+        other => panic!("heartbeat must be a timestamp type, actual: {:?}", other),
+    }
+}
+
+async fn queue_merge_extra_by_path(service: Box<dyn SqlClient>) {
     service
         .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
         .await

@@ -8624,6 +8652,62 @@ async fn queue_merge_extra(service: Box<dyn SqlClient>) {
     }
 }

+async fn queue_merge_extra_by_id(service: Box<dyn SqlClient>) {
+    service
+        .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
+        .await
+        .unwrap();
+
+    // extra must be empty after creation
+    {
+        let res = service
+            .exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#)
+            .await
+            .unwrap();
+        assert_eq!(
+            res.get_columns(),
+            &vec![
+                Column::new("payload".to_string(), ColumnType::String, 0),
+                Column::new("extra".to_string(), ColumnType::String, 1),
+            ]
+        );
+        assert_eq!(
+            res.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload1".to_string()),
+                TableValue::Null
+            ]),]
+        );
+    }
+
+    service
+        .exec_query(r#"QUEUE MERGE_EXTRA 1 '{"first": true}';"#)
+        .await
+        .unwrap();
+
+    // extra should now contain the first field
+    {
+        let res = service
+            .exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#)
+            .await
+            .unwrap();
+        assert_eq!(
+            res.get_columns(),
+            &vec![
+                Column::new("payload".to_string(), ColumnType::String, 0),
+                Column::new("extra".to_string(), ColumnType::String, 1),
+            ]
+        );
+        assert_eq!(
+            res.get_rows(),
+            &vec![Row::new(vec![
+                TableValue::String("payload1".to_string()),
+                TableValue::String("{\"first\": true}".to_string())
+            ]),]
+        );
+    }
+}
+
 async fn queue_multiple_result_blocking(service: Box<dyn SqlClient>) {
     service
         .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:12345" "payload1";"#)

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 60 additions & 8 deletions

@@ -394,6 +394,7 @@ pub trait CacheStore: DIService + Send + Sync {
         &self,
         path: String,
     ) -> Result<Option<IdRow<QueueItem>>, CubeError>;
+    async fn queue_heartbeat_by_id(&self, id: u64) -> Result<(), CubeError>;
     async fn queue_heartbeat_by_path(&self, path: String) -> Result<(), CubeError>;
     async fn queue_retrieve_by_path(
         &self,

@@ -414,6 +415,7 @@
         path: String,
         timeout: u64,
     ) -> Result<Option<QueueResultResponse>, CubeError>;
+    async fn queue_merge_extra_by_id(&self, id: u64, payload: String) -> Result<(), CubeError>;
     async fn queue_merge_extra_by_path(
         &self,
         path: String,

@@ -696,6 +698,27 @@ impl CacheStore for RocksCacheStore {
             .await
     }

+    async fn queue_heartbeat_by_id(&self, id: u64) -> Result<(), CubeError> {
+        self.store
+            .write_operation(move |db_ref, batch_pipe| {
+                let queue_schema = QueueItemRocksTable::new(db_ref.clone());
+                let id_row_opt = queue_schema.get_row(id)?;
+
+                if let Some(id_row) = id_row_opt {
+                    let mut new = id_row.get_row().clone();
+                    new.update_heartbeat();
+
+                    queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?;
+                    Ok(())
+                } else {
+                    trace!("Unable to update heartbeat, unknown id: {}", id);
+
+                    Ok(())
+                }
+            })
+            .await
+    }
+
     async fn queue_heartbeat_by_path(&self, path: String) -> Result<(), CubeError> {
         self.store
             .write_operation(move |db_ref, batch_pipe| {

@@ -881,6 +904,25 @@ impl CacheStore for RocksCacheStore {
         }
     }

+    async fn queue_merge_extra_by_id(&self, id: u64, payload: String) -> Result<(), CubeError> {
+        self.store
+            .write_operation(move |db_ref, batch_pipe| {
+                let queue_schema = QueueItemRocksTable::new(db_ref.clone());
+                let id_row_opt = queue_schema.get_row(id)?;
+
+                if let Some(id_row) = id_row_opt {
+                    let new = id_row.get_row().merge_extra(payload)?;
+
+                    queue_schema.update(id_row.id, new, id_row.get_row(), batch_pipe)?;
+                } else {
+                    warn!("Unable to merge extra, unknown id: {}", id);
+                }
+
+                Ok(())
+            })
+            .await
+    }
+
     async fn queue_merge_extra_by_path(
         &self,
         path: String,

@@ -1006,57 +1048,67 @@ impl CacheStore for ClusterCacheStoreClient {
         &self,
         _path: String,
     ) -> Result<Option<IdRow<QueueItem>>, CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_get was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_get_by_path was used.")
     }

     async fn queue_cancel_by_path(
         &self,
         _path: String,
     ) -> Result<Option<IdRow<QueueItem>>, CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_cancel was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_cancel_by_path was used.")
+    }
+
+    async fn queue_heartbeat_by_id(&self, _id: u64) -> Result<(), CubeError> {
+        panic!("CacheStore cannot be used on the worker node! queue_heartbeat_by_id was used.")
     }

     async fn queue_heartbeat_by_path(&self, _path: String) -> Result<(), CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_heartbeat was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_heartbeat_by_path was used.")
     }

     async fn queue_retrieve_by_path(
         &self,
         _path: String,
         _allow_concurrency: u32,
     ) -> Result<QueueRetrieveResponse, CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_retrieve was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_retrieve_by_path was used.")
     }

     async fn queue_ack_by_path(
         &self,
         _path: String,
         _result: Option<String>,
     ) -> Result<(), CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_ack was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_ack_by_path was used.")
     }

     async fn queue_result_by_path(
         &self,
         _path: String,
     ) -> Result<Option<QueueResultResponse>, CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_result was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_result_by_path was used.")
     }

     async fn queue_result_blocking_by_path(
         &self,
         _path: String,
         _timeout: u64,
     ) -> Result<Option<QueueResultResponse>, CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_result_blocking was used.")
+        panic!(
+            "CacheStore cannot be used on the worker node! queue_result_blocking_by_path was used."
+        )
+    }
+
+    async fn queue_merge_extra_by_id(&self, _id: u64, _payload: String) -> Result<(), CubeError> {
+        panic!("CacheStore cannot be used on the worker node! queue_merge_extra_by_id was used.")
     }

     async fn queue_merge_extra_by_path(
         &self,
         _path: String,
         _payload: String,
     ) -> Result<(), CubeError> {
-        panic!("CacheStore cannot be used on the worker node! queue_merge_extra was used.")
+        panic!("CacheStore cannot be used on the worker node! queue_merge_extra_by_path was used.")
     }

     async fn compaction(&self) -> Result<(), CubeError> {
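
The two by-id implementations above call QueueItem::update_heartbeat() and QueueItem::merge_extra(), whose definitions are not part of this diff. A sketch of what they plausibly do, inferred from the call sites and the tests; the field names, the shallow JSON merge, and the CubeError::internal constructor are assumptions, not taken from the commit:

use chrono::{DateTime, Utc};

#[derive(Clone)]
pub struct QueueItem {
    pub heartbeat: Option<DateTime<Utc>>,
    pub extra: Option<String>,
    // ...remaining fields elided...
}

impl QueueItem {
    // Stamp the item with the current time; this is what flips the
    // heartbeat column from Null to a Timestamp in queue_heartbeat_by_id.
    pub fn update_heartbeat(&mut self) {
        self.heartbeat = Some(Utc::now());
    }

    // Return a copy with `payload` merged into `extra`. The tests only
    // exercise the first write, so the merge branch here is a guess.
    pub fn merge_extra(&self, payload: String) -> Result<Self, CubeError> {
        let mut new = self.clone();
        new.extra = match &self.extra {
            None => Some(payload),
            Some(existing) => {
                let mut merged: serde_json::Value = serde_json::from_str(existing)
                    .map_err(|e| CubeError::internal(e.to_string()))?;
                let incoming: serde_json::Value = serde_json::from_str(&payload)
                    .map_err(|e| CubeError::internal(e.to_string()))?;
                if let (Some(m), Some(i)) = (merged.as_object_mut(), incoming.as_object()) {
                    for (k, v) in i {
                        m.insert(k.clone(), v.clone());
                    }
                }
                Some(merged.to_string())
            }
        };
        Ok(new)
    }
}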

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 11 additions & 0 deletions

@@ -248,6 +248,10 @@ impl CacheStore for LazyRocksCacheStore {
         self.init().await?.queue_cancel_by_path(path).await
     }

+    async fn queue_heartbeat_by_id(&self, id: u64) -> Result<(), CubeError> {
+        self.init().await?.queue_heartbeat_by_id(id).await
+    }
+
     async fn queue_heartbeat_by_path(&self, path: String) -> Result<(), CubeError> {
         self.init().await?.queue_heartbeat_by_path(path).await
     }

@@ -289,6 +293,13 @@ impl CacheStore for LazyRocksCacheStore {
             .await
     }

+    async fn queue_merge_extra_by_id(&self, id: u64, payload: String) -> Result<(), CubeError> {
+        self.init()
+            .await?
+            .queue_merge_extra_by_id(id, payload)
+            .await
+    }
+
     async fn queue_merge_extra_by_path(
         &self,
         path: String,
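
LazyRocksCacheStore defers opening the underlying RocksDB store until first use, so both new methods are one-line delegations through init(). A self-contained sketch of that lazy-init pattern (using tokio's OnceCell; the wrapper's actual internals may differ):

use std::sync::Arc;
use tokio::sync::OnceCell;

struct Store; // stands in for RocksCacheStore

struct LazyStore {
    cell: OnceCell<Arc<Store>>,
}

impl LazyStore {
    // Open the store exactly once, then hand back the cached instance.
    async fn init(&self) -> Result<&Arc<Store>, String> {
        self.cell
            .get_or_try_init(|| async { Ok(Arc::new(Store)) })
            .await
    }

    async fn queue_heartbeat_by_id(&self, _id: u64) -> Result<(), String> {
        let _store = self.init().await?;
        // a real implementation forwards to the store here
        Ok(())
    }
}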

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 8 additions & 0 deletions

@@ -788,6 +788,10 @@ impl CacheStore for CacheStoreMock {
         panic!("CacheStore mock!")
     }

+    async fn queue_heartbeat_by_id(&self, _id: u64) -> Result<(), CubeError> {
+        panic!("CacheStore mock!")
+    }
+
     async fn queue_heartbeat_by_path(&self, _path: String) -> Result<(), CubeError> {
         panic!("CacheStore mock!")
     }

@@ -823,6 +827,10 @@ impl CacheStore for CacheStoreMock {
         panic!("CacheStore mock!")
     }

+    async fn queue_merge_extra_by_id(&self, _id: u64, _payload: String) -> Result<(), CubeError> {
+        panic!("CacheStore mock!")
+    }
+
     async fn queue_merge_extra_by_path(
         &self,
         _key: String,

rust/cubestore/cubestore/src/sql/cachestore.rs

Lines changed: 15 additions & 5 deletions

@@ -3,7 +3,7 @@ use crate::metastore::{Column, ColumnType};

 use crate::queryplanner::{QueryPlan, QueryPlanner};
 use crate::sql::parser::{
-    CacheCommand, CacheStoreCommand, CubeStoreParser, QueueCommand,
+    CacheCommand, CacheStoreCommand, CubeStoreParser, QueueCommand, QueueKey,
     Statement as CubeStoreStatement, SystemCommand,
 };
 use crate::sql::{QueryPlans, SqlQueryContext, SqlService};

@@ -207,14 +207,24 @@ impl CacheStoreSqlService {
                 (Arc::new(DataFrame::new(columns, rows)), true)
             }
             QueueCommand::Heartbeat { key } => {
-                self.cachestore.queue_heartbeat_by_path(key.value).await?;
+                match key {
+                    QueueKey::ById(id) => self.cachestore.queue_heartbeat_by_id(id).await?,
+                    QueueKey::ByPath(path) => self.cachestore.queue_heartbeat_by_path(path).await?,
+                }

                 (Arc::new(DataFrame::new(vec![], vec![])), true)
             }
             QueueCommand::MergeExtra { key, payload } => {
-                self.cachestore
-                    .queue_merge_extra_by_path(key.value, payload)
-                    .await?;
+                match key {
+                    QueueKey::ById(id) => {
+                        self.cachestore.queue_merge_extra_by_id(id, payload).await?
+                    }
+                    QueueKey::ByPath(path) => {
+                        self.cachestore
+                            .queue_merge_extra_by_path(path, payload)
+                            .await?
+                    }
+                }

                 (Arc::new(DataFrame::new(vec![], vec![])), true)
             }
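
The parser change itself is not included in this diff, but the match arms above imply that the command key went from a single path value (key.value) to an enum roughly like the following (a sketch; the real definition lives in crate::sql::parser). A bare integer token, as in QUEUE HEARTBEAT 1;, would parse to ById, while a quoted path such as QUEUE HEARTBEAT "STANDALONE#queue:1"; parses to ByPath:

pub enum QueueKey {
    ById(u64),
    ByPath(String),
}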
