Skip to content

Commit e8814bf

Browse files
committed
feat(cubestore): Queue - return success marker on ack (set result)
1 parent f97fbdc commit e8814bf

File tree

5 files changed

+56
-14
lines changed

5 files changed

+56
-14
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8274,10 +8274,14 @@ async fn queue_full_workflow_v1(service: Box<dyn SqlClient>) {
82748274
let ack = async move {
82758275
tokio::time::sleep(Duration::from_millis(1000)).await;
82768276

8277-
service_to_move
8277+
let ack_result = service_to_move
82788278
.exec_query(r#"QUEUE ACK "STANDALONE#queue:3" "result:3""#)
82798279
.await
8280-
.unwrap()
8280+
.unwrap();
8281+
assert_eq!(
8282+
ack_result.get_rows(),
8283+
&vec![Row::new(vec![TableValue::Boolean(true)])]
8284+
)
82818285
};
82828286

82838287
let (blocking_res, _ack_res) = join!(blocking, ack);
@@ -8439,10 +8443,38 @@ async fn queue_ack_then_result(service: Box<dyn SqlClient>) {
84398443
.await
84408444
.unwrap();
84418445

8442-
service
8446+
let ack_result = service
84438447
.exec_query(r#"QUEUE ACK "STANDALONE#queue:5555" "result:5555""#)
84448448
.await
84458449
.unwrap();
8450+
assert_eq!(
8451+
ack_result.get_rows(),
8452+
&vec![Row::new(vec![TableValue::Boolean(true)])]
8453+
);
8454+
8455+
// double ack for result
8456+
{
8457+
let ack_result = service
8458+
.exec_query(r#"QUEUE ACK "STANDALONE#queue:5555" "result:5555""#)
8459+
.await
8460+
.unwrap();
8461+
assert_eq!(
8462+
ack_result.get_rows(),
8463+
&vec![Row::new(vec![TableValue::Boolean(false)])]
8464+
);
8465+
}
8466+
8467+
// ack on unknown queue item
8468+
{
8469+
let ack_result = service
8470+
.exec_query(r#"QUEUE ACK "STANDALONE#queue:123456" "result:5555""#)
8471+
.await
8472+
.unwrap();
8473+
assert_eq!(
8474+
ack_result.get_rows(),
8475+
&vec![Row::new(vec![TableValue::Boolean(false)])]
8476+
);
8477+
}
84468478

84478479
let result = service
84488480
.exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#)
@@ -8737,10 +8769,14 @@ async fn queue_multiple_result_blocking(service: Box<dyn SqlClient>) {
87378769
let ack = async move {
87388770
tokio::time::sleep(Duration::from_millis(1000)).await;
87398771

8740-
service_to_move
8772+
let ack_result = service_to_move
87418773
.exec_query(r#"QUEUE ACK "STANDALONE#queue:12345" "result:12345""#)
87428774
.await
8743-
.unwrap()
8775+
.unwrap();
8776+
assert_eq!(
8777+
ack_result.get_rows(),
8778+
&vec![Row::new(vec![TableValue::Boolean(true)])]
8779+
)
87448780
};
87458781

87468782
let (blocking1_res, blocking2_res, _ack_res) = join!(blocking1, blocking2, ack);

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ pub trait CacheStore: DIService + Send + Sync {
405405
&self,
406406
path: String,
407407
result: Option<String>,
408-
) -> Result<(), CubeError>;
408+
) -> Result<bool, CubeError>;
409409
async fn queue_result_by_path(
410410
&self,
411411
path: String,
@@ -811,7 +811,7 @@ impl CacheStore for RocksCacheStore {
811811
&self,
812812
path: String,
813813
result: Option<String>,
814-
) -> Result<(), CubeError> {
814+
) -> Result<bool, CubeError> {
815815
self.store
816816
.write_operation(move |db_ref, batch_pipe| {
817817
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
@@ -841,11 +841,11 @@ impl CacheStore for RocksCacheStore {
841841
}));
842842
}
843843

844-
Ok(())
844+
Ok(true)
845845
} else {
846846
warn!("Unable to ack queue, unknown path: {}", path);
847847

848-
Ok(())
848+
Ok(false)
849849
}
850850
})
851851
.await
@@ -1078,7 +1078,7 @@ impl CacheStore for ClusterCacheStoreClient {
10781078
&self,
10791079
_path: String,
10801080
_result: Option<String>,
1081-
) -> Result<(), CubeError> {
1081+
) -> Result<bool, CubeError> {
10821082
panic!("CacheStore cannot be used on the worker node! queue_ack_by_path was used.")
10831083
}
10841084

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ impl CacheStore for LazyRocksCacheStore {
271271
&self,
272272
path: String,
273273
result: Option<String>,
274-
) -> Result<(), CubeError> {
274+
) -> Result<bool, CubeError> {
275275
self.init().await?.queue_ack_by_path(path, result).await
276276
}
277277

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ impl CacheStore for CacheStoreMock {
808808
&self,
809809
_path: String,
810810
_result: Option<String>,
811-
) -> Result<(), CubeError> {
811+
) -> Result<bool, CubeError> {
812812
panic!("CacheStore mock!")
813813
}
814814

rust/cubestore/cubestore/src/sql/cachestore.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,15 @@ impl CacheStoreSqlService {
229229
(Arc::new(DataFrame::new(vec![], vec![])), true)
230230
}
231231
QueueCommand::Ack { key, result } => {
232-
self.cachestore.queue_ack_by_path(key.value, result).await?;
232+
let success = self.cachestore.queue_ack_by_path(key.value, result).await?;
233233

234-
(Arc::new(DataFrame::new(vec![], vec![])), true)
234+
(
235+
Arc::new(DataFrame::new(
236+
vec![Column::new("success".to_string(), ColumnType::Boolean, 0)],
237+
vec![Row::new(vec![TableValue::Boolean(success)])],
238+
)),
239+
true,
240+
)
235241
}
236242
QueueCommand::Get { key } => {
237243
let result = self.cachestore.queue_get_by_path(key.value).await?;

0 commit comments

Comments
 (0)