Skip to content

Commit abcbc1e

Browse files
authored
fix: streaming (#6156)
1 parent c70b155 commit abcbc1e

File tree

10 files changed

+173
-135
lines changed

10 files changed

+173
-135
lines changed

packages/cubejs-backend-native/js/index.ts

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,28 +127,29 @@ function wrapNativeFunctionWithStream(
127127
let streamResponse: any;
128128
try {
129129
streamResponse = await fn(JSON.parse(extra));
130-
let chunk: object[] = [];
131-
streamResponse.stream.on('data', (c: object) => {
132-
chunk.push(c);
133-
if (chunk.length >= chunkLength) {
134-
if (!writer.chunk(JSON.stringify(chunk))) {
135-
streamResponse.stream.destroy({
136-
stack: "Rejected by client"
137-
});
130+
if (streamResponse) {
131+
let chunk: object[] = [];
132+
streamResponse.stream.on('data', (c: object) => {
133+
chunk.push(c);
134+
if (chunk.length >= chunkLength) {
135+
if (!writer.chunk(JSON.stringify(chunk))) {
136+
streamResponse.stream.destroy({
137+
stack: "Rejected by client"
138+
});
139+
}
140+
chunk = [];
138141
}
139-
chunk = [];
140-
}
141-
});
142-
streamResponse.stream.on('close', () => {
143-
if (chunk.length > 0) {
144-
writer.chunk(JSON.stringify(chunk));
145-
}
146-
writer.end("");
147-
});
148-
149-
streamResponse.stream.on('error', (err: any) => {
150-
writer.reject(err.message || "Unknown JS exception");
151-
});
142+
});
143+
streamResponse.stream.on('close', () => {
144+
if (chunk.length > 0) {
145+
writer.chunk(JSON.stringify(chunk));
146+
}
147+
writer.end("");
148+
});
149+
streamResponse.stream.on('error', (err: any) => {
150+
writer.reject(err.message || "Unknown JS exception");
151+
});
152+
}
152153
} catch (e: any) {
153154
if (!!streamResponse && !!streamResponse.stream) {
154155
streamResponse.stream.destroy(e);

packages/cubejs-backend-shared/src/env.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,6 +1394,9 @@ const variables: Record<string, (...args: any) => any> = {
13941394
redisUseIORedis: () => get('CUBEJS_REDIS_USE_IOREDIS')
13951395
.default('false')
13961396
.asBoolStrict(),
1397+
redisАcquireTimeout: () => get('CUBEJS_REDIS_ACQUIRE_TIMEOUT')
1398+
.default('5000')
1399+
.asInt(),
13971400
allowUngroupedWithoutPrimaryKey: () => get('CUBEJS_ALLOW_UNGROUPED_WITHOUT_PRIMARY_KEY')
13981401
.default('false')
13991402
.asBoolStrict(),

packages/cubejs-backend-shared/src/process.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,12 @@ const processUid = v5(v1(), v1()).toString();
1111
export function getProcessUid(): string {
1212
return processUid;
1313
}
14+
15+
let i = 0;
16+
17+
/**
18+
* Returns unique for the process number.
19+
*/
20+
export function getNext(): number {
21+
return i++;
22+
}

packages/cubejs-jdbc-driver/src/QueryStream.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,20 @@ export class QueryStream extends Readable {
2929
* @override
3030
*/
3131
public _read(highWaterMark: number): void {
32-
for (let i = 0; i < highWaterMark; i++) {
33-
if (this.next) {
34-
const row = this.next();
35-
if (row.value) {
36-
this.push(row.value);
37-
}
38-
if (row.done) {
39-
this.push(null);
40-
break;
32+
setTimeout(() => {
33+
for (let i = 0; i < highWaterMark; i++) {
34+
if (this.next) {
35+
const row = this.next();
36+
if (row.value) {
37+
this.push(row.value);
38+
}
39+
if (row.done) {
40+
this.push(null);
41+
break;
42+
}
4143
}
4244
}
43-
}
45+
}, 0);
4446
}
4547

4648
/**

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 61 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -477,10 +477,13 @@ export class QueryCache {
477477
if (!persistent) {
478478
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
479479
} else {
480-
const stream = queue.setQueryStream(cacheKey, aliasNameToMember);
481-
// we don't want to handle error here as we want it to bubble up
482-
// to the api gateway
483-
queue.executeInQueue('stream', cacheKey, _query, priority, opt);
480+
const stream = queue.createQueryStream(cacheKey, aliasNameToMember);
481+
// We don't want to handle error here as we want it to bubble up
482+
// to the api gateway. We need to increase orphaned timeout as well.
483+
queue.executeInQueue('stream', cacheKey, {
484+
..._query,
485+
orphanedTimeout: 1200,
486+
}, priority, opt);
484487
return stream;
485488
}
486489
}
@@ -590,47 +593,53 @@ export class QueryCache {
590593
return result;
591594
},
592595
stream: async (req, target) => {
593-
let logged = false;
594596
queue.logger('Streaming SQL', { ...req });
595-
const client = await clientFactory();
596-
try {
597-
const source = await client.streamQuery(req.query, req.values);
598-
599-
const cleanup = (error) => {
600-
if (!source.destroyed) {
601-
source.destroy(error);
602-
}
603-
if (!target.destroyed) {
604-
target.destroy(error);
605-
}
606-
if (!logged && source.destroyed && target.destroyed) {
607-
logged = true;
608-
if (error) {
609-
queue.logger('Streaming done with error', {
610-
query: req.query,
611-
query_values: req.values,
612-
error,
613-
});
614-
} else {
615-
queue.logger('Streaming successfully completed', {
616-
requestId: req.requestId,
617-
});
618-
}
619-
}
620-
};
621-
622-
source.once('end', cleanup);
623-
source.once('error', cleanup);
624-
source.once('close', cleanup);
625-
626-
target.once('end', cleanup);
627-
target.once('error', cleanup);
628-
target.once('close', cleanup);
629-
630-
source.pipe(target);
631-
} catch (e) {
632-
target.emit('error', e);
633-
}
597+
await (new Promise((resolve, reject) => {
598+
let logged = false;
599+
Promise
600+
.all([clientFactory()])
601+
.then(([client]) => client.streamQuery(req.query, req.values))
602+
.then((source) => {
603+
const cleanup = (error) => {
604+
if (!source.destroyed) {
605+
source.destroy(error);
606+
}
607+
if (!target.destroyed) {
608+
target.destroy(error);
609+
}
610+
if (!logged && source.destroyed && target.destroyed) {
611+
logged = true;
612+
if (error) {
613+
queue.logger('Streaming done with error', {
614+
query: req.query,
615+
query_values: req.values,
616+
error,
617+
});
618+
reject(error);
619+
} else {
620+
queue.logger('Streaming successfully completed', {
621+
requestId: req.requestId,
622+
});
623+
resolve(req.requestId);
624+
}
625+
}
626+
};
627+
628+
source.once('end', cleanup);
629+
source.once('error', cleanup);
630+
source.once('close', cleanup);
631+
632+
target.once('end', cleanup);
633+
target.once('error', cleanup);
634+
target.once('close', cleanup);
635+
636+
source.pipe(target);
637+
})
638+
.catch((reason) => {
639+
target.emit('error', reason);
640+
throw reason;
641+
});
642+
}));
634643
},
635644
},
636645
cancelHandlers: {
@@ -639,7 +648,14 @@ export class QueryCache {
639648
await queue.handles[req.cancelHandler].cancel();
640649
delete queue.handles[req.cancelHandler];
641650
}
642-
}
651+
},
652+
stream: async (req) => {
653+
req.queryKey.persistent = true;
654+
const queryKeyHash = queue.redisHash(req.queryKey);
655+
if (queue.streams.has(queryKeyHash)) {
656+
queue.streams.get(queryKeyHash).destroy();
657+
}
658+
},
643659
},
644660
logger: (msg, params) => options.logger(msg, params),
645661
...options

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,7 @@ export class QueryQueue {
130130
/**
131131
* Persistent queries streams maps.
132132
*/
133-
this.streams = {
134-
queued: new Map(),
135-
processing: new Map(),
136-
};
133+
this.streams = new Map();
137134
}
138135

139136
/**
@@ -142,26 +139,26 @@ export class QueryQueue {
142139
* @param {QueryKeyHash} queryKeyHash
143140
*/
144141
getQueryStream(queryKeyHash) {
145-
if (!this.streams.queued.has(queryKeyHash)) {
146-
throw new Error(`Unable to find stream for persisted query with id: ${queryKeyHash}`);
142+
if (!this.streams.has(queryKeyHash)) {
143+
throw new Error(
144+
`Unable to find the query stream ${queryKeyHash} for the process ${getProcessUid()}.`
145+
);
147146
}
148-
149-
return this.streams.queued.get(queryKeyHash);
147+
return this.streams.get(queryKeyHash);
150148
}
151149

152150
/**
153151
* @param {*} queryKey
154152
* @param {{ [alias: string]: string }} aliasNameToMember
155153
*/
156-
setQueryStream(queryKey, aliasNameToMember) {
154+
createQueryStream(queryKey, aliasNameToMember) {
157155
const key = this.redisHash(queryKey);
158156
const stream = new QueryStream({
159157
key,
160-
maps: this.streams,
158+
streams: this.streams,
161159
aliasNameToMember,
162160
});
163-
this.streams.queued.set(key, stream);
164-
161+
this.streams.set(key, stream);
165162
return stream;
166163
}
167164

@@ -802,10 +799,7 @@ export class QueryQueue {
802799
const handler = query?.queryHandler;
803800
switch (handler) {
804801
case 'stream':
805-
await this.queryTimeout(
806-
this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed))
807-
);
808-
802+
await this.queryHandlers.stream(query.query, this.getQueryStream(queryKeyHashed));
809803
// CubeStore has special handling for null
810804
executionResult = null;
811805
break;
@@ -908,6 +902,14 @@ export class QueryQueue {
908902
}
909903

910904
await this.reconcileQueue();
905+
} else if (query?.queryHandler === 'stream') {
906+
if (this.streams.has(queryKeyHashed)) {
907+
this.getQueryStream(queryKeyHashed).debounce();
908+
}
909+
await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated);
910+
setTimeout(() => {
911+
this.reconcileQueue();
912+
}, 5000);
911913
} else {
912914
this.logger('Skip processing', {
913915
processingId,
@@ -921,11 +923,6 @@ export class QueryQueue {
921923
activated,
922924
queryExists: !!query
923925
});
924-
// closing stream
925-
if (query?.queryHandler === 'stream') {
926-
const stream = this.getQueryStream(queryKeyHashed);
927-
stream.destroy();
928-
}
929926
const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated);
930927
if (currentProcessingId) {
931928
this.logger('Skipping free processing lock', {

0 commit comments

Comments
 (0)