Skip to content

Commit d6702ab

Browse files
authored
feat(cubestore): Queue - support custom orphaned timeout (#6090)
1 parent c8cf300 commit d6702ab

File tree

10 files changed

+368
-59
lines changed

10 files changed

+368
-59
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ export interface AddToQueueQuery {
1717

1818
export interface AddToQueueOptions {
1919
stageQueryKey: string,
20-
requestId: string
20+
requestId: string,
21+
orphanedTimeout?: number,
2122
}
2223

2324
export interface QueueDriverOptions {

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,18 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
5959
addedToQueueTime: new Date().getTime()
6060
};
6161

62-
const rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [
62+
const values: (string | number)[] = [
6363
priority,
64-
this.prefixKey(this.redisHash(queryKey)),
65-
JSON.stringify(data)
66-
]);
64+
];
65+
66+
if (options.orphanedTimeout) {
67+
values.push(options.orphanedTimeout);
68+
}
69+
70+
values.push(this.prefixKey(this.redisHash(queryKey)));
71+
values.push(JSON.stringify(data));
72+
73+
const rows = await this.driver.query(`QUEUE ADD PRIORITY ?${options.orphanedTimeout ? ' ORPHANED ?' : ''} ? ?`, values);
6774
if (rows && rows.length) {
6875
return [
6976
rows[0].added === 'true' ? 1 : 0,

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,13 @@ export class QueryQueue {
241241
}
242242
const time = new Date().getTime();
243243
const keyScore = time + (10000 - priority) * 1E14;
244+
245+
options.orphanedTimeout = query.orphanedTimeout;
244246
const orphanedTimeout = 'orphanedTimeout' in query
245247
? query.orphanedTimeout
246248
: this.orphanedTimeout;
247249
const orphanedTime = time + (orphanedTimeout * 1000);
250+
248251
const [added, _b, _c, queueSize, addedToQueueTime] = await queueConnection.addToQueue(
249252
keyScore, queryKey, orphanedTime, queryHandler, query, priority, options
250253
);
@@ -341,8 +344,10 @@ export class QueryQueue {
341344
const time = new Date().getTime();
342345
const keyScore = time + (10000 - priority) * 1E14;
343346

347+
options.orphanedTimeout = query.orphanedTimeout;
344348
const orphanedTimeout = 'orphanedTimeout' in query ? query.orphanedTimeout : this.orphanedTimeout;
345349
const orphanedTime = time + (orphanedTimeout * 1000);
350+
346351
const [added, _b, _c, queueSize, addedToQueueTime] = await queueConnection.addToQueue(
347352
keyScore, queryKey, orphanedTime, queryHandler, query, priority, options
348353
);

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,16 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
191191
expect(results.map(r => parseInt(r[0], 10) - parseInt(results[0][0], 10))).toEqual([0, 1, 2]);
192192
});
193193

194+
test('sequence', async () => {
195+
const p1 = queue.executeInQueue('delay', '111', { delay: 50, result: '1' }, 0);
196+
const p2 = delayFn(null, 50).then(() => queue.executeInQueue('delay', '112', { delay: 50, result: '2' }, 0));
197+
const p3 = delayFn(null, 75).then(() => queue.executeInQueue('delay', '113', { delay: 50, result: '3' }, 0));
198+
const p4 = delayFn(null, 100).then(() => queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0));
199+
200+
const result = await Promise.all([p1, p2, p3, p4]);
201+
expect(result).toEqual(['10', '21', '32', '43']);
202+
});
203+
194204
test('orphaned', async () => {
195205
for (let i = 1; i <= 4; i++) {
196206
await queue.executeInQueue('delay', `11${i}`, { delay: 50, result: `${i}` }, 0);
@@ -200,8 +210,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
200210

201211
let result = queue.executeInQueue('delay', '111', { delay: 800, result: '1' }, 0);
202212
delayFn(null, 50).then(() => queue.executeInQueue('delay', '112', { delay: 800, result: '2' }, 0)).catch(e => e);
203-
delayFn(null, 60).then(() => queue.executeInQueue('delay', '113', { delay: 500, result: '3' }, 0)).catch(e => e);
204-
delayFn(null, 70).then(() => queue.executeInQueue('delay', '114', { delay: 900, result: '4' }, 0)).catch(e => e);
213+
delayFn(null, 75).then(() => queue.executeInQueue('delay', '113', { delay: 500, result: '3' }, 0)).catch(e => e);
214+
delayFn(null, 100).then(() => queue.executeInQueue('delay', '114', { delay: 900, result: '4' }, 0)).catch(e => e);
205215

206216
expect(await result).toBe('10');
207217
await queue.executeInQueue('delay', '112', { delay: 800, result: '2' }, 0);
@@ -212,6 +222,45 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
212222
await queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0);
213223
});
214224

225+
test('orphaned with custom ttl', async () => {
226+
const connection = await queue.queueDriver.createConnection();
227+
228+
try {
229+
const priority = 10;
230+
const time = new Date().getTime();
231+
const keyScore = time + (10000 - priority) * 1E14;
232+
233+
expect(await connection.getOrphanedQueries()).toEqual([]);
234+
235+
let orphanedTimeout = 2;
236+
await connection.addToQueue(keyScore, ['1', []], time + (orphanedTimeout * 1000), 'delay', { isJob: true, orphanedTimeout: time, }, priority, {
237+
stageQueryKey: '1',
238+
requestId: '1',
239+
orphanedTimeout,
240+
});
241+
242+
expect(await connection.getOrphanedQueries()).toEqual([]);
243+
244+
orphanedTimeout = 60;
245+
await connection.addToQueue(keyScore, ['2', []], time + (orphanedTimeout * 1000), 'delay', { isJob: true, orphanedTimeout: time, }, priority, {
246+
stageQueryKey: '2',
247+
requestId: '2',
248+
orphanedTimeout,
249+
});
250+
251+
await pausePromise(2000);
252+
253+
expect(await connection.getOrphanedQueries()).toEqual([
254+
connection.redisHash(['1', []])
255+
]);
256+
} finally {
257+
await connection.getQueryAndRemove(connection.redisHash(['1', []]));
258+
await connection.getQueryAndRemove(connection.redisHash(['2', []]));
259+
260+
queue.queueDriver.release(connection);
261+
}
262+
});
263+
215264
test('queue hash process persistent flag properly', () => {
216265
const query: QueryKey = ['select * from table', []];
217266
const key1 = queue.redisHash(query);

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
236236
"queue_multiple_result_blocking",
237237
queue_multiple_result_blocking,
238238
),
239+
t("queue_custom_orphaned", queue_custom_orphaned),
239240
];
240241

241242
fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -6884,6 +6885,34 @@ async fn queue_multiple_result_blocking(service: Box<dyn SqlClient>) {
68846885
}
68856886
}
68866887

6888+
async fn queue_custom_orphaned(service: Box<dyn SqlClient>) {
6889+
service
6890+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
6891+
.await
6892+
.unwrap();
6893+
6894+
service
6895+
.exec_query(r#"QUEUE ADD PRIORITY 1 ORPHANED 60 "STANDALONE#queue:2" "payload1";"#)
6896+
.await
6897+
.unwrap();
6898+
6899+
tokio::time::sleep(Duration::new(1, 0)).await;
6900+
6901+
let res = service
6902+
.exec_query(r#"QUEUE TO_CANCEL 100 100 "STANDALONE#queue""#)
6903+
.await
6904+
.unwrap();
6905+
assert_eq!(
6906+
res.get_columns(),
6907+
&vec![Column::new("id".to_string(), ColumnType::String, 0),]
6908+
);
6909+
6910+
assert_eq!(
6911+
res.get_rows(),
6912+
&vec![Row::new(vec![TableValue::String("1".to_string()),]),]
6913+
);
6914+
}
6915+
68876916
pub fn to_rows(d: &DataFrame) -> Vec<Vec<TableValue>> {
68886917
return d
68896918
.get_rows()

0 commit comments

Comments
 (0)