diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index a492e974aa510..8435f0068dcbf 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -500,11 +500,27 @@ export class QueryCache { if (client.stream) { tableData = await client.stream(q.query, q.values, q); const errors = []; - await pipeline(tableData.rowStream, writer, (err) => { - if (err) { - errors.push(err); + + const iterator = tableData.rowStream[Symbol.asyncIterator](); + try { + let result = await iterator.next(); + while (!result.done) { + try { + writer.write(result.value); + } catch (writeErr) { + errors.push(writeErr); + } + result = await iterator.next(); } - }); + } catch (streamErr) { + errors.push(streamErr); + } + try { + writer.end(); + } catch (endErr) { + errors.push(endErr); + } + if (errors.length > 0) { throw new Error(`Lambda query errors ${errors.join(', ')}`); }