Skip to content

Commit dcf7866

Browse files
authored
fix: Reconcile streaming queries in case of cluster execution (#6204)
* fix: Reconcile streams in case of cluster execution * Fix queue tests
1 parent b068f7f commit dcf7866

File tree

13 files changed

+278
-140
lines changed

13 files changed

+278
-140
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export interface QueueDriverOptions {
2828
orphanedTimeout: number,
2929
heartBeatTimeout: number,
3030
getQueueEventsBus?: any,
31+
processUid?: string;
3132
}
3233

3334
export interface QueueDriverConnectionInterface {

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ import { getProcessUid } from '@cubejs-backend/shared';
1616

1717
import { CubeStoreDriver } from './CubeStoreDriver';
1818

19-
function hashQueryKey(queryKey: QueryKey): QueryKeyHash {
19+
function hashQueryKey(queryKey: QueryKey, processUid?: string): QueryKeyHash {
20+
processUid = processUid || getProcessUid();
2021
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
2122

2223
if (typeof queryKey === 'object' && queryKey.persistent) {
23-
return `${hash}@${getProcessUid()}` as any;
24+
return `${hash}@${processUid}` as any;
2425
}
2526

2627
return hash as any;
@@ -33,7 +34,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
3334
) { }
3435

3536
public redisHash(queryKey: QueryKey): QueryKeyHash {
36-
return hashQueryKey(queryKey);
37+
return hashQueryKey(queryKey, this.options.processUid);
3738
}
3839

3940
protected prefixKey(queryKey: QueryKey): string {
@@ -275,9 +276,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
275276
}
276277

277278
public async getResultBlocking(queryKey: string): Promise<QueryDef | null> {
279+
return this.getResultBlockingByHash(this.redisHash(queryKey));
280+
}
281+
282+
public async getResultBlockingByHash(queryKeyHash: QueryKeyHash): Promise<QueryDef | null> {
278283
const rows = await this.driver.query('QUEUE RESULT_BLOCKING ? ?', [
279284
this.options.continueWaitTimeout * 1000,
280-
this.prefixKey(this.redisHash(queryKey)),
285+
this.prefixKey(queryKeyHash),
281286
]);
282287
if (rows && rows.length) {
283288
return this.decodeQueryDefFromRow(rows[0], 'getResultBlocking');

packages/cubejs-jdbc-driver/src/JDBCDriver.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,10 @@ export class JDBCDriver extends BaseDriver {
174174
try {
175175
connection = await this.pool._factory.create();
176176
} catch (e: any) {
177-
err = e.message;
177+
err = e.message || e;
178178
}
179179
if (err) {
180-
throw new Error(err);
180+
throw new Error(err.toString());
181181
} else {
182182
await this.pool._factory.destroy(connection);
183183
}

packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import {
77
import { getCacheHash } from './utils';
88

99
export abstract class BaseQueueDriver implements QueueDriverInterface {
10+
public constructor(protected processUid: string) {
11+
}
12+
1013
public redisHash(queryKey: QueryKey): QueryKeyHash {
11-
return getCacheHash(queryKey);
14+
return getCacheHash(queryKey, this.processUid);
1215
}
1316

1417
abstract createConnection(): Promise<QueueDriverConnectionInterface>;

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,13 @@ export class LocalQueueDriverConnection {
4949
}
5050

5151
async getResultBlocking(queryKey) {
52-
const resultListKey = this.resultListKey(queryKey);
53-
if (!this.queryDef[this.redisHash(queryKey)] && !this.resultPromises[resultListKey]) {
52+
return this.getResultBlockingByHash(this.redisHash(queryKey));
53+
}
54+
55+
async getResultBlockingByHash(queryKeyHash) {
56+
// Double redisHash apply is being used here
57+
const resultListKey = this.resultListKey(queryKeyHash);
58+
if (!this.queryDef[queryKeyHash] && !this.resultPromises[resultListKey]) {
5459
return null;
5560
}
5661
const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout));
@@ -335,7 +340,7 @@ const processingLocks = {};
335340
*/
336341
export class LocalQueueDriver extends BaseQueueDriver {
337342
constructor(options) {
338-
super();
343+
super(options.processUid);
339344
this.options = options;
340345
results[options.redisQueuePrefix] = results[options.redisQueuePrefix] || {};
341346
resultPromises[options.redisQueuePrefix] = resultPromises[options.redisQueuePrefix] || {};

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2244,47 +2244,50 @@ export class PreAggregations {
22442244

22452245
public async getQueue(dataSource: string = 'default') {
22462246
if (!this.queue[dataSource]) {
2247-
this.queue[dataSource] = QueryCache.createQueue(
2248-
`SQL_PRE_AGGREGATIONS_${this.redisPrefix}_${dataSource}`,
2249-
() => this.driverFactory(dataSource),
2250-
(client, q) => {
2251-
const {
2252-
preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId, invalidationKeys, buildRangeEnd
2253-
} = q;
2254-
const loader = new PreAggregationLoader(
2255-
this.redisPrefix,
2256-
() => this.driverFactory(dataSource),
2257-
this.logger,
2258-
this.queryCache,
2259-
this,
2260-
preAggregation,
2261-
preAggregationsTablesToTempTables,
2262-
new PreAggregationLoadCache(
2247+
const queueOptions = await this.options.queueOptions(dataSource);
2248+
if (!this.queue[dataSource]) {
2249+
this.queue[dataSource] = QueryCache.createQueue(
2250+
`SQL_PRE_AGGREGATIONS_${this.redisPrefix}_${dataSource}`,
2251+
() => this.driverFactory(dataSource),
2252+
(client, q) => {
2253+
const {
2254+
preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId, invalidationKeys, buildRangeEnd
2255+
} = q;
2256+
const loader = new PreAggregationLoader(
22632257
this.redisPrefix,
22642258
() => this.driverFactory(dataSource),
2259+
this.logger,
22652260
this.queryCache,
22662261
this,
2267-
{
2268-
requestId,
2269-
dataSource,
2270-
},
2271-
),
2272-
{ requestId, externalRefresh: this.externalRefresh, buildRangeEnd }
2273-
);
2274-
return loader.refresh(newVersionEntry, invalidationKeys, client);
2275-
},
2276-
{
2277-
concurrency: 1,
2278-
logger: this.logger,
2279-
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
2280-
redisPool: this.options.redisPool,
2281-
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
2282-
// Centralized continueWaitTimeout that can be overridden in queueOptions
2283-
continueWaitTimeout: this.options.continueWaitTimeout,
2284-
...(await this.options.queueOptions(dataSource)),
2285-
getQueueEventsBus: this.getQueueEventsBus,
2286-
}
2287-
);
2262+
preAggregation,
2263+
preAggregationsTablesToTempTables,
2264+
new PreAggregationLoadCache(
2265+
this.redisPrefix,
2266+
() => this.driverFactory(dataSource),
2267+
this.queryCache,
2268+
this,
2269+
{
2270+
requestId,
2271+
dataSource,
2272+
},
2273+
),
2274+
{ requestId, externalRefresh: this.externalRefresh, buildRangeEnd }
2275+
);
2276+
return loader.refresh(newVersionEntry, invalidationKeys, client);
2277+
},
2278+
{
2279+
concurrency: 1,
2280+
logger: this.logger,
2281+
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
2282+
redisPool: this.options.redisPool,
2283+
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
2284+
// Centralized continueWaitTimeout that can be overridden in queueOptions
2285+
continueWaitTimeout: this.options.continueWaitTimeout,
2286+
...queueOptions,
2287+
getQueueEventsBus: this.getQueueEventsBus,
2288+
}
2289+
);
2290+
}
22882291
}
22892292
return this.queue[dataSource];
22902293
}

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -477,42 +477,41 @@ export class QueryCache {
477477
if (!persistent) {
478478
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
479479
} else {
480-
const stream = queue.createQueryStream(cacheKey, aliasNameToMember);
481-
// We don't want to handle error here as we want it to bubble up
482-
// to the api gateway. We need to increase orphaned timeout as well.
483-
queue.executeInQueue('stream', cacheKey, {
480+
return queue.executeInQueue('stream', cacheKey, {
484481
..._query,
485-
orphanedTimeout: 1200,
482+
aliasNameToMember,
486483
}, priority, opt);
487-
return stream;
488484
}
489485
}
490486

491487
public async getQueue(dataSource = 'default') {
492488
if (!this.queue[dataSource]) {
493-
this.queue[dataSource] = QueryCache.createQueue(
494-
`SQL_QUERY_${this.redisPrefix}_${dataSource}`,
495-
() => this.driverFactory(dataSource),
496-
(client, req) => {
497-
this.logger('Executing SQL', { ...req });
498-
if (req.useCsvQuery) {
499-
return this.csvQuery(client, req);
500-
} else if (req.tablesSchema) {
501-
return client.tablesSchema();
502-
} else {
503-
return client.query(req.query, req.values, req);
489+
const queueOptions = await this.options.queueOptions(dataSource);
490+
if (!this.queue[dataSource]) {
491+
this.queue[dataSource] = QueryCache.createQueue(
492+
`SQL_QUERY_${this.redisPrefix}_${dataSource}`,
493+
() => this.driverFactory(dataSource),
494+
(client, req) => {
495+
this.logger('Executing SQL', { ...req });
496+
if (req.useCsvQuery) {
497+
return this.csvQuery(client, req);
498+
} else if (req.tablesSchema) {
499+
return client.tablesSchema();
500+
} else {
501+
return client.query(req.query, req.values, req);
502+
}
503+
},
504+
{
505+
logger: this.logger,
506+
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
507+
redisPool: this.options.redisPool,
508+
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
509+
// Centralized continueWaitTimeout that can be overridden in queueOptions
510+
continueWaitTimeout: this.options.continueWaitTimeout,
511+
...queueOptions,
504512
}
505-
},
506-
{
507-
logger: this.logger,
508-
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
509-
redisPool: this.options.redisPool,
510-
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
511-
// Centralized continueWaitTimeout that can be overridden in queueOptions
512-
continueWaitTimeout: this.options.continueWaitTimeout,
513-
...(await this.options.queueOptions(dataSource)),
514-
}
515-
);
513+
);
514+
}
516515
}
517516
return this.queue[dataSource];
518517
}

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { RedisPool, RedisPoolOptions } from './RedisPool';
99
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
1010
import { RedisQueueEventsBus } from './RedisQueueEventsBus';
1111
import { LocalQueueEventsBus } from './LocalQueueEventsBus';
12+
import { QueryStream } from './QueryStream';
1213

1314
export type CacheAndQueryDriverType = 'redis' | 'memory' | 'cubestore';
1415

@@ -264,6 +265,11 @@ export class QueryOrchestrator {
264265
result.lastRefreshTime?.getTime()
265266
]);
266267

268+
if (result instanceof QueryStream) {
269+
// TODO do some wrapper object to provide metadata?
270+
return result;
271+
}
272+
267273
return {
268274
...result,
269275
dataSource: queryBody.dataSource,

0 commit comments

Comments
 (0)