Skip to content

Commit ac2357d

Browse files
authored
fix(query-orchestrator): Queue - clear timer on streaming (#7501)
1 parent 065916e commit ac2357d

File tree

3 files changed

+26
-9
lines changed

3 files changed

+26
-9
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -310,22 +310,32 @@ export class QueryQueue {
310310
if (queryHandler === 'stream') {
311311
const self = this;
312312
result = await new Promise((resolve) => {
313+
let timeoutTimerId = null;
314+
313315
const onStreamStarted = (streamStartedHash) => {
314316
if (streamStartedHash === queryKeyHash) {
317+
if (timeoutTimerId) {
318+
clearTimeout(timeoutTimerId);
319+
}
320+
315321
resolve(self.getQueryStream(queryKeyHash));
316322
}
317323
};
318324

319-
setTimeout(() => {
320-
self.streamEvents.removeListener('streamStarted', onStreamStarted);
321-
resolve(null);
322-
}, this.continueWaitTimeout * 1000);
323-
324325
self.streamEvents.addListener('streamStarted', onStreamStarted);
325-
const stream = this.getQueryStream(this.redisHash(queryKey));
326+
327+
const stream = this.getQueryStream(queryKeyHash);
326328
if (stream) {
327329
self.streamEvents.removeListener('streamStarted', onStreamStarted);
328330
resolve(stream);
331+
} else {
332+
timeoutTimerId = setTimeout(
333+
() => {
334+
self.streamEvents.removeListener('streamStarted', onStreamStarted);
335+
resolve(null);
336+
},
337+
this.continueWaitTimeout * 10000
338+
);
329339
}
330340
});
331341
} else {

packages/cubejs-query-orchestrator/src/orchestrator/QueryStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import * as stream from 'stream';
22
import { getEnv } from '@cubejs-backend/shared';
33

44
export class QueryStream extends stream.Transform {
5-
private timeout = 5 * 60000 || getEnv('dbQueryTimeout');
5+
private timeout = 5 * 60 * 1000;
66

77
private timer = null;
88

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
155155
test('timeout', async () => {
156156
const query = ['select * from 3'];
157157

158-
await queue.executeInQueue('delay', query, { delay: 60 * 60 * 1000, result: '1', isJob: true });
158+
// executionTimeout is 2s, 5s is enough
159+
await queue.executeInQueue('delay', query, { delay: 5 * 1000, result: '1', isJob: true });
159160
await awaitProcessing();
160161

161162
expect(logger.mock.calls.length).toEqual(5);
@@ -324,9 +325,15 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
324325
test('stream handler', async () => {
325326
const key: QueryKey = ['select * from table', []];
326327
key.persistent = true;
327-
await queue.executeInQueue('stream', key, { aliasNameToMember: {} }, 0);
328+
const stream = await queue.executeInQueue('stream', key, { aliasNameToMember: {} }, 0);
328329
await awaitProcessing();
329330

331+
// QueryStream has a debounce timer to destroy stream
332+
// without reading it, timer will block exit for jest
333+
for await (const chunk of stream) {
334+
console.log('streaming chunk: ', chunk);
335+
}
336+
330337
expect(streamCount).toEqual(1);
331338
expect(logger.mock.calls[logger.mock.calls.length - 1][0]).toEqual('Performing query completed');
332339
});

0 commit comments

Comments
 (0)