Skip to content

Commit 7db90fb

Browse files
authored
feat(query-orchestrator): Queue - reduce traffic for processing (Cube Store only) (#7644)
1 parent ecf3826 commit 7db90fb

File tree

3 files changed

+15
-28
lines changed

3 files changed

+15
-28
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ export interface QueueDriverConnectionInterface {
9595
release(): void;
9696
//
9797
getQueriesToCancel(): Promise<QueryKeysTuple[]>
98+
// @deprecated
9899
getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse>;
99100
}
100101

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -136,34 +136,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
136136
}
137137

138138
public async getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse> {
139-
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE LIST ?', [
140-
this.options.redisQueuePrefix
141-
]);
142-
if (rows.length) {
143-
const active: QueryKeysTuple[] = [];
144-
const toProcess: QueryKeysTuple[] = [];
145-
146-
for (const row of rows) {
147-
if (row.status === 'active') {
148-
active.push([
149-
row.id as QueryKeyHash,
150-
row.queue_id ? parseInt(row.queue_id, 10) : null,
151-
]);
152-
} else {
153-
toProcess.push([
154-
row.id as QueryKeyHash,
155-
row.queue_id ? parseInt(row.queue_id, 10) : null,
156-
]);
157-
}
158-
}
159-
160-
return [
161-
active,
162-
toProcess,
163-
];
164-
}
165-
166-
return [[], []];
139+
return [
140+
// We don't return active queries, because it's useless
141+
// There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl
142+
// Cube Store provides strict guarantees that queue item cannot be active & pending in the same time
143+
[],
144+
await this.getToProcessQueries()
145+
];
167146
}
168147

169148
public async getNextProcessingId(): Promise<number | string> {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,13 @@ export class QueryQueue {
547547
}
548548
}));
549549

550+
/**
551+
* There is a bug somewhere in Redis (maybe in memory too?),
552+
* which doesn't remove queue item from pending, while it's in active state
553+
*
554+
* TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time
555+
* TODO(ovr): Migrate to getToProcessQueries after removal of Redis
556+
*/
550557
const [active, toProcess] = await queueConnection.getActiveAndToProcess();
551558

552559
await Promise.all(

0 commit comments

Comments
 (0)