Skip to content

Commit 0aaf109

Browse files
committed
refactor(query-orchestrator): Migrate QueryQueue to TS
1 parent 94f01da commit 0aaf109

File tree

5 files changed

+125
-127
lines changed

5 files changed

+125
-127
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export interface QueueDriverConnectionInterface {
8686
// Trying to acquire a lock for processing a queue item, this method can return null when
8787
// multiple nodes tries to process the same query
8888
retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise<RetrieveForProcessingResponse>;
89-
freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise<void>;
89+
freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise<void | string>;
9090
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise<boolean>;
9191
cancelQuery(queryKey: QueryKey): Promise<QueryDef | null>;
9292
getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]>;

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -865,7 +865,7 @@ export class PreAggregationLoader {
865865
const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource);
866866
return queue.executeInQueue(
867867
'query',
868-
this.preAggregationQueryKey(invalidationKeys),
868+
<any> this.preAggregationQueryKey(invalidationKeys),
869869
{
870870
preAggregation: this.preAggregation,
871871
preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables,
@@ -2061,7 +2061,7 @@ export class PreAggregations {
20612061
tables = tables.filter(row => `${schema}.${row.table_name}` === table);
20622062

20632063
// fetching query result
2064-
const { queueDriver } = this.queue[dataSource];
2064+
const queueDriver = this.queue[dataSource].getQueueDriver();
20652065
const conn = await queueDriver.createConnection();
20662066
const result = await conn.getResult(key);
20672067
queueDriver.release(conn);

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ export class QueryCache {
468468
};
469469

470470
if (!persistent) {
471-
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
471+
return queue.executeInQueue('query', cacheKey as any, _query, priority, opt);
472472
} else {
473473
return queue.executeInQueue('stream', cacheKey, {
474474
..._query,
@@ -636,11 +636,11 @@ export class QueryCache {
636636
source.once('end', () => cleanup(undefined));
637637
source.once('error', cleanup);
638638
source.once('close', () => cleanup(undefined));
639-
639+
640640
target.once('end', () => cleanup(undefined));
641641
target.once('error', cleanup);
642642
target.once('close', () => cleanup(undefined));
643-
643+
644644
source.pipe(target);
645645
})
646646
.catch((reason) => {

0 commit comments

Comments
 (0)