Skip to content

Commit a4c8b09

Browse files
authored
feat: streaming desync (#6034)
1 parent 9ac7d0c commit a4c8b09

File tree

3 files changed

+19
-3
lines changed

3 files changed

+19
-3
lines changed

packages/cubejs-backend-native/js/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,18 @@ function wrapNativeFunctionWithChannelCallback(
119119
function wrapNativeFunctionWithStream(
120120
fn: (extra: any) => unknown | Promise<unknown>
121121
) {
122+
const chunkLength = parseInt(
123+
process.env.CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK || '8192',
124+
10
125+
);
122126
return async (extra: any, writer: any) => {
123127
let streamResponse: any;
124128
try {
125129
streamResponse = await fn(JSON.parse(extra));
126130
let chunk: object[] = [];
127131
streamResponse.stream.on('data', (c: object) => {
128132
chunk.push(c);
129-
if (chunk.length >= 10000) {
133+
if (chunk.length >= chunkLength) {
130134
if (!writer.chunk(JSON.stringify(chunk))) {
131135
streamResponse.stream.destroy({
132136
stack: "Rejected by client"

packages/cubejs-backend-shared/src/env.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ const variables: Record<string, (...args: any) => any> = {
568568
* Query stream `highWaterMark` value.
569569
*/
570570
dbQueryStreamHighWaterMark: (): number => get('CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK')
571-
.default(5000)
571+
.default(8192)
572572
.asInt(),
573573

574574
/**

packages/cubejs-query-orchestrator/src/orchestrator/QueryStream.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import * as stream from 'stream';
22
import { getEnv } from '@cubejs-backend/shared';
33

44
export class QueryStream extends stream.Transform {
5+
private counter = 0;
6+
57
public queryKey: string;
68

79
public maps: {
@@ -47,7 +49,17 @@ export class QueryStream extends stream.Transform {
4749
Object.keys(chunk).forEach((alias) => {
4850
row[this.aliasNameToMember[alias]] = chunk[alias];
4951
});
50-
callback(null, row);
52+
if (this.counter < this.writableHighWaterMark) {
53+
this.counter++;
54+
callback(null, row);
55+
} else {
56+
this.pause();
57+
setTimeout(() => {
58+
this.resume();
59+
this.counter = 0;
60+
callback(null, row);
61+
}, 0);
62+
}
5163
}
5264

5365
/**

0 commit comments

Comments
 (0)