Skip to content

Commit ea8ce58

Browse files
committed
fix(query-orchestrator): QueryQueue - improve performance for high concurrency setups
1 parent 585e633 commit ea8ce58

File tree

3 files changed

+57
-14
lines changed

3 files changed

+57
-14
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,31 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
136136
}
137137

138138
public async getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse> {
139+
const active: QueryKeysTuple[] = [];
140+
const toProcess: QueryKeysTuple[] = [];
141+
142+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE LIST ?', [
143+
this.options.redisQueuePrefix
144+
]);
145+
if (rows.length) {
146+
for (const row of rows) {
147+
if (row.status === 'active') {
148+
active.push([
149+
row.id as QueryKeyHash,
150+
row.queue_id ? parseInt(row.queue_id, 10) : null,
151+
]);
152+
} else {
153+
toProcess.push([
154+
row.id as QueryKeyHash,
155+
row.queue_id ? parseInt(row.queue_id, 10) : null,
156+
]);
157+
}
158+
}
159+
}
160+
139161
return [
140-
// We don't return active queries, because it's useless
141-
// There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl
142-
// Cube Store provides strict guarantees that queue item cannot be active & pending in the same time
143-
[],
144-
await this.getToProcessQueries()
162+
active,
163+
toProcess,
145164
];
146165
}
147166

@@ -233,6 +252,9 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
233252
const payload = JSON.parse(row.payload);
234253

235254
if (row.extra) {
255+
console.log(row.extra, typeof row.extra);
256+
console.log(payload, typeof payload);
257+
console.log(row.payload, typeof row.payload);
236258
return Object.assign(payload, JSON.parse(row.extra));
237259
}
238260

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,16 @@ export class QueryQueue {
543543
}
544544
}));
545545

546-
const [_active, toProcess] = await queueConnection.getActiveAndToProcess();
546+
const [active, toProcess] = await queueConnection.getActiveAndToProcess();
547+
548+
/**
549+
* Important notice: Concurrency configuration works per a specific queue, not per node.
550+
*
551+
* In production clusters where it contains N nodes, it shares the same concurrency. It leads to a point
552+
* where every node tries to pick up jobs as much as concurrency is defined for the whole cluster. To minimize
553+
* the effect of competition between nodes, it's important to reduce the number of tries to process by active jobs.
554+
*/
555+
const toProcessLimit = active.length >= this.concurrency ? 1 : this.concurrency - active.length;
547556

548557
const tasks = toProcess
549558
.filter(([queryKey, _queueId]) => {
@@ -559,7 +568,7 @@ export class QueryQueue {
559568
return false;
560569
}
561570
})
562-
.slice(0, this.concurrency)
571+
.slice(0, toProcessLimit)
563572
.map(([queryKey, queueId]) => this.sendProcessMessageFn(queryKey, queueId));
564573

565574
await Promise.all(tasks);

packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
138138
} else {
139139
counters.events[event] = 1;
140140
}
141+
142+
if (event.includes('error')) {
143+
console.log(event, _params);
144+
}
141145
},
142146
queueDriverFactory,
143147
...options
@@ -159,7 +163,8 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
159163
const progressIntervalId = setInterval(() => {
160164
console.log('running', {
161165
...counters,
162-
processingPromisses: processingPromisses.length
166+
processingPromisses: processingPromisses.length,
167+
benchSettings,
163168
});
164169
}, 1000);
165170

@@ -177,18 +182,25 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
177182

178183
const queueId = crypto.randomBytes(12).toString('hex');
179184
const running = (async () => {
180-
await queue.executeInQueue('query', queueId, {
181-
// eslint-disable-next-line no-bitwise
182-
payload: 'a'.repeat(benchSettings.queuePayloadSize)
183-
}, 1, {
185+
try {
186+
await queue.executeInQueue('query', queueId, {
187+
// eslint-disable-next-line no-bitwise
188+
payload: {
189+
large_str: 'a'.repeat(benchSettings.queuePayloadSize)
190+
},
191+
orphanedTimeout: 120
192+
}, 1, {
184193
stageQueryKey: 1,
185194
requestId: 'request-id',
186195
spanId: 'span-id'
187-
});
196+
});
197+
} catch (e) {
198+
console.error(e);
199+
}
188200

189201
counters.queueResolved++;
190202

191-
// loosing memory for result
203+
// losing memory for a result
192204
return null;
193205
})();
194206

0 commit comments

Comments
 (0)