Skip to content

Commit 69a35ed

Browse files
authored
feat: Re-use connection to Cube Store from externalDriver (#5993)
1 parent aae758f commit 69a35ed

File tree

4 files changed

+38
-18
lines changed

4 files changed

+38
-18
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreCacheDriver.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,20 @@ import { CubeStoreDriver } from './CubeStoreDriver';
55

66
export class CubeStoreCacheDriver implements CacheDriverInterface {
77
public constructor(
8-
protected readonly connection: CubeStoreDriver
8+
protected connectionFactory: () => Promise<CubeStoreDriver>,
99
) {}
1010

11+
protected connection: CubeStoreDriver | null = null;
12+
13+
protected async getConnection(): Promise<CubeStoreDriver> {
14+
if (this.connection) {
15+
return this.connection;
16+
}
17+
18+
// eslint-disable-next-line no-return-assign
19+
return this.connection = await this.connectionFactory();
20+
}
21+
1122
public withLock = (
1223
key: string,
1324
cb: () => MaybeCancelablePromise<any>,
@@ -18,11 +29,13 @@ export class CubeStoreCacheDriver implements CacheDriverInterface {
1829
return false;
1930
}
2031

21-
const rows = await this.connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']);
32+
const connection = (await this.getConnection());
33+
34+
const rows = await connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']);
2235
if (rows && rows.length === 1 && rows[0]?.success === 'true') {
2336
if (tkn.isCanceled()) {
2437
if (freeAfter) {
25-
await this.connection.query('CACHE REMOVE ?', [
38+
await connection.query('CACHE REMOVE ?', [
2639
key
2740
]);
2841
}
@@ -34,7 +47,7 @@ export class CubeStoreCacheDriver implements CacheDriverInterface {
3447
await tkn.with(cb());
3548
} finally {
3649
if (freeAfter) {
37-
await this.connection.query('CACHE REMOVE ?', [
50+
await connection.query('CACHE REMOVE ?', [
3851
key
3952
]);
4053
}
@@ -47,7 +60,7 @@ export class CubeStoreCacheDriver implements CacheDriverInterface {
4760
});
4861

4962
public async get(key: string) {
50-
const rows = await this.connection.query('CACHE GET ?', [
63+
const rows = await (await this.getConnection()).query('CACHE GET ?', [
5164
key
5265
]);
5366
if (rows && rows.length === 1) {
@@ -59,7 +72,7 @@ export class CubeStoreCacheDriver implements CacheDriverInterface {
5972

6073
public async set(key: string, value, expiration) {
6174
const strValue = JSON.stringify(value);
62-
await this.connection.query('CACHE SET TTL ? ? ?', [expiration, key, strValue]);
75+
await (await this.getConnection()).query('CACHE SET TTL ? ? ?', [expiration, key, strValue]);
6376

6477
return {
6578
key,
@@ -68,13 +81,13 @@ export class CubeStoreCacheDriver implements CacheDriverInterface {
6881
}
6982

7083
public async remove(key: string) {
71-
await this.connection.query('CACHE REMOVE ?', [
84+
await (await this.getConnection()).query('CACHE REMOVE ?', [
7285
key
7386
]);
7487
}
7588

7689
public async keysStartingWith(prefix: string) {
77-
const rows = await this.connection.query('CACHE KEYS ?', [
90+
const rows = await (await this.getConnection()).query('CACHE KEYS ?', [
7891
prefix
7992
]);
8093
return rows.map((row) => row.key);
@@ -85,6 +98,6 @@ export class CubeStoreCacheDriver implements CacheDriverInterface {
8598
}
8699

87100
public async testConnection(): Promise<void> {
88-
return this.connection.testConnection();
101+
return (await this.getConnection()).testConnection();
89102
}
90103
}

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ export interface QueryCacheOptions {
117117
heartBeatInterval?: number;
118118
}>;
119119
redisPool?: any;
120-
cubeStoreDriver?: CubeStoreDriver,
120+
cubeStoreDriverFactory?: () => Promise<CubeStoreDriver>,
121121
continueWaitTimeout?: number;
122122
cacheAndQueueDriver?: CacheAndQueryDriverType;
123123
maxInMemoryCacheEntries?: number;
@@ -148,7 +148,7 @@ export class QueryCache {
148148
break;
149149
case 'cubestore':
150150
this.cacheDriver = new CubeStoreCacheDriver(
151-
options.cubeStoreDriver || new CubeStoreDriver({})
151+
options.cubeStoreDriverFactory || (async () => new CubeStoreDriver({}))
152152
);
153153
break;
154154
default:

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import R from 'ramda';
22
import { getEnv } from '@cubejs-backend/shared';
3+
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
34

45
import { QueryCache, QueryBody, TempTable } from './QueryCache';
56
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
@@ -59,13 +60,19 @@ export class QueryOrchestrator {
5960
throw new Error('Only \'redis\', \'memory\' or \'cubestore\' are supported for cacheAndQueueDriver option');
6061
}
6162

63+
const { externalDriverFactory, continueWaitTimeout, skipExternalCacheAndQueue } = options;
64+
6265
const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool(options.redisPoolOptions) : undefined;
6366
this.redisPool = redisPool;
6467

65-
// TODO: Re-use connection from external database
66-
const cubeStoreDriver = undefined;
68+
const cubeStoreDriverFactory = cacheAndQueueDriver === 'cubestore' ? async () => {
69+
const externalDriver = await externalDriverFactory();
70+
if (externalDriver instanceof CubeStoreDriver) {
71+
return externalDriver;
72+
}
6773

68-
const { externalDriverFactory, continueWaitTimeout, skipExternalCacheAndQueue } = options;
74+
throw new Error('It`s not possible to use CubeStore as queue & cache driver without using it as external');
75+
} : undefined;
6976

7077
this.queryCache = new QueryCache(
7178
this.redisPrefix,
@@ -75,7 +82,7 @@ export class QueryOrchestrator {
7582
externalDriverFactory,
7683
cacheAndQueueDriver,
7784
redisPool,
78-
cubeStoreDriver,
85+
cubeStoreDriverFactory,
7986
continueWaitTimeout,
8087
skipExternalCacheAndQueue,
8188
...options.queryCacheOptions,

packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { QueryCacheTest } from '../../unit/QueryCache.abstract';
33

44
let beforeAll;
55
let afterAll;
6-
let cubeStoreDriver = new CubeStoreDriver({});
6+
let cubeStoreDriverFactory = async () => new CubeStoreDriver({});
77

88
if ((process.env.CUBEJS_TESTING_CUBESTORE_AUTO_PROVISIONING || 'true') === 'true') {
99
const cubeStoreHandler = new CubeStoreHandler({
@@ -22,14 +22,14 @@ if ((process.env.CUBEJS_TESTING_CUBESTORE_AUTO_PROVISIONING || 'true') === 'true
2222
await cubeStoreHandler.acquire();
2323
};
2424
afterAll = async () => cubeStoreHandler.release(true);
25-
cubeStoreDriver = new CubeStoreDevDriver(cubeStoreHandler);
25+
cubeStoreDriverFactory = async () => new CubeStoreDevDriver(cubeStoreHandler);
2626
}
2727

2828
QueryCacheTest(
2929
'CubeStore Cache Driver',
3030
{
3131
cacheAndQueueDriver: 'cubestore',
32-
cubeStoreDriver,
32+
cubeStoreDriverFactory,
3333
beforeAll,
3434
afterAll
3535
}

0 commit comments

Comments
 (0)