Skip to content

Commit d8be78a

Browse files
authored
feat(cubestore): Support multiple parallel QUEUE RESULT_BLOCKING (#6038)
1 parent 4a965ba commit d8be78a

File tree

4 files changed

+130
-18
lines changed

4 files changed

+130
-18
lines changed

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
6969
expect(result).toBe('select * from bar');
7070
});
7171

72-
const nonCubestoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest;
73-
74-
nonCubestoreTest('instant double wait resolve', async () => {
72+
test('instant double wait resolve', async () => {
7573
const results = await Promise.all([
7674
queue.executeInQueue('delay', 'instant', { delay: 400, result: '2' }),
7775
queue.executeInQueue('delay', 'instant', { delay: 400, result: '2' })
@@ -149,7 +147,10 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
149147
expect(results.map(r => parseInt(r[0], 10) - parseInt(results[0][0], 10))).toEqual([0, 1, 2]);
150148
});
151149

152-
nonCubestoreTest('orphaned', async () => {
150+
const nonCubeStoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest;
151+
152+
// TODO: CubeStore queue support
153+
nonCubeStoreTest('orphaned', async () => {
153154
for (let i = 1; i <= 4; i++) {
154155
await queue.executeInQueue('delay', `11${i}`, { delay: 50, result: `${i}` }, 0);
155156
}
@@ -170,7 +171,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
170171
await queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0);
171172
});
172173

173-
nonCubestoreTest('queue hash process persistent flag properly', () => {
174+
// TODO: CubeStore queue support
175+
nonCubeStoreTest('queue hash process persistent flag properly', () => {
174176
const query = ['select * from table'];
175177
const key1 = queue.redisHash(query);
176178
// @ts-ignore
@@ -197,7 +199,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
197199
expect(result).toBe('select * from bar');
198200
});
199201

200-
nonCubestoreTest('queue driver lock obtain race condition', async () => {
202+
nonCubeStoreTest('queue driver lock obtain race condition', async () => {
201203
const redisClient: any = await queue.queueDriver.createConnection();
202204
const redisClient2: any = await queue.queueDriver.createConnection();
203205
const priority = 10;
@@ -252,7 +254,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
252254
await queue.queueDriver.release(redisClient2);
253255
});
254256

255-
nonCubestoreTest('activated but lock is not acquired', async () => {
257+
nonCubeStoreTest('activated but lock is not acquired', async () => {
256258
const redisClient = await queue.queueDriver.createConnection();
257259
const redisClient2 = await queue.queueDriver.createConnection();
258260
const priority = 10;

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,11 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
228228
t("cache_set_nx", cache_set_nx),
229229
t("cache_prefix_keys", cache_prefix_keys),
230230
t("queue_full_workflow", queue_full_workflow),
231+
t("queue_ack_then_result", queue_ack_then_result),
232+
t(
233+
"queue_multiple_result_blocking",
234+
queue_multiple_result_blocking,
235+
),
231236
];
232237

233238
fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -6622,6 +6627,114 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
66226627
}
66236628
}
66246629

6630+
async fn queue_ack_then_result(service: Box<dyn SqlClient>) {
6631+
service
6632+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:5555" "payload1";"#)
6633+
.await
6634+
.unwrap();
6635+
6636+
service
6637+
.exec_query(r#"QUEUE ACK "STANDALONE#queue:5555" "result:5555""#)
6638+
.await
6639+
.unwrap();
6640+
6641+
let result = service
6642+
.exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#)
6643+
.await
6644+
.unwrap();
6645+
6646+
assert_eq!(
6647+
result.get_columns(),
6648+
&vec![
6649+
Column::new("payload".to_string(), ColumnType::String, 0),
6650+
Column::new("type".to_string(), ColumnType::String, 1),
6651+
]
6652+
);
6653+
assert_eq!(
6654+
result.get_rows(),
6655+
&vec![Row::new(vec![
6656+
TableValue::String("result:5555".to_string()),
6657+
TableValue::String("success".to_string())
6658+
]),]
6659+
);
6660+
6661+
// second call should not return anything, because first call should remove result
6662+
let result = service
6663+
.exec_query(r#"QUEUE RESULT "STANDALONE#queue:5555""#)
6664+
.await
6665+
.unwrap();
6666+
6667+
assert_eq!(result.get_rows().len(), 0);
6668+
}
6669+
6670+
async fn queue_multiple_result_blocking(service: Box<dyn SqlClient>) {
6671+
service
6672+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:12345" "payload1";"#)
6673+
.await
6674+
.unwrap();
6675+
6676+
let service = Arc::new(service);
6677+
6678+
{
6679+
let service_to_move = service.clone();
6680+
let blocking1 = async move {
6681+
service_to_move
6682+
.exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:12345""#)
6683+
.await
6684+
.unwrap()
6685+
};
6686+
6687+
let service_to_move = service.clone();
6688+
let blocking2 = async move {
6689+
service_to_move
6690+
.exec_query(r#"QUEUE RESULT_BLOCKING 5000 "STANDALONE#queue:12345""#)
6691+
.await
6692+
.unwrap()
6693+
};
6694+
6695+
let service_to_move = service.clone();
6696+
let ack = async move {
6697+
tokio::time::sleep(Duration::from_millis(1000)).await;
6698+
6699+
service_to_move
6700+
.exec_query(r#"QUEUE ACK "STANDALONE#queue:12345" "result:12345""#)
6701+
.await
6702+
.unwrap()
6703+
};
6704+
6705+
let (blocking1_res, blocking2_res, _ack_res) = join!(blocking1, blocking2, ack);
6706+
assert_eq!(
6707+
blocking1_res.get_columns(),
6708+
&vec![
6709+
Column::new("payload".to_string(), ColumnType::String, 0),
6710+
Column::new("type".to_string(), ColumnType::String, 1),
6711+
]
6712+
);
6713+
assert_eq!(
6714+
blocking1_res.get_rows(),
6715+
&vec![Row::new(vec![
6716+
TableValue::String("result:12345".to_string()),
6717+
TableValue::String("success".to_string())
6718+
]),]
6719+
);
6720+
6721+
assert_eq!(
6722+
blocking2_res.get_columns(),
6723+
&vec![
6724+
Column::new("payload".to_string(), ColumnType::String, 0),
6725+
Column::new("type".to_string(), ColumnType::String, 1),
6726+
]
6727+
);
6728+
assert_eq!(
6729+
blocking2_res.get_rows(),
6730+
&vec![Row::new(vec![
6731+
TableValue::String("result:12345".to_string()),
6732+
TableValue::String("success".to_string())
6733+
]),]
6734+
);
6735+
}
6736+
}
6737+
66256738
pub fn to_rows(d: &DataFrame) -> Vec<Vec<TableValue>> {
66266739
return d
66276740
.get_rows()

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -702,12 +702,13 @@ impl CacheStore for RocksCacheStore {
702702
if let Some(item_row) = item_row {
703703
queue_schema.delete(item_row.get_id(), batch_pipe)?;
704704

705-
let queue_result = QueueResult::new(path.clone(), result);
705+
let queue_result = QueueResult::new(path.clone(), result.clone());
706706
let result_row = result_schema.insert(queue_result, batch_pipe)?;
707707

708708
batch_pipe.add_event(MetaStoreEvent::AckQueueItem(QueueResultAckEvent {
709709
row_id: result_row.get_id(),
710710
path,
711+
result,
711712
}));
712713

713714
Ok(())
@@ -747,16 +748,11 @@ impl CacheStore for RocksCacheStore {
747748
self.store
748749
.write_operation(move |db_ref, batch_pipe| {
749750
let queue_schema = QueueResultRocksTable::new(db_ref.clone());
750-
let queue_result =
751-
queue_schema.try_delete(ack_event.row_id, batch_pipe)?;
752-
753-
if let Some(queue_result) = queue_result {
754-
Ok(Some(QueueResultResponse::Success {
755-
value: queue_result.row.value,
756-
}))
757-
} else {
758-
Ok(None)
759-
}
751+
queue_schema.try_delete(ack_event.row_id, batch_pipe)?;
752+
753+
Ok(Some(QueueResultResponse::Success {
754+
value: ack_event.result,
755+
}))
760756
})
761757
.await
762758
}

rust/cubestore/cubestore/src/cachestore/queue_item.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ fn merge(a: serde_json::Value, b: serde_json::Value) -> Option<serde_json::Value
2929
pub struct QueueResultAckEvent {
3030
pub path: String,
3131
pub row_id: u64,
32+
pub result: String,
3233
}
3334

3435
#[repr(u8)]

0 commit comments

Comments
 (0)