From 8ce06f979f01f1a9e50f9c108305bb9a8b2fd53f Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 24 Jul 2025 17:19:23 +0200 Subject: [PATCH] fix(query-orchestrator): QueryQueue - update heartbeat by QueueId --- .../src/queue-driver.interface.ts | 14 +++++++------- .../src/orchestrator/QueryQueue.js | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 1c27750d59e68..29674f8c2077b 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -1,8 +1,8 @@ export type QueryDef = unknown; // Primary key of Queue item export type QueueId = string | number | bigint; -// This was used as lock for Redis, deprecated. -export type ProcessingId = string | number; +// This was used as a lock for Redis, deprecated. +export type ProcessingId = string | number | bigint; export type QueryKey = (string | [string, any[]]) & { persistent?: true, }; @@ -64,25 +64,25 @@ export interface QueueDriverConnectionInterface { * @param keyScore Redis specific thing * @param queryKey * @param orphanedTime - * @param queryHandler Our queue allow to use different handlers. For example query, cvsQuery, etc. + * @param queryHandler Our queue allows using different handlers. For example, query, cvsQuery, etc. * @param query * @param priority * @param options */ addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise; - // Return query keys which was sorted by priority and time + // Return query keys that were sorted by priority and time getToProcessQueries(): Promise; getActiveQueries(): Promise; getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise; - // Queries which was added to queue, but was not processed and not needed + // Queries that were added to queue, but was not processed and not needed getOrphanedQueries(): Promise; - // Queries which was not completed with old heartbeat + // Queries that were not completed with old heartbeat getStalledQueries(): Promise; getQueryStageState(onlyKeys: boolean): Promise; updateHeartBeat(hash: QueryKeyHash, queueId: QueueId | null): Promise; getNextProcessingId(): Promise; // Trying to acquire a lock for processing a queue item, this method can return null when - // multiple nodes tries to process the same query + // multiple nodes try to process the same query retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise; freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise; optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 4c3e04966fdc2..923f2f3be1503 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -807,7 +807,7 @@ export class QueryQueue { queryProcessHeartbeat = Date.now(); } - return queueConnection.updateHeartBeat(queryKeyHashed); + return queueConnection.updateHeartBeat(queryKeyHashed, queueId); }, this.heartBeatInterval * 1000 );