Skip to content

Commit fb39776

Browse files
authored
feat: Introduce CubeStoreCacheDriver (#5511)
1 parent ae5e8f8 commit fb39776

File tree

30 files changed

+369
-92
lines changed

30 files changed

+369
-92
lines changed

.github/workflows/push.yml

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,63 @@ jobs:
299299
CUBEJS_REDIS_USE_IOREDIS: true
300300
CUBEJS_REDIS_SENTINEL: "redis+sentinel://localhost:5000,localhost:5001,localhost:5002/mymaster/0"
301301

302+
integration-cubestore:
303+
needs: [unit, lint, latest-tag-sha]
304+
runs-on: ubuntu-20.04
305+
timeout-minutes: 60
306+
if: (needs['latest-tag-sha'].outputs.sha != github.sha)
307+
308+
strategy:
309+
matrix:
310+
node-version: [14.x]
311+
fail-fast: false
312+
313+
steps:
314+
- name: Checkout
315+
uses: actions/checkout@v2
316+
- name: Install Rust
317+
uses: actions-rs/toolchain@v1
318+
with:
319+
toolchain: nightly-2022-03-08
320+
override: true
321+
components: rustfmt
322+
- name: Install Node.js ${{ matrix.node-version }}
323+
uses: actions/setup-node@v1
324+
with:
325+
node-version: ${{ matrix.node-version }}
326+
- name: Get yarn cache directory path
327+
id: yarn-cache-dir-path
328+
run: echo "::set-output name=dir::$(yarn cache dir)"
329+
- name: Restore lerna
330+
uses: actions/cache@v2
331+
with:
332+
path: |
333+
${{ steps.yarn-cache-dir-path.outputs.dir }}
334+
node_modules
335+
rust/cubestore/node_modules
336+
packages/*/node_modules
337+
key: ${{ runner.os }}-workspace-main-${{ matrix.node-version }}-${{ hashFiles('**/yarn.lock') }}
338+
restore-keys: |
339+
${{ runner.os }}-workspace-main-${{ matrix.node-version }}-
340+
- name: Set Yarn version
341+
run: yarn policies set-version v1.22.5
342+
- name: Yarn install
343+
uses: nick-invision/retry@v2
344+
env:
345+
CUBESTORE_SKIP_POST_INSTALL: true
346+
with:
347+
max_attempts: 3
348+
retry_on: error
349+
retry_wait_seconds: 15
350+
timeout_minutes: 20
351+
command: yarn install --frozen-lockfile
352+
- name: Lerna tsc
353+
run: yarn tsc
354+
- name: Run Cubestore Integration
355+
timeout-minutes: 10
356+
run: |
357+
yarn lerna run --concurrency 1 --stream --no-prefix integration:cubestore
358+
302359
integration:
303360
needs: [unit, lint, latest-tag-sha]
304361
runs-on: ubuntu-20.04
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
export * from './BaseDriver';
22
export * from './utils';
33
export * from './driver.interface';
4+
export * from './queue-driver.interface';
5+
export * from './cache-driver.interface';
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
export interface QueueDriverConnectionInterface {
2+
getResultBlocking(queryKey: string): Promise<unknown>;
3+
getResult(queryKey: string): Promise<unknown>;
4+
addToQueue(queryKey: string): Promise<unknown>;
5+
getToProcessQueries(): Promise<unknown>;
6+
getActiveQueries(): Promise<unknown>;
7+
getOrphanedQueries(): Promise<unknown>;
8+
getStalledQueries(): Promise<unknown>;
9+
getQueryStageState(onlyKeys: any): Promise<unknown>;
10+
updateHeartBeat(queryKey: string): Promise<void>;
11+
getNextProcessingId(): Promise<string>;
12+
retrieveForProcessing(queryKey: string, processingId: string): Promise<unknown>;
13+
freeProcessingLock(queryKe: string, processingId: string, activated: unknown): Promise<unknown>;
14+
optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise<unknown>;
15+
cancelQuery(queryKey: string): Promise<unknown>;
16+
setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise<unknown>;
17+
release(): Promise<void>;
18+
}
19+
20+
export interface QueueDriverInterface {
21+
createConnection(): Promise<QueueDriverConnectionInterface>;
22+
release(connection: QueueDriverConnectionInterface): Promise<void>;
23+
}
Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1+
const fromExports = require('./dist/src');
12
const { CubeStoreDriver } = require('./dist/src/CubeStoreDriver');
2-
const { CubeStoreDevDriver } = require('./dist/src/CubeStoreDevDriver');
3-
const { isCubeStoreSupported, CubeStoreHandler } = require('./dist/src/rexport');
43

54
/**
65
* After 5 years working with TypeScript, now I know
76
* that commonjs and nodejs require is not compatibility with using export default
87
*/
9-
module.exports = CubeStoreDriver;
8+
const toExport = CubeStoreDriver;
109

11-
/**
12-
* It's needed to move our CLI to destructing style on import
13-
* Please sync this file with src/index.ts
14-
*/
15-
module.exports.CubeStoreDevDriver = CubeStoreDevDriver;
16-
module.exports.isCubeStoreSupported = isCubeStoreSupported;
17-
module.exports.CubeStoreHandler = CubeStoreHandler;
10+
// eslint-disable-next-line no-restricted-syntax
11+
for (const [key, module] of Object.entries(fromExports)) {
12+
toExport[key] = module;
13+
}
14+
15+
module.exports = toExport;

packages/cubejs-cubestore-driver/package.json

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,11 @@
2323
"tsc": "tsc",
2424
"watch": "tsc -w",
2525
"lint": "eslint src/*.ts",
26-
"lint:fix": "eslint --fix src/*.ts",
27-
"test": "yarn integration",
28-
"integration": "jest",
29-
"integration:cubestore": "jest"
26+
"lint:fix": "eslint --fix src/*.ts"
3027
},
3128
"dependencies": {
3229
"@cubejs-backend/base-driver": "^0.31.32",
3330
"@cubejs-backend/cubestore": "^0.31.32",
34-
"@cubejs-backend/schema-compiler": "^0.31.32",
3531
"@cubejs-backend/shared": "^0.31.32",
3632
"csv-write-stream": "^2.0.0",
3733
"flatbuffers": "^1.12.0",
@@ -46,7 +42,6 @@
4642
},
4743
"devDependencies": {
4844
"@cubejs-backend/linter": "^0.31.0",
49-
"@cubejs-backend/testing-shared": "^0.31.32",
5045
"@types/flatbuffers": "^1.10.0",
5146
"@types/generic-pool": "^3.1.9",
5247
"@types/mysql": "^2.15.17",
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { createCancelablePromise, MaybeCancelablePromise } from '@cubejs-backend/shared';
2+
import { CacheDriverInterface } from '@cubejs-backend/base-driver';
3+
4+
import { CubeStoreDriver } from './CubeStoreDriver';
5+
6+
export class CubeStoreCacheDriver implements CacheDriverInterface {
7+
public constructor(
8+
protected readonly connection: CubeStoreDriver
9+
) {}
10+
11+
public withLock = (
12+
key: string,
13+
cb: () => MaybeCancelablePromise<any>,
14+
expiration: number = 60,
15+
freeAfter: boolean = true,
16+
) => createCancelablePromise(async (tkn) => {
17+
if (tkn.isCanceled()) {
18+
return false;
19+
}
20+
21+
const rows = await this.connection.query('CACHE SET NX TTL ? ? ?', [expiration, key, '1']);
22+
if (rows && rows.length === 1 && rows[0]?.success === 'true') {
23+
if (tkn.isCanceled()) {
24+
if (freeAfter) {
25+
await this.connection.query('CACHE REMOVE ?', [
26+
key
27+
]);
28+
}
29+
30+
return false;
31+
}
32+
33+
try {
34+
await tkn.with(cb());
35+
} finally {
36+
if (freeAfter) {
37+
await this.connection.query('CACHE REMOVE ?', [
38+
key
39+
]);
40+
}
41+
}
42+
43+
return true;
44+
}
45+
46+
return false;
47+
});
48+
49+
public async get(key: string) {
50+
const rows = await this.connection.query('CACHE GET ?', [
51+
key
52+
]);
53+
if (rows && rows.length === 1) {
54+
return JSON.parse(rows[0].value);
55+
}
56+
57+
return null;
58+
}
59+
60+
public async set(key: string, value, expiration) {
61+
const strValue = JSON.stringify(value);
62+
await this.connection.query('CACHE SET TTL ? ? ?', [expiration, key, strValue]);
63+
64+
return {
65+
key,
66+
bytes: Buffer.byteLength(strValue),
67+
};
68+
}
69+
70+
public async remove(key: string) {
71+
await this.connection.query('CACHE REMOVE ?', [
72+
key
73+
]);
74+
}
75+
76+
public async keysStartingWith(prefix: string) {
77+
const rows = await this.connection.query('CACHE KEYS ?', [
78+
prefix
79+
]);
80+
return rows.map((row) => row.key);
81+
}
82+
83+
public async cleanup(): Promise<void> {
84+
//
85+
}
86+
87+
public async testConnection(): Promise<void> {
88+
return this.connection.testConnection();
89+
}
90+
}

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@ import {
1010
ExternalCreateTableOptions,
1111
DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex,
1212
StreamTableData,
13-
DriverCapabilities,
1413
StreamingSourceTableData,
1514
QueryOptions,
15+
ExternalDriverCompatibilities,
1616
} from '@cubejs-backend/base-driver';
1717
import { getEnv } from '@cubejs-backend/shared';
1818
import { format as formatSql, escape } from 'sqlstring';
1919
import fetch from 'node-fetch';
2020

21-
import { CubeStoreQuery } from './CubeStoreQuery';
2221
import { ConnectionConfig } from './types';
2322
import { WebSocketConnection } from './WebSocketConnection';
2423

@@ -390,11 +389,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
390389
return this.createTableWithOptions(table, columns, options, queryTracingObj);
391390
}
392391

393-
public static dialectClass() {
394-
return CubeStoreQuery;
395-
}
396-
397-
public capabilities(): DriverCapabilities {
392+
public capabilities(): ExternalDriverCompatibilities {
398393
return {
399394
csvImport: true,
400395
streamImport: true,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export * from './CubeStoreQuery';
1+
export * from './CubeStoreCacheDriver';
22
export * from './CubeStoreDriver';
33
export * from './CubeStoreDevDriver';
44
export * from './rexport';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
dist
2+
.cubestore

0 commit comments

Comments
 (0)