Skip to content

Commit c5b6702

Browse files
authored
fix(query-orchestrator): streams cluster (#6048)
1 parent 69143d5 commit c5b6702

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ export class QueryQueue {
782782
let target;
783783
switch (handler) {
784784
case 'stream':
785-
target = this.getQueryStream(activeKeys[0]);
785+
target = this.getQueryStream(this.redisHash(queryKey));
786786
await this.queryTimeout(this.queryHandlers.stream(query.query, target));
787787
break;
788788
default:
@@ -897,6 +897,11 @@ export class QueryQueue {
897897
activated,
898898
queryExists: !!query
899899
});
900+
// closing stream
901+
if (query?.queryHandler === 'stream') {
902+
const stream = this.getQueryStream(this.redisHash(queryKey));
903+
stream.destroy();
904+
}
900905
const currentProcessingId = await queueConnection.freeProcessingLock(queryKey, processingId, activated);
901906
if (currentProcessingId) {
902907
this.logger('Skipping free processing lock', {

packages/cubejs-query-orchestrator/src/orchestrator/QueryStream.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ export class QueryStream extends stream.Transform {
3535
this.queryKey = key;
3636
this.maps = maps;
3737
this.aliasNameToMember = aliasNameToMember;
38+
if (!this.aliasNameToMember) {
39+
this.emit('error', 'The QueryStream `aliasNameToMember` property is missed.');
40+
}
3841
}
3942

4043
/**

0 commit comments

Comments
 (0)