Skip to content

Commit d336c4e

Browse files
authored
feat: streaming capabilities (#5995)
1 parent 75699c8 commit d336c4e

File tree

28 files changed

+1480
-362
lines changed

28 files changed

+1480
-362
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/* eslint-disable no-restricted-syntax */
2+
import * as stream from 'stream';
23
import jwt, { Algorithm as JWTAlgorithm } from 'jsonwebtoken';
34
import R from 'ramda';
45
import bodyParser from 'body-parser';
@@ -86,15 +87,6 @@ import {
8687
transformPreAggregations,
8788
} from './helpers/transformMetaExtended';
8889

89-
// const timeoutPromise = (timeout) => (
90-
// new Promise((resolve) => (
91-
// setTimeout(
92-
// () => resolve(null),
93-
// timeout,
94-
// )
95-
// ))
96-
// );
97-
9890
/**
9991
* API gateway server class.
10092
*/
@@ -1069,10 +1061,11 @@ class ApiGateway {
10691061
protected async getNormalizedQueries(
10701062
query: Record<string, any> | Record<string, any>[],
10711063
context: RequestContext,
1064+
persistent = false,
10721065
): Promise<[QueryType, NormalizedQuery[]]> {
10731066
query = this.parseQueryParam(query);
1074-
let queryType: QueryType = QueryTypeEnum.REGULAR_QUERY;
10751067

1068+
let queryType: QueryType = QueryTypeEnum.REGULAR_QUERY;
10761069
if (!Array.isArray(query)) {
10771070
query = this.compareDateRangeTransformer(query);
10781071
if (Array.isArray(query)) {
@@ -1083,6 +1076,7 @@ class ApiGateway {
10831076
}
10841077

10851078
const queries = Array.isArray(query) ? query : [query];
1079+
10861080
const normalizedQueries: NormalizedQuery[] = await Promise.all(
10871081
queries.map(
10881082
async (currentQuery) => validatePostRewrite(
@@ -1094,6 +1088,10 @@ class ApiGateway {
10941088
)
10951089
);
10961090

1091+
normalizedQueries.forEach((q) => {
1092+
this.processQueryLimit(q, persistent);
1093+
});
1094+
10971095
if (normalizedQueries.find((currentQuery) => !currentQuery)) {
10981096
throw new Error('queryTransformer returned null query. Please check your queryTransformer implementation');
10991097
}
@@ -1112,6 +1110,29 @@ class ApiGateway {
11121110
return [queryType, normalizedQueries];
11131111
}
11141112

1113+
/**
1114+
* Asserts query limit, sets the default value if neccessary.
1115+
*
1116+
* @throw {Error}
1117+
*/
1118+
public processQueryLimit(query: NormalizedQuery, persistent = false): void {
1119+
const def = getEnv('dbQueryDefaultLimit') <= getEnv('dbQueryLimit')
1120+
? getEnv('dbQueryDefaultLimit')
1121+
: getEnv('dbQueryLimit');
1122+
1123+
if (!persistent) {
1124+
if (
1125+
typeof query.limit === 'number' &&
1126+
query.limit > getEnv('dbQueryLimit')
1127+
) {
1128+
throw new Error('The query limit has been exceeded.');
1129+
}
1130+
query.limit = typeof query.limit === 'number'
1131+
? query.limit
1132+
: def;
1133+
}
1134+
}
1135+
11151136
public async sql({ query, context, res }: QueryRequest) {
11161137
const requestStarted = new Date();
11171138

@@ -1269,7 +1290,6 @@ class ApiGateway {
12691290
context: RequestContext,
12701291
normalizedQuery: NormalizedQuery,
12711292
sqlQuery: any,
1272-
apiType: string,
12731293
) {
12741294
const queries = [{
12751295
...sqlQuery,
@@ -1279,7 +1299,7 @@ class ApiGateway {
12791299
renewQuery: normalizedQuery.renewQuery,
12801300
requestId: context.requestId,
12811301
context,
1282-
persistent: false, // apiType === 'sql',
1302+
persistent: false,
12831303
}];
12841304
if (normalizedQuery.total) {
12851305
const normalizedTotal = structuredClone(normalizedQuery);
@@ -1378,6 +1398,50 @@ class ApiGateway {
13781398
};
13791399
}
13801400

1401+
/**
1402+
* Returns stream object which will be used to stream results from
1403+
* the data source if applicable, returns `null` otherwise.
1404+
*/
1405+
public async stream(context: RequestContext, query: Query): Promise<null | {
1406+
originalQuery: Query;
1407+
normalizedQuery: NormalizedQuery;
1408+
streamingQuery: unknown;
1409+
stream: stream.Writable;
1410+
}> {
1411+
const requestStarted = new Date();
1412+
try {
1413+
this.log({ type: 'Streaming Query', query }, context);
1414+
const [, normalizedQueries] = await this.getNormalizedQueries(query, context, true);
1415+
const sqlQuery = (await this.getSqlQueriesInternal(context, normalizedQueries))[0];
1416+
const q = {
1417+
...sqlQuery,
1418+
query: sqlQuery.sql[0],
1419+
values: sqlQuery.sql[1],
1420+
continueWait: true,
1421+
renewQuery: false,
1422+
requestId: context.requestId,
1423+
context,
1424+
persistent: true,
1425+
forceNoCache: true,
1426+
};
1427+
const _stream = {
1428+
originalQuery: query,
1429+
normalizedQuery: normalizedQueries[0],
1430+
streamingQuery: q,
1431+
stream: await this.getAdapterApi(context).streamQuery(q),
1432+
};
1433+
return _stream;
1434+
} catch (e) {
1435+
this.log({
1436+
type: 'Streaming Error',
1437+
query,
1438+
error: (<Error>e).message,
1439+
duration: this.duration(requestStarted),
1440+
}, context);
1441+
return null;
1442+
}
1443+
}
1444+
13811445
/**
13821446
* Data queries APIs (`/load`, `/subscribe`) entry point. Used by
13831447
* `CubejsApi#load` and `CubejsApi#subscribe` methods to fetch the
@@ -1434,7 +1498,6 @@ class ApiGateway {
14341498
context,
14351499
normalizedQuery,
14361500
sqlQueries[index],
1437-
apiType,
14381501
);
14391502

14401503
return this.getResultInternal(

packages/cubejs-api-gateway/src/sql-server.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,33 @@ export class SQLServer {
112112
}
113113
});
114114
},
115+
stream: async ({ request, session, query }) => {
116+
let userForContext = session.user;
117+
118+
if (request.meta.changeUser && request.meta.changeUser !== session.user) {
119+
const canSwitch = session.superuser || await canSwitchSqlUser(session.user, request.meta.changeUser);
120+
if (canSwitch) {
121+
userForContext = request.meta.changeUser;
122+
} else {
123+
throw new Error(
124+
`You cannot change security context via __user from ${session.user} to ${request.meta.changeUser}, because it's not allowed.`
125+
);
126+
}
127+
}
128+
129+
// @todo Store security context in native for session's user, but not for switching
130+
const current = await checkSqlAuth(request, userForContext);
131+
const context = await this.contextByNativeReq(request, current.securityContext, request.id);
132+
133+
// eslint-disable-next-line no-async-promise-executor
134+
return new Promise(async (resolve, reject) => {
135+
try {
136+
resolve(await this.apiGateway.stream(context, query));
137+
} catch (e) {
138+
reject(e);
139+
}
140+
});
141+
},
115142
});
116143
}
117144

packages/cubejs-api-gateway/src/types/strings.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ type ApiType =
2525
'sql' |
2626
'graphql' |
2727
'rest' |
28-
'ws';
28+
'ws' |
29+
'stream';
2930

3031
/**
3132
* Parsed query type data type.

packages/cubejs-api-gateway/test/index.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ describe('API Gateway', () => {
240240
timezone: 'UTC',
241241
order: [],
242242
filters: [],
243+
limit: 10000,
243244
dimensions: [],
244245
timeDimensions: [],
245246
queryType: 'regularQuery'
@@ -251,6 +252,7 @@ describe('API Gateway', () => {
251252
timezone: 'UTC',
252253
order: [],
253254
filters: [],
255+
limit: 10000,
254256
dimensions: [],
255257
timeDimensions: [],
256258
queryType: 'regularQuery'

packages/cubejs-backend-native/Cargo.lock

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/js/index.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fs from 'fs';
22
import path from 'path';
3+
// import { getEnv } from '@cubejs-backend/shared';
34

45
export interface BaseMeta {
56
// postgres or mysql
@@ -53,6 +54,7 @@ export type SQLInterfaceOptions = {
5354
checkAuth: (payload: CheckAuthPayload) => CheckAuthResponse | Promise<CheckAuthResponse>,
5455
load: (payload: LoadPayload) => unknown | Promise<unknown>,
5556
meta: (payload: MetaPayload) => unknown | Promise<unknown>,
57+
stream: (payload: LoadPayload) => unknown | Promise<unknown>,
5658
};
5759

5860
function loadNative() {
@@ -106,6 +108,43 @@ function wrapNativeFunctionWithChannelCallback(
106108
};
107109
};
108110

111+
// TODO: Refactor - define classes
112+
function wrapNativeFunctionWithStream(
113+
fn: (extra: any) => unknown | Promise<unknown>
114+
) {
115+
return async (extra: any, writer: any) => {
116+
let streamResponse: any;
117+
try {
118+
streamResponse = await fn(JSON.parse(extra));
119+
let chunk: object[] = [];
120+
streamResponse.stream.on('data', (c: object) => {
121+
chunk.push(c);
122+
if (chunk.length >= 10000) {
123+
if (!writer.chunk(JSON.stringify(chunk))) {
124+
streamResponse.stream.removeAllListeners();
125+
}
126+
chunk = [];
127+
}
128+
});
129+
streamResponse.stream.on('close', () => {
130+
if (chunk.length > 0) {
131+
writer.chunk(JSON.stringify(chunk));
132+
}
133+
writer.end("");
134+
});
135+
136+
streamResponse.stream.on('error', (err: any) => {
137+
writer.reject(err.message || "Unknown JS exception");
138+
});
139+
} catch (e: any) {
140+
if (!!streamResponse && !!streamResponse.stream) {
141+
streamResponse.stream.destroy(e);
142+
}
143+
writer.reject(e.message || "Unknown JS exception");
144+
}
145+
};
146+
};
147+
109148
type LogLevel = 'error' | 'warn' | 'info' | 'debug' | 'trace';
110149

111150
export const setupLogger = (logger: (extra: any) => unknown, logLevel: LogLevel): void => {
@@ -132,12 +171,17 @@ export const registerInterface = async (options: SQLInterfaceOptions): Promise<S
132171
throw new Error('options.meta must be a function');
133172
}
134173

174+
if (typeof options.stream != 'function') {
175+
throw new Error('options.stream must be a function');
176+
}
177+
135178
const native = loadNative();
136179
return native.registerInterface({
137180
...options,
138181
checkAuth: wrapNativeFunctionWithChannelCallback(options.checkAuth),
139182
load: wrapNativeFunctionWithChannelCallback(options.load),
140183
meta: wrapNativeFunctionWithChannelCallback(options.meta),
184+
stream: wrapNativeFunctionWithStream(options.stream),
141185
});
142186
};
143187

packages/cubejs-backend-native/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
},
4040
"dependencies": {
4141
"@cubejs-backend/cubesql": "^0.31.40",
42+
"@cubejs-backend/shared": "^0.31.4",
4243
"@mapbox/node-pre-gyp": "^1"
4344
},
4445
"binary": {

0 commit comments

Comments
 (0)