Skip to content

Commit f4fdfc0

Browse files
authored
fix(cubestore-driver): Queue - correct APM events (#6251)
1 parent ce8f3e2 commit f4fdfc0

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
7474
await this.query('SELECT 1', []);
7575
}
7676

77-
public async query(query: string, values: any[], options?: QueryOptions) {
77+
public async query<R = any>(query: string, values: any[], options?: QueryOptions): Promise<R[]> {
7878
const { inlineTables, ...queryTracingObj } = options ?? {};
7979
return this.connection.query(formatSql(query, values || []), inlineTables ?? [], { ...queryTracingObj, instance: getEnv('instanceId') });
8080
}

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
217217
return rows.map((row) => row.id);
218218
}
219219

220-
protected decodeQueryDefFromRow(row: { payload: string, extra?: string }, method: string): QueryDef {
220+
protected decodeQueryDefFromRow(row: { payload: string, extra?: string | null }, method: string): QueryDef {
221221
if (!row.payload) {
222222
throw new Error(`Field payload is empty, incorrect response for ${method} method`);
223223
}
@@ -256,21 +256,25 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
256256
}
257257

258258
public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
259-
// TODO(ovr): Enable extended
260-
const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [
259+
const rows = await this.driver.query<{ active: string | null, pending: string, payload: string, extra: string | null }>('QUEUE RETRIEVE EXTENDED CONCURRENCY ? ?', [
261260
this.options.concurrency,
262261
this.prefixKey(queryKeyHashed),
263262
]);
264263
if (rows && rows.length) {
265-
const addedCount = 1;
266-
const active = (rows[0].active || queryKeyHashed /* backward compatibility for old Cube Store */).split(',');
267-
const pending = parseInt(rows[0].pending || '0' /* backward compatibility for old Cube Store */, 10);
268-
const lockAcquired = true;
269-
const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing');
270-
271-
return [
272-
addedCount, null, active, pending, def, lockAcquired
273-
];
264+
const active = rows[0].active ? (rows[0].active).split(',') as unknown as QueryKeyHash[] : [];
265+
const pending = parseInt(rows[0].pending, 10);
266+
267+
if (rows[0].payload) {
268+
const def = this.decodeQueryDefFromRow(rows[0], 'retrieveForProcessing');
269+
270+
return [
271+
1, null, active, pending, def, true
272+
];
273+
} else {
274+
return [
275+
0, null, active, pending, null, false
276+
];
277+
}
274278
}
275279

276280
return null;

0 commit comments

Comments
 (0)