Skip to content

Commit 08ce078

Browse files
authored
feat(query-orchestrator): Pass queueId for orphaned query cancellation (#7353)
1 parent 2c9c8d6 commit 08ce078

File tree

6 files changed

+137
-68
lines changed

6 files changed

+137
-68
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export interface QueryKeyHash extends String {
1010
__type: 'QueryKeyHash'
1111
}
1212

13-
export type QueryKeysTuple = [keyHash: QueryKeyHash, queueId: QueueId | null /** Cube Store supports real QueueId */];
13+
export type QueryKeysTuple = [keyHash: QueryKeyHash, queueId: QueueId | null /** Supported by new Cube Store and Memory */];
1414
export type GetActiveAndToProcessResponse = [active: QueryKeysTuple[], toProcess: QueryKeysTuple[]];
1515
export type AddToQueueResponse = [added: number, queueId: QueueId | null, queueSize: number, addedToQueueTime: number];
1616
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
@@ -43,7 +43,7 @@ export interface AddToQueueOptions {
4343
stageQueryKey: string,
4444
requestId: string,
4545
orphanedTimeout?: number,
46-
queueId?: QueueId,
46+
queueId: QueueId,
4747
}
4848

4949
export interface QueueDriverOptions {
@@ -74,13 +74,13 @@ export interface QueueDriverConnectionInterface {
7474
*/
7575
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
7676
// Return query keys which was sorted by priority and time
77-
getToProcessQueries(): Promise<QueryKeyHash[]>;
78-
getActiveQueries(): Promise<QueryKeyHash[]>;
77+
getToProcessQueries(): Promise<QueryKeysTuple[]>;
78+
getActiveQueries(): Promise<QueryKeysTuple[]>;
7979
getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null>;
8080
// Queries which was added to queue, but was not processed and not needed
81-
getOrphanedQueries(): Promise<string[]>;
81+
getOrphanedQueries(): Promise<QueryKeysTuple[]>;
8282
// Queries which was not completed with old heartbeat
83-
getStalledQueries(): Promise<string[]>;
83+
getStalledQueries(): Promise<QueryKeysTuple[]>;
8484
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
8585
updateHeartBeat(hash: QueryKeyHash): Promise<void>;
8686
getNextProcessingId(): Promise<ProcessingId>;
@@ -94,7 +94,7 @@ export interface QueueDriverConnectionInterface {
9494
setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, processingId: ProcessingId, queueId: QueueId | null): Promise<unknown>;
9595
release(): void;
9696
//
97-
getQueriesToCancel(): Promise<string[]>
97+
getQueriesToCancel(): Promise<QueryKeysTuple[]>
9898
getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse>;
9999
}
100100

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ function hashQueryKey(queryKey: QueryKey, processUid?: string): QueryKeyHash {
3131
return hash as any;
3232
}
3333

34+
type CubeStoreListResponse = {
35+
id: unknown,
36+
// eslint-disable-next-line camelcase
37+
queue_id?: string
38+
status: string
39+
};
40+
3441
class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
3542
public constructor(
3643
protected readonly driver: CubeStoreDriver,
@@ -108,22 +115,28 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
108115
// nothing to do
109116
}
110117

111-
public async getActiveQueries(): Promise<QueryKeyHash[]> {
112-
const rows = await this.driver.query('QUEUE ACTIVE ?', [
118+
public async getActiveQueries(): Promise<QueryKeysTuple[]> {
119+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE ACTIVE ?', [
113120
this.options.redisQueuePrefix
114121
]);
115-
return rows.map((row) => row.id);
122+
return rows.map((row) => [
123+
row.id as QueryKeyHash,
124+
row.queue_id ? parseInt(row.queue_id, 10) : null,
125+
]);
116126
}
117127

118-
public async getToProcessQueries(): Promise<QueryKeyHash[]> {
119-
const rows = await this.driver.query('QUEUE PENDING ?', [
128+
public async getToProcessQueries(): Promise<QueryKeysTuple[]> {
129+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE PENDING ?', [
120130
this.options.redisQueuePrefix
121131
]);
122-
return rows.map((row) => row.id);
132+
return rows.map((row) => [
133+
row.id as QueryKeyHash,
134+
row.queue_id ? parseInt(row.queue_id, 10) : null,
135+
]);
123136
}
124137

125138
public async getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse> {
126-
const rows = await this.driver.query('QUEUE LIST ?', [
139+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE LIST ?', [
127140
this.options.redisQueuePrefix
128141
]);
129142
if (rows.length) {
@@ -133,12 +146,12 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
133146
for (const row of rows) {
134147
if (row.status === 'active') {
135148
active.push([
136-
row.id,
149+
row.id as QueryKeyHash,
137150
row.queue_id ? parseInt(row.queue_id, 10) : null,
138151
]);
139152
} else {
140153
toProcess.push([
141-
row.id,
154+
row.id as QueryKeyHash,
142155
row.queue_id ? parseInt(row.queue_id, 10) : null,
143156
]);
144157
}
@@ -165,7 +178,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
165178
}
166179

167180
public async getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse> {
168-
const rows = await this.driver.query(`QUEUE LIST ${onlyKeys ? '?' : 'WITH_PAYLOAD ?'}`, [
181+
const rows = await this.driver.query<CubeStoreListResponse & { payload: string }>(`QUEUE LIST ${onlyKeys ? '?' : 'WITH_PAYLOAD ?'}`, [
169182
this.options.redisQueuePrefix
170183
]);
171184

@@ -175,15 +188,15 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
175188

176189
for (const row of rows) {
177190
if (!onlyKeys) {
178-
defs[row.id] = this.decodeQueryDefFromRow(row, 'getQueryStageState');
191+
defs[row.id as string] = this.decodeQueryDefFromRow(row, 'getQueryStageState');
179192
}
180193

181194
if (row.status === 'pending') {
182-
toProcess.push(row.id);
195+
toProcess.push(row.id as string);
183196
} else if (row.status === 'active') {
184-
active.push(row.id);
197+
active.push(row.id as string);
185198
// TODO: getQueryStage is broken for Executing query stage...
186-
toProcess.push(row.id);
199+
toProcess.push(row.id as string);
187200
}
188201
}
189202

@@ -201,29 +214,38 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
201214
return null;
202215
}
203216

204-
public async getStalledQueries(): Promise<string[]> {
205-
const rows = await this.driver.query('QUEUE STALLED ? ?', [
217+
public async getStalledQueries(): Promise<QueryKeysTuple[]> {
218+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE STALLED ? ?', [
206219
this.options.heartBeatTimeout * 1000,
207220
this.options.redisQueuePrefix
208221
]);
209-
return rows.map((row) => row.id);
222+
return rows.map((row) => [
223+
row.id as QueryKeyHash,
224+
row.queue_id ? parseInt(row.queue_id, 10) : null,
225+
]);
210226
}
211227

212-
public async getOrphanedQueries(): Promise<string[]> {
213-
const rows = await this.driver.query('QUEUE ORPHANED ? ?', [
228+
public async getOrphanedQueries(): Promise<QueryKeysTuple[]> {
229+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE ORPHANED ? ?', [
214230
this.options.orphanedTimeout * 1000,
215231
this.options.redisQueuePrefix
216232
]);
217-
return rows.map((row) => row.id);
233+
return rows.map((row) => [
234+
row.id as QueryKeyHash,
235+
row.queue_id ? parseInt(row.queue_id, 10) : null,
236+
]);
218237
}
219238

220-
public async getQueriesToCancel(): Promise<string[]> {
221-
const rows = await this.driver.query('QUEUE TO_CANCEL ? ? ?', [
239+
public async getQueriesToCancel(): Promise<QueryKeysTuple[]> {
240+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE TO_CANCEL ? ? ?', [
222241
this.options.heartBeatTimeout * 1000,
223242
this.options.orphanedTimeout * 1000,
224243
this.options.redisQueuePrefix,
225244
]);
226-
return rows.map((row) => row.id);
245+
return rows.map((row) => [
246+
row.id as QueryKeyHash,
247+
row.queue_id ? parseInt(row.queue_id, 10) : null,
248+
]);
227249
}
228250

229251
protected decodeQueryDefFromRow(row: { payload: string, extra?: string | null }, method: string): QueryDef {

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ export class LocalQueueDriverConnection {
3737
* @returns {Promise<GetActiveAndToProcessResponse>}
3838
*/
3939
async getActiveAndToProcess() {
40-
const active = this.queueArray(this.active);
41-
const toProcess = this.queueArray(this.toProcess);
40+
const active = this.queueArrayAsTuple(this.active);
41+
const toProcess = this.queueArrayAsTuple(this.toProcess);
4242

4343
return [
44-
active.map((queryKeyHash) => [queryKeyHash, null]),
45-
toProcess.map((queryKeyHash) => [queryKeyHash, null])
44+
active,
45+
toProcess
4646
];
4747
}
4848

@@ -92,6 +92,9 @@ export class LocalQueueDriverConnection {
9292
return null;
9393
}
9494

95+
/**
96+
* @protected
97+
*/
9598
queueArray(queueObj, orderFilterLessThan) {
9699
return R.pipe(
97100
R.values,
@@ -101,6 +104,21 @@ export class LocalQueueDriverConnection {
101104
)(queueObj);
102105
}
103106

107+
/**
108+
* @protected
109+
* @param queueObj
110+
* @param orderFilterLessThan
111+
* @returns {QueryKeysTuple[]}
112+
*/
113+
queueArrayAsTuple(queueObj, orderFilterLessThan) {
114+
return R.pipe(
115+
R.values,
116+
R.filter(orderFilterLessThan ? q => q.order < orderFilterLessThan : R.identity),
117+
R.sortBy(q => q.order),
118+
R.map(q => [q.key, q.queueId])
119+
)(queueObj);
120+
}
121+
104122
/**
105123
* Adds specified by the queryKey query to the queue, returns tuple
106124
* with the operation result.
@@ -127,19 +145,27 @@ export class LocalQueueDriverConnection {
127145
requestId: options.requestId,
128146
addedToQueueTime: new Date().getTime()
129147
};
148+
130149
const key = this.redisHash(queryKey);
131150
if (!this.queryDef[key]) {
132151
this.queryDef[key] = queryQueueObj;
133152
}
153+
134154
let added = 0;
135155
if (!this.toProcess[key]) {
136156
this.toProcess[key] = {
137157
order: keyScore,
158+
queueId: options.queueId,
138159
key
139160
};
140161
added = 1;
141162
}
142-
this.recent[key] = { order: orphanedTime, key };
163+
164+
this.recent[key] = {
165+
order: orphanedTime,
166+
key,
167+
queueId: options.queueId,
168+
};
143169

144170
if (this.getQueueEventsBus) {
145171
this.getQueueEventsBus().emit({
@@ -159,11 +185,11 @@ export class LocalQueueDriverConnection {
159185
}
160186

161187
getToProcessQueries() {
162-
return this.queueArray(this.toProcess);
188+
return this.queueArrayAsTuple(this.toProcess);
163189
}
164190

165191
getActiveQueries() {
166-
return this.queueArray(this.active);
192+
return this.queueArrayAsTuple(this.active);
167193
}
168194

169195
async getQueryAndRemove(queryKey) {
@@ -226,11 +252,11 @@ export class LocalQueueDriverConnection {
226252
}
227253

228254
getOrphanedQueries() {
229-
return this.queueArray(this.recent, new Date().getTime());
255+
return this.queueArrayAsTuple(this.recent, new Date().getTime());
230256
}
231257

232258
getStalledQueries() {
233-
return this.queueArray(this.heartBeat, new Date().getTime() - this.heartBeatTimeout * 1000);
259+
return this.queueArrayAsTuple(this.heartBeat, new Date().getTime() - this.heartBeatTimeout * 1000);
234260
}
235261

236262
async getQueryStageState(onlyKeys) {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -430,8 +430,11 @@ export class QueryQueue {
430430
queueConnection.getToProcessQueries()
431431
]);
432432

433-
const mapWithDefinition = (arr) => Promise.all(arr.map(async queryKey => ({
434-
...(await queueConnection.getQueryDef(queryKey)),
433+
/**
434+
* @param {QueryKeysTuple[]} arr
435+
*/
436+
const mapWithDefinition = (arr) => Promise.all(arr.map(async ([queryKey, queueId]) => ({
437+
...(await queueConnection.getQueryDef(queryKey, queueId)),
435438
queryKey
436439
})));
437440

@@ -506,20 +509,22 @@ export class QueryQueue {
506509
const queueConnection = await this.queueDriver.createConnection();
507510
try {
508511
const toCancel = await queueConnection.getQueriesToCancel();
509-
await Promise.all(toCancel.map(async queryKey => {
510-
const [query] = await queueConnection.getQueryAndRemove(queryKey);
511-
if (query) {
512+
513+
await Promise.all(toCancel.map(async ([queryKey, queueId]) => {
514+
const [queryDef] = await queueConnection.getQueryAndRemove(queryKey);
515+
if (queryDef) {
512516
this.logger('Removing orphaned query', {
513-
queryKey: query.queryKey,
517+
queueId: queueId || queryDef.queueId /** Special handling for Redis */,
518+
queryKey: queryDef.queryKey,
514519
queuePrefix: this.redisQueuePrefix,
515-
requestId: query.requestId,
516-
metadata: query.query?.metadata,
517-
preAggregationId: query.query?.preAggregation?.preAggregationId,
518-
newVersionEntry: query.query?.newVersionEntry,
519-
preAggregation: query.query?.preAggregation,
520-
addedToQueueTime: query.addedToQueueTime,
520+
requestId: queryDef.requestId,
521+
metadata: queryDef.query?.metadata,
522+
preAggregationId: queryDef.query?.preAggregation?.preAggregationId,
523+
newVersionEntry: queryDef.query?.newVersionEntry,
524+
preAggregation: queryDef.query?.preAggregation,
525+
addedToQueueTime: queryDef.addedToQueueTime,
521526
});
522-
await this.sendCancelMessageFn(query);
527+
await this.sendCancelMessageFn(queryDef);
523528
}
524529
}));
525530

0 commit comments

Comments
 (0)