Skip to content

Commit 60294de

Browse files
authored
feat(cubestore-driver): Queue - protect possible race condition (query def) #6616)
1 parent 2b11202 commit 60294de

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export interface QueueDriverConnectionInterface {
7474
// Return query keys which was sorted by priority and time
7575
getToProcessQueries(): Promise<string[]>;
7676
getActiveQueries(): Promise<string[]>;
77-
getQueryDef(queryKey: QueryKeyHash): Promise<QueryDef | null>;
77+
getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null>;
7878
// Queries which was added to queue, but was not processed and not needed
7979
getOrphanedQueries(): Promise<string[]>;
8080
// Queries which was not completed with old heartbeat

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,9 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
232232
return payload;
233233
}
234234

235-
public async getQueryDef(queryKey: QueryKeyHash): Promise<QueryDef | null> {
235+
public async getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null> {
236236
const rows = await this.driver.query('QUEUE GET ?', [
237-
this.prefixKey(queryKey)
237+
queueId || this.prefixKey(hash),
238238
]);
239239
if (rows && rows.length) {
240240
return this.decodeQueryDefFromRow(rows[0], 'getQueryDef');
@@ -243,10 +243,10 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
243243
return null;
244244
}
245245

246-
public async optimisticQueryUpdate(queryKeyHash: QueryKeyHash, toUpdate: unknown, _processingId: ProcessingId, queueId: QueueId): Promise<boolean> {
246+
public async optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, _processingId: ProcessingId, queueId: QueueId): Promise<boolean> {
247247
await this.driver.query('QUEUE MERGE_EXTRA ? ?', [
248248
// queryKeyHash as compatibility fallback
249-
queueId || this.prefixKey(queryKeyHash),
249+
queueId || this.prefixKey(hash),
250250
JSON.stringify(toUpdate)
251251
]);
252252

@@ -257,10 +257,10 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
257257
// nothing to release
258258
}
259259

260-
public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
260+
public async retrieveForProcessing(hash: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
261261
const rows = await this.driver.query<{ id: string /* cube store convert int64 to string */, active: string | null, pending: string, payload: string, extra: string | null }>('QUEUE RETRIEVE EXTENDED CONCURRENCY ? ?', [
262262
this.options.concurrency,
263-
this.prefixKey(queryKeyHashed),
263+
this.prefixKey(hash),
264264
]);
265265
if (rows && rows.length) {
266266
const active = rows[0].active ? (rows[0].active).split(',') as unknown as QueryKeyHash[] : [];
@@ -287,11 +287,11 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
287287
return null;
288288
}
289289

290-
public async getResultBlocking(queryKeyHash: QueryKeyHash, queueId: QueueId): Promise<QueryDef | null> {
290+
public async getResultBlocking(hash: QueryKeyHash, queueId: QueueId): Promise<QueryDef | null> {
291291
const rows = await this.driver.query('QUEUE RESULT_BLOCKING ? ?', [
292292
this.options.continueWaitTimeout * 1000,
293293
// queryKeyHash as compatibility fallback
294-
queueId || this.prefixKey(queryKeyHash),
294+
queueId || this.prefixKey(hash),
295295
]);
296296
if (rows && rows.length) {
297297
return this.decodeQueryDefFromRow(rows[0], 'getResultBlocking');

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ export class QueryQueue {
234234
const queryKeyHash = this.redisHash(queryKey);
235235

236236
if (query.forceBuild) {
237-
const jobExists = await queueConnection.getQueryDef(queryKeyHash);
237+
const jobExists = await queueConnection.getQueryDef(queryKeyHash, null);
238238
if (jobExists) return null;
239239
}
240240

@@ -268,7 +268,7 @@ export class QueryQueue {
268268

269269
await this.reconcileQueue();
270270

271-
const queryDef = await queueConnection.getQueryDef(queryKeyHash);
271+
const queryDef = await queueConnection.getQueryDef(queryKeyHash, queueId);
272272
const [active, toProcess] = await queueConnection.getQueryStageState(true);
273273

274274
if (queryDef) {
@@ -700,7 +700,7 @@ export class QueryQueue {
700700

701701
const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1;
702702
if (!query) {
703-
query = await queueConnection.getQueryDef(queryKeyHashed);
703+
query = await queueConnection.getQueryDef(queryKeyHashed, null);
704704
}
705705

706706
if (query && insertedCount && activated && processingLockAcquired) {

0 commit comments

Comments
 (0)