Skip to content

Commit f0c1156

Browse files
authored
feat(cubestore): Queue - allow to GET by processingId (#6608)
1 parent 3c54190 commit f0c1156

File tree

6 files changed

+15
-34
lines changed

6 files changed

+15
-34
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8669,10 +8669,7 @@ async fn queue_full_workflow_v2(service: Box<dyn SqlClient>) {
86698669

86708670
// get
86718671
{
8672-
let get_response = service
8673-
.exec_query(r#"QUEUE GET "STANDALONE#queue:2""#)
8674-
.await
8675-
.unwrap();
8672+
let get_response = service.exec_query(r#"QUEUE GET 2"#).await.unwrap();
86768673
assert_eq!(
86778674
get_response.get_rows(),
86788675
&vec![Row::new(vec![
@@ -8694,10 +8691,7 @@ async fn queue_full_workflow_v2(service: Box<dyn SqlClient>) {
86948691
);
86958692

86968693
// assertion that job was removed
8697-
let get_response = service
8698-
.exec_query(r#"QUEUE GET "STANDALONE#queue:2""#)
8699-
.await
8700-
.unwrap();
8694+
let get_response = service.exec_query(r#"QUEUE GET 2"#).await.unwrap();
87018695
assert_eq!(get_response.get_rows().len(), 0);
87028696
}
87038697
}
@@ -9141,10 +9135,7 @@ async fn queue_merge_extra_by_id(service: Box<dyn SqlClient>) {
91419135

91429136
// extra must be empty after creation
91439137
{
9144-
let res = service
9145-
.exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#)
9146-
.await
9147-
.unwrap();
9138+
let res = service.exec_query(r#"QUEUE GET 1;"#).await.unwrap();
91489139
assert_eq!(
91499140
res.get_columns(),
91509141
&vec![
@@ -9168,10 +9159,7 @@ async fn queue_merge_extra_by_id(service: Box<dyn SqlClient>) {
91689159

91699160
// extra should contains first field
91709161
{
9171-
let res = service
9172-
.exec_query(r#"QUEUE GET "STANDALONE#queue:1";"#)
9173-
.await
9174-
.unwrap();
9162+
let res = service.exec_query(r#"QUEUE GET 1;"#).await.unwrap();
91759163
assert_eq!(
91769164
res.get_columns(),
91779165
&vec![

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ pub trait CacheStore: DIService + Send + Sync {
463463
priority_sort: bool,
464464
) -> Result<Vec<IdRow<QueueItem>>, CubeError>;
465465
// API with Path
466-
async fn queue_get_by_path(&self, key: String) -> Result<Option<IdRow<QueueItem>>, CubeError>;
466+
async fn queue_get(&self, key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError>;
467467
async fn queue_cancel(&self, key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError>;
468468
async fn queue_heartbeat(&self, key: QueueKey) -> Result<(), CubeError>;
469469
async fn queue_retrieve_by_path(
@@ -749,12 +749,11 @@ impl CacheStore for RocksCacheStore {
749749
.await
750750
}
751751

752-
async fn queue_get_by_path(&self, path: String) -> Result<Option<IdRow<QueueItem>>, CubeError> {
752+
async fn queue_get(&self, key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError> {
753753
self.store
754754
.read_operation(move |db_ref| {
755755
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
756-
let index_key = QueueItemIndexKey::ByPath(path);
757-
queue_schema.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)
756+
queue_schema.get_row_by_key(key)
758757
})
759758
.await
760759
}
@@ -1079,11 +1078,8 @@ impl CacheStore for ClusterCacheStoreClient {
10791078
panic!("CacheStore cannot be used on the worker node! queue_list was used.")
10801079
}
10811080

1082-
async fn queue_get_by_path(
1083-
&self,
1084-
_path: String,
1085-
) -> Result<Option<IdRow<QueueItem>>, CubeError> {
1086-
panic!("CacheStore cannot be used on the worker node! queue_get_by_path was used.")
1081+
async fn queue_get(&self, _key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError> {
1082+
panic!("CacheStore cannot be used on the worker node! queue_get was used.")
10871083
}
10881084

10891085
async fn queue_cancel(&self, _key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError> {

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ impl CacheStore for LazyRocksCacheStore {
246246
.await
247247
}
248248

249-
async fn queue_get_by_path(&self, path: String) -> Result<Option<IdRow<QueueItem>>, CubeError> {
250-
self.init().await?.queue_get_by_path(path).await
249+
async fn queue_get(&self, key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError> {
250+
self.init().await?.queue_get(key).await
251251
}
252252

253253
async fn queue_cancel(&self, key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError> {

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -784,10 +784,7 @@ impl CacheStore for CacheStoreMock {
784784
panic!("CacheStore mock!")
785785
}
786786

787-
async fn queue_get_by_path(
788-
&self,
789-
_path: String,
790-
) -> Result<Option<IdRow<QueueItem>>, CubeError> {
787+
async fn queue_get(&self, _key: QueueKey) -> Result<Option<IdRow<QueueItem>>, CubeError> {
791788
panic!("CacheStore mock!")
792789
}
793790

rust/cubestore/cubestore/src/sql/cachestore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ impl CacheStoreSqlService {
228228
)
229229
}
230230
QueueCommand::Get { key } => {
231-
let result = self.cachestore.queue_get_by_path(key.value).await?;
231+
let result = self.cachestore.queue_get(key).await?;
232232
let rows = if let Some(result) = result {
233233
vec![result.into_row().into_queue_get_row()]
234234
} else {

rust/cubestore/cubestore/src/sql/parser.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub enum QueueCommand {
100100
value: String,
101101
},
102102
Get {
103-
key: Ident,
103+
key: QueueKey,
104104
},
105105
ToCancel {
106106
prefix: Ident,
@@ -467,7 +467,7 @@ impl<'a> CubeStoreParser<'a> {
467467
payload: self.parser.parse_literal_string()?,
468468
},
469469
"get" => QueueCommand::Get {
470-
key: self.parser.parse_identifier()?,
470+
key: self.parse_queue_key()?,
471471
},
472472
"stalled" => {
473473
let heartbeat_timeout = Some(self.parse_integer("heartbeat timeout", false)?);

0 commit comments

Comments
 (0)