Skip to content

Commit d503598

Browse files
authored
feat(cubestore-driver): Queue - protect race conditions (#6598)
1 parent b582bcf commit d503598

File tree

6 files changed

+88
-142
lines changed

6 files changed

+88
-142
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
export type QueryDef = unknown;
2+
// Primary key of Queue item
3+
export type QueueId = string | number | bigint;
4+
// This was used as lock for Redis, deprecated.
5+
export type ProcessingId = string | number;
26
export type QueryKey = (string | [string, any[]]) & {
37
persistent?: true,
48
};
@@ -7,19 +11,21 @@ export interface QueryKeyHash extends String {
711
}
812

913
export type GetActiveAndToProcessResponse = [active: string[], toProcess: string[]];
10-
export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
14+
export type AddToQueueResponse = [added: number, queueId: QueueId | null, queueSize: number, addedToQueueTime: number];
1115
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
1216
export type RetrieveForProcessingSuccess = [
13-
added: any /** todo(ovr): Remove, useless */,
14-
removed: any /** todo(ovr): Remove, useless */,
17+
added: unknown,
18+
// QueueId is required for Cube Store, other providers doesn't support it
19+
queueId: QueueId | null,
1520
active: QueryKeyHash[],
1621
pending: number,
1722
def: QueryDef,
1823
lockAquired: true
1924
];
2025
export type RetrieveForProcessingFail = [
21-
added: any /** todo(ovr): Remove, useless */,
22-
removed: any /** todo(ovr): Remove, useless */,
26+
added: unknown,
27+
// QueueId is required for Cube Store, other providers doesn't support it
28+
queueId: QueueId | null,
2329
active: QueryKeyHash[],
2430
pending: number,
2531
def: null,
@@ -48,11 +54,9 @@ export interface QueueDriverOptions {
4854
processUid?: string;
4955
}
5056

51-
export type ProcessingId = string | number;
52-
5357
export interface QueueDriverConnectionInterface {
5458
redisHash(queryKey: QueryKey): QueryKeyHash;
55-
getResultBlocking(queryKey: QueryKey): Promise<unknown>;
59+
getResultBlocking(queryKey: QueryKeyHash, queueId: QueueId): Promise<unknown>;
5660
getResult(queryKey: QueryKey): Promise<any>;
5761
/**
5862
* Adds specified by the queryKey query to the queue, returns tuple
@@ -82,10 +86,10 @@ export interface QueueDriverConnectionInterface {
8286
// multiple nodes tries to process the same query
8387
retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise<RetrieveForProcessingResponse>;
8488
freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise<void>;
85-
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId): Promise<boolean>;
89+
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise<boolean>;
8690
cancelQuery(queryKey: QueryKey): Promise<QueryDef | null>;
8791
getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]>;
88-
setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, processingId: ProcessingId): Promise<unknown>;
92+
setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, processingId: ProcessingId, queueId: QueueId | null): Promise<unknown>;
8993
release(): void;
9094
//
9195
getQueriesToCancel(): Promise<string[]>

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
7676

7777
public async query<R = any>(query: string, values: any[], options?: QueryOptions): Promise<R[]> {
7878
const { inlineTables, ...queryTracingObj } = options ?? {};
79-
return this.connection.query(formatSql(query, values || []), inlineTables ?? [], { ...queryTracingObj, instance: getEnv('instanceId') });
79+
const sql = formatSql(query, values || []);
80+
return this.connection.query(sql, inlineTables ?? [], { ...queryTracingObj, instance: getEnv('instanceId') });
8081
}
8182

8283
public async release() {

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
QueryKey,
1313
QueryKeyHash,
1414
ProcessingId,
15+
QueueId,
1516
} from '@cubejs-backend/base-driver';
1617
import { getProcessUid } from '@cubejs-backend/shared';
1718

@@ -76,8 +77,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
7677
if (rows && rows.length) {
7778
return [
7879
rows[0].added === 'true' ? 1 : 0,
79-
null,
80-
null,
80+
rows[0].id ? parseInt(rows[0].id, 10) : null,
8181
parseInt(rows[0].pending, 10),
8282
data.addedToQueueTime
8383
];
@@ -243,9 +243,10 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
243243
return null;
244244
}
245245

246-
public async optimisticQueryUpdate(queryKey: any, toUpdate: any, _processingId: any): Promise<boolean> {
246+
public async optimisticQueryUpdate(queryKeyHash: QueryKeyHash, toUpdate: unknown, _processingId: ProcessingId, queueId: QueueId): Promise<boolean> {
247247
await this.driver.query('QUEUE MERGE_EXTRA ? ?', [
248-
this.prefixKey(queryKey),
248+
// queryKeyHash as compatibility fallback
249+
queueId || this.prefixKey(queryKeyHash),
249250
JSON.stringify(toUpdate)
250251
]);
251252

@@ -257,7 +258,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
257258
}
258259

259260
public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
260-
const rows = await this.driver.query<{ active: string | null, pending: string, payload: string, extra: string | null }>('QUEUE RETRIEVE EXTENDED CONCURRENCY ? ?', [
261+
const rows = await this.driver.query<{ id: string /* cube store convert int64 to string */, active: string | null, pending: string, payload: string, extra: string | null }>('QUEUE RETRIEVE EXTENDED CONCURRENCY ? ?', [
261262
this.options.concurrency,
262263
this.prefixKey(queryKeyHashed),
263264
]);
@@ -269,7 +270,12 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
269270
const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing');
270271

271272
return [
272-
1, null, active, pending, def, true
273+
1,
274+
rows[0].id ? parseInt(rows[0].id, 10) : null,
275+
active,
276+
pending,
277+
def,
278+
true
273279
];
274280
} else {
275281
return [
@@ -281,14 +287,11 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
281287
return null;
282288
}
283289

284-
public async getResultBlocking(queryKey: string): Promise<QueryDef | null> {
285-
return this.getResultBlockingByHash(this.redisHash(queryKey));
286-
}
287-
288-
public async getResultBlockingByHash(queryKeyHash: QueryKeyHash): Promise<QueryDef | null> {
290+
public async getResultBlocking(queryKeyHash: QueryKeyHash, queueId: QueueId): Promise<QueryDef | null> {
289291
const rows = await this.driver.query('QUEUE RESULT_BLOCKING ? ?', [
290292
this.options.continueWaitTimeout * 1000,
291-
this.prefixKey(queryKeyHash),
293+
// queryKeyHash as compatibility fallback
294+
queueId || this.prefixKey(queryKeyHash),
292295
]);
293296
if (rows && rows.length) {
294297
return this.decodeQueryDefFromRow(rows[0], 'getResultBlocking');
@@ -297,9 +300,10 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
297300
return null;
298301
}
299302

300-
public async setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: unknown, _processingId: ProcessingId): Promise<boolean> {
303+
public async setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: unknown, _processingId: ProcessingId, queueId: QueueId): Promise<boolean> {
301304
const rows = await this.driver.query('QUEUE ACK ? ? ', [
302-
this.prefixKey(hash),
305+
// queryKeyHash as compatibility fallback
306+
queueId || this.prefixKey(hash),
303307
executionResult ? JSON.stringify(executionResult) : executionResult
304308
]);
305309
if (rows && rows.length === 1) {

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@ export class LocalQueueDriverConnection {
4848
return this.resultPromises[resultListKey];
4949
}
5050

51-
async getResultBlocking(queryKey) {
52-
return this.getResultBlockingByHash(this.redisHash(queryKey));
53-
}
54-
55-
async getResultBlockingByHash(queryKeyHash) {
51+
async getResultBlocking(queryKeyHash) {
5652
// Double redisHash apply is being used here
5753
const resultListKey = this.resultListKey(queryKeyHash);
5854
if (!this.queryDef[queryKeyHash] && !this.resultPromises[resultListKey]) {
@@ -144,7 +140,13 @@ export class LocalQueueDriverConnection {
144140
});
145141
}
146142

147-
return [added, null, null, Object.keys(this.toProcess).length, queryQueueObj.addedToQueueTime]; // TODO nulls
143+
return [
144+
added,
145+
// this driver doesnt support queue id
146+
null,
147+
Object.keys(this.toProcess).length,
148+
queryQueueObj.addedToQueueTime
149+
];
148150
}
149151

150152
getToProcessQueries() {
@@ -268,8 +270,14 @@ export class LocalQueueDriverConnection {
268270
}
269271

270272
return [
271-
added, null, this.queueArray(this.active), Object.keys(this.toProcess).length, this.queryDef[key], lockAcquired
272-
]; // TODO nulls
273+
added,
274+
// this driver doesnt support queue id
275+
null,
276+
this.queueArray(this.active),
277+
Object.keys(this.toProcess).length,
278+
this.queryDef[key],
279+
lockAcquired
280+
];
273281
}
274282

275283
freeProcessingLock(queryKey, processingId, activated) {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 12 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -167,95 +167,6 @@ export class QueryQueue {
167167
return stream;
168168
}
169169

170-
/**
171-
* Depends on the `queryHandler` value either runs `executeQueryInQueue`
172-
* or `executeStreamInQueue` method.
173-
*
174-
* @param {string} queryHandler For the regular query is eq to 'query'.
175-
* @param {*} queryKey
176-
* @param {*} query
177-
* @param {number=} priority
178-
* @param {*=} options
179-
* @returns {*}
180-
*
181-
* @throw {ContinueWaitError}
182-
*/
183-
async executeInQueue(
184-
queryHandler,
185-
queryKey,
186-
query,
187-
priority,
188-
options,
189-
) {
190-
return this.executeQueryInQueue(
191-
queryHandler,
192-
queryKey,
193-
query,
194-
priority,
195-
options,
196-
);
197-
}
198-
199-
/**
200-
* Push persistent query to the queue and call `QueryQueue.reconcileQueue()` method.
201-
*
202-
* @param {string} queryHandler
203-
* @param {*} queryKey
204-
* @param {*} query
205-
* @param {number=} priority
206-
* @param {*=} options
207-
* @returns {Promise<void>}
208-
*/
209-
async executeStreamInQueue(
210-
queryHandler,
211-
queryKey,
212-
query,
213-
priority,
214-
options,
215-
) {
216-
options = options || {};
217-
const queueConnection = await this.queueDriver.createConnection();
218-
try {
219-
priority = priority || 0;
220-
if (!(priority >= -10000 && priority <= 10000)) {
221-
throw new Error(
222-
'Priority should be between -10000 and 10000'
223-
);
224-
}
225-
const time = new Date().getTime();
226-
const keyScore = time + (10000 - priority) * 1E14;
227-
228-
options.orphanedTimeout = query.orphanedTimeout;
229-
const orphanedTimeout = 'orphanedTimeout' in query
230-
? query.orphanedTimeout
231-
: this.orphanedTimeout;
232-
const orphanedTime = time + (orphanedTimeout * 1000);
233-
234-
const [added, _b, _c, queueSize, addedToQueueTime] = await queueConnection.addToQueue(
235-
keyScore, queryKey, orphanedTime, queryHandler, query, priority, options
236-
);
237-
if (added > 0) {
238-
this.logger('Added to queue (persistent)', {
239-
priority,
240-
queueSize,
241-
queryKey,
242-
queuePrefix: this.redisQueuePrefix,
243-
requestId: options.requestId,
244-
metadata: query.metadata,
245-
preAggregationId: query.preAggregation?.preAggregationId,
246-
newVersionEntry: query.newVersionEntry,
247-
forceBuild: query.forceBuild,
248-
preAggregation: query.preAggregation,
249-
addedToQueueTime,
250-
persistent: queryKey.persistent,
251-
});
252-
}
253-
this.reconcileQueue();
254-
} finally {
255-
this.queueDriver.release(queueConnection);
256-
}
257-
}
258-
259170
/**
260171
* Push query to the queue and call `QueryQueue.reconcileQueue()` method if
261172
* `options.skipQueue` is set to `false`, execute query skipping queue
@@ -270,7 +181,7 @@ export class QueryQueue {
270181
*
271182
* @throw {ContinueWaitError}
272183
*/
273-
async executeQueryInQueue(
184+
async executeInQueue(
274185
queryHandler,
275186
queryKey,
276187
query,
@@ -334,7 +245,7 @@ export class QueryQueue {
334245
const orphanedTimeout = 'orphanedTimeout' in query ? query.orphanedTimeout : this.orphanedTimeout;
335246
const orphanedTime = time + (orphanedTimeout * 1000);
336247

337-
const [added, _b, _c, queueSize, addedToQueueTime] = await queueConnection.addToQueue(
248+
const [added, queueId, queueSize, addedToQueueTime] = await queueConnection.addToQueue(
338249
keyScore, queryKey, orphanedTime, queryHandler, query, priority, options
339250
);
340251

@@ -400,7 +311,7 @@ export class QueryQueue {
400311
} else {
401312
// Result here won't be fetched for a jobed build query (initialized by
402313
// the /cubejs-system/v1/pre-aggregations/jobs endpoint).
403-
result = !query.isJob && await queueConnection.getResultBlocking(queryKey);
314+
result = !query.isJob && await queueConnection.getResultBlocking(queryKeyHash, queueId);
404315
}
405316

406317
// We don't want to throw the ContinueWaitError for a jobed build query.
@@ -763,8 +674,8 @@ export class QueryQueue {
763674
}
764675

765676
/**
766-
* Processing query specified by the `queryKey`. This method incapsulate most
767-
* of the logic related with the queues updates, heartbeating, etc.
677+
* Processing query specified by the `queryKey`. This method encapsulate most
678+
* of the logic related with the queues updates, heartbeat, etc.
768679
*
769680
* @param {QueryKeyHash} queryKeyHashed
770681
* @return {Promise<{ result: undefined | Object, error: string | undefined }>}
@@ -773,17 +684,18 @@ export class QueryQueue {
773684
const queueConnection = await this.queueDriver.createConnection();
774685

775686
let insertedCount;
776-
let _removedCount;
687+
let queueId;
777688
let activeKeys;
778689
let queueSize;
779690
let query;
780691
let processingLockAcquired;
692+
781693
try {
782694
const processingId = await queueConnection.getNextProcessingId();
783695
const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId);
784696

785697
if (retrieveResult) {
786-
[insertedCount /** todo(ovr): Remove */, _removedCount/** todo(ovr): Remove */, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
698+
[insertedCount, queueId, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
787699
}
788700

789701
const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1;
@@ -808,7 +720,7 @@ export class QueryQueue {
808720
preAggregation: query.query?.preAggregation,
809721
addedToQueueTime: query.addedToQueueTime,
810722
});
811-
await queueConnection.optimisticQueryUpdate(queryKeyHashed, { startQueryTime }, processingId);
723+
await queueConnection.optimisticQueryUpdate(queryKeyHashed, { startQueryTime }, processingId, queueId);
812724

813725
const heartBeatTimer = setInterval(
814726
() => queueConnection.updateHeartBeat(queryKeyHashed),
@@ -840,7 +752,7 @@ export class QueryQueue {
840752
query.query,
841753
async (cancelHandler) => {
842754
try {
843-
return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId);
755+
return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId, queueId);
844756
} catch (e) {
845757
this.logger('Error while query update', {
846758
queryKey: query.queryKey,
@@ -916,7 +828,7 @@ export class QueryQueue {
916828

917829
clearInterval(heartBeatTimer);
918830

919-
if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId))) {
831+
if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId, queueId))) {
920832
this.logger('Orphaned execution result', {
921833
processingId,
922834
warn: 'Result for query was not set due to processing lock wasn\'t acquired',
@@ -937,7 +849,7 @@ export class QueryQueue {
937849
// if (query?.queryHandler === 'stream') {
938850
// const [active] = await queueConnection.getQueryStageState(true);
939851
// if (active && active.length > 0) {
940-
// await Promise.race(active.map(keyHash => queueConnection.getResultBlockingByHash(keyHash)));
852+
// await Promise.race(active.map(keyHash => queueConnection.getResultBlocking(keyHash)));
941853
// await this.reconcileQueue();
942854
// }
943855
// }

0 commit comments

Comments
 (0)