Skip to content

Commit 508a015

Browse files
committed
chore: fixes
1 parent 9eb4922 commit 508a015

File tree

4 files changed

+34
-13
lines changed

4 files changed

+34
-13
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ export interface AddToQueueQuery {
3838
}
3939

4040
export interface AddToQueueOptions {
41-
stageQueryKey: string,
41+
queueId?: QueueId,
42+
stageQueryKey?: any,
4243
requestId: string,
44+
spanId?: string,
4345
orphanedTimeout?: number,
44-
queueId: QueueId,
4546
}
4647

4748
export interface QueueDriverOptions {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
QueryKeyHash,
77
QueueId,
88
QueryDef,
9-
QueryStageStateResponse
9+
QueryStageStateResponse, AddToQueueOptions
1010
} from '@cubejs-backend/base-driver';
1111
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
1212

@@ -23,6 +23,10 @@ export type QueryHandlersMap = Record<string, QueryHandlerFn>;
2323
export type SendProcessMessageFn = (queryKeyHash: QueryKeyHash, queueId: QueueId | null) => Promise<void> | void;
2424
export type SendCancelMessageFn = (query: QueryDef, queueId: QueueId | null) => Promise<void> | void;
2525

26+
export type ExecuteInQueueOptions = Omit<AddToQueueOptions, 'queueId'> & {
27+
spanId?: string
28+
};
29+
2630
export type QueryQueueOptions = {
2731
cacheAndQueueDriver: string;
2832
logger: (message, event) => void;
@@ -176,9 +180,12 @@ export class QueryQueue {
176180
queryKey: QueryKey,
177181
query: QueryDef,
178182
priority?: number,
179-
options?: any,
183+
executeOptions?: ExecuteInQueueOptions,
180184
) {
181-
options.queueId = this.generateQueueId();
185+
const options: AddToQueueOptions = {
186+
queueId: this.generateQueueId(),
187+
...executeOptions,
188+
};
182189

183190
if (this.skipQueue) {
184191
const queryDef = {
@@ -236,6 +243,7 @@ export class QueryQueue {
236243
const keyScore = time + (10000 - priority) * 1E14;
237244

238245
options.orphanedTimeout = query.orphanedTimeout;
246+
239247
const orphanedTimeout = 'orphanedTimeout' in query ? query.orphanedTimeout : this.orphanedTimeout;
240248
const orphanedTime = time + (orphanedTimeout * 1000);
241249

packages/cubejs-query-orchestrator/test/benchmarks/QueueBench.abstract.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ export function QueryQueueBenchmark(name: string, options: QueryQueueTestOptions
181181
// eslint-disable-next-line no-bitwise
182182
payload: 'a'.repeat(benchSettings.queuePayloadSize)
183183
}, 1, {
184-
184+
stageQueryKey: 1,
185+
requestId: 'request-id',
186+
spanId: 'span-id'
185187
});
186188

187189
counters.queueResolved++;

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,17 +178,29 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
178178
});
179179

180180
test('stage reporting', async () => {
181-
const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { stageQueryKey: '1' });
181+
const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, {
182+
stageQueryKey: '1',
183+
requestId: 'request-id',
184+
spanId: 'span-id'
185+
});
182186
await delayFn(null, 50);
183187
expect((await queue.getQueryStage('1')).stage).toBe('Executing query');
184188
await resultPromise;
185189
expect(await queue.getQueryStage('1')).toEqual(undefined);
186190
});
187191

188192
test('priority stage reporting', async () => {
189-
const resultPromise1 = queue.executeInQueue('delay', '31', { delay: 200, result: '1' }, 20, { stageQueryKey: '12' });
193+
const resultPromise1 = queue.executeInQueue('delay', '31', { delay: 200, result: '1' }, 20, {
194+
stageQueryKey: '12',
195+
requestId: 'request-id',
196+
spanId: 'span-id'
197+
});
190198
await delayFn(null, 50);
191-
const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 200, result: '1' }, 10, { stageQueryKey: '12' });
199+
const resultPromise2 = queue.executeInQueue('delay', '32', { delay: 200, result: '1' }, 10, {
200+
stageQueryKey: '12',
201+
requestId: 'request-id',
202+
spanId: 'span-id'
203+
});
192204
await delayFn(null, 50);
193205

194206
expect((await queue.getQueryStage('12', 10)).stage).toBe('#1 in queue');
@@ -275,7 +287,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
275287

276288
let orphanedTimeout = 2;
277289
await connection.addToQueue(keyScore, ['1', []], time + (orphanedTimeout * 1000), 'delay', { isJob: true, orphanedTimeout: time, }, priority, {
278-
queueId: 1,
279290
stageQueryKey: '1',
280291
requestId: '1',
281292
orphanedTimeout,
@@ -286,7 +297,6 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
286297
orphanedTimeout = 60;
287298

288299
await connection.addToQueue(keyScore, ['2', []], time + (orphanedTimeout * 1000), 'delay', { isJob: true, orphanedTimeout: time, }, priority, {
289-
queueId: 2,
290300
stageQueryKey: '2',
291301
requestId: '2',
292302
orphanedTimeout,
@@ -423,11 +433,11 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
423433
await queue.reconcileQueue();
424434

425435
await redisClient.addToQueue(
426-
keyScore, 'activated1', time, 'handler', <any>['select'], priority, { stageQueryKey: 'race', requestId: '1', queueId: 1 }
436+
keyScore, 'activated1', time, 'handler', <any>['select'], priority, { stageQueryKey: 'race', requestId: '1' }
427437
);
428438

429439
await redisClient.addToQueue(
430-
keyScore + 100, 'activated2', time + 100, 'handler2', <any>['select2'], priority, { stageQueryKey: 'race2', requestId: '1', queueId: 2 }
440+
keyScore + 100, 'activated2', time + 100, 'handler2', <any>['select2'], priority, { stageQueryKey: 'race2', requestId: '1' }
431441
);
432442

433443
const processingId1 = await redisClient.getNextProcessingId();

0 commit comments

Comments
 (0)