Skip to content

Commit a026ceb

Browse files
authored
test(query-stream): Queue - make stream test stable (#7508)
1 parent 59557e8 commit a026ceb

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,24 +148,26 @@ export class QueryQueue {
148148
* Returns stream object which will be used to pipe data from data source.
149149
*
150150
* @param {QueryKeyHash} queryKeyHash
151+
* @return {QueryStream | undefined}
151152
*/
152153
getQueryStream(queryKeyHash) {
153154
return this.streams.get(queryKeyHash);
154155
}
155156

156157
/**
157-
* @param {*} queryKey
158+
* @param {QueryKeyHash} key
158159
* @param {{ [alias: string]: string }} aliasNameToMember
160+
* @return {QueryStream}
159161
*/
160-
createQueryStream(queryKeyHash, aliasNameToMember) {
161-
const key = queryKeyHash;
162+
createQueryStream(key, aliasNameToMember) {
162163
const stream = new QueryStream({
163164
key,
164165
streams: this.streams,
165166
aliasNameToMember,
166167
});
167168
this.streams.set(key, stream);
168-
this.streamEvents.emit('streamStarted', queryKeyHash);
169+
this.streamEvents.emit('streamStarted', key);
170+
169171
return stream;
170172
}
171173

@@ -307,6 +309,8 @@ export class QueryQueue {
307309

308310
// Stream processing goes here under assumption there's no way of a stream close just after it was added to the `streams` map.
309311
// Otherwise `streamStarted` event listener should go before the `reconcileQueue` call.
312+
// TODO: Fix an issue with a fast execution of stream handler which caused by removal of QueryStream from streams,
313+
// while EventListener doesnt start to listen for started stream event
310314
if (queryHandler === 'stream') {
311315
const self = this;
312316
result = await new Promise((resolve) => {

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
4141
},
4242
stream: async (query, stream) => {
4343
streamCount++;
44+
45+
// TODO: Fix an issue with a fast execution of stream handler which caused by removal of QueryStream from streams,
46+
// while EventListener doesnt start to listen for started stream event
47+
await pausePromise(250);
48+
4449
return new Promise((resolve, reject) => {
4550
const readable = Readable.from([]);
4651
readable.once('end', () => resolve(null));

0 commit comments

Comments
 (0)