Skip to content

Commit 3607c67

Browse files
authored
fix(cubestore-driver): Correct pending count and active keys in queue logs (#6250)
1 parent 2769200 commit 3607c67

File tree

4 files changed

+29
-4
lines changed

4 files changed

+29
-4
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,23 @@ export interface QueryKeyHash extends String {
88

99
export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
1010
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
11-
export type RetrieveForProcessingResponse = [added: any, removed: any, active: QueryKeyHash[], toProcess: any, def: QueryDef, lockAquired: boolean] | null;
11+
export type RetrieveForProcessingSuccess = [
12+
added: any /** todo(ovr): Remove, useless */,
13+
removed: any /** todo(ovr): Remove, useless */,
14+
active: QueryKeyHash[],
15+
pending: number,
16+
def: QueryDef,
17+
lockAquired: true
18+
];
19+
export type RetrieveForProcessingFail = [
20+
added: any /** todo(ovr): Remove, useless */,
21+
removed: any /** todo(ovr): Remove, useless */,
22+
active: QueryKeyHash[],
23+
pending: number,
24+
def: null,
25+
lockAquired: false
26+
];
27+
export type RetrieveForProcessingResponse = RetrieveForProcessingSuccess | RetrieveForProcessingFail | null;
1228

1329
export interface AddToQueueQuery {
1430
isJob: boolean,

packages/cubejs-cubestore-driver/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"@types/flatbuffers": "^1.10.0",
4646
"@types/generic-pool": "^3.1.9",
4747
"@types/ws": "^7.4.0",
48+
"@types/csv-write-stream": "^2.0.0",
4849
"jest": "^26.6.3",
4950
"typescript": "~4.9.5"
5051
},

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,19 +256,20 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
256256
}
257257

258258
public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
259+
// TODO(ovr): Enable extended
259260
const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [
260261
this.options.concurrency,
261262
this.prefixKey(queryKeyHashed),
262263
]);
263264
if (rows && rows.length) {
264265
const addedCount = 1;
265-
const active = [queryKeyHashed];
266-
const toProcess = 0;
266+
const active = (rows[0].active || queryKeyHashed /* backward compatibility for old Cube Store */).split(',');
267+
const pending = parseInt(rows[0].pending || '0' /* backward compatibility for old Cube Store */, 10);
267268
const lockAcquired = true;
268269
const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing');
269270

270271
return [
271-
addedCount, null, active, toProcess, def, lockAcquired
272+
addedCount, null, active, pending, def, lockAcquired
272273
];
273274
}
274275

yarn.lock

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6579,6 +6579,13 @@
65796579
dependencies:
65806580
"@types/node" "*"
65816581

6582+
"@types/csv-write-stream@^2.0.0":
6583+
version "2.0.0"
6584+
resolved "https://registry.yarnpkg.com/@types/csv-write-stream/-/csv-write-stream-2.0.0.tgz#44dcabe9d85c0196c7fd6ffb8033658451ff9dcc"
6585+
integrity sha512-lAqCgs4614VpX/+U7BuR/SiEHdpet5ufc8cdjuQjhA7VVREJAT4LHKNUvAss9jf6eHH400yHhOEWbFP0RZPYrA==
6586+
dependencies:
6587+
"@types/node" "*"
6588+
65826589
"@types/data-api-client@^1.2.1":
65836590
version "1.2.3"
65846591
resolved "https://registry.yarnpkg.com/@types/data-api-client/-/data-api-client-1.2.3.tgz#31694580015493fac8f3f3528e3a7f0a41c1741d"

0 commit comments

Comments
 (0)