Skip to content

Commit 1211d46

Browse files
committed
Mutliple streams can be now consumed simultaneously
1 parent f7bf7bc commit 1211d46

File tree

2 files changed

+54
-34
lines changed

2 files changed

+54
-34
lines changed

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
9797

9898
// First, define interfaces for the stream handling
9999
export interface StreamSubscription {
100-
subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void>;
100+
subscribe(): Promise<ReadableStream<unknown>>;
101101
}
102102

103103
export interface StreamSubscriptionFactory {
@@ -111,33 +111,29 @@ export class SSEStreamSubscription implements StreamSubscription {
111111
private options: { headers?: Record<string, string>; signal?: AbortSignal }
112112
) {}
113113

114-
async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
115-
const response = await fetch(this.url, {
114+
async subscribe(): Promise<ReadableStream<unknown>> {
115+
return fetch(this.url, {
116116
headers: {
117117
Accept: "text/event-stream",
118118
...this.options.headers,
119119
},
120120
signal: this.options.signal,
121-
});
122-
123-
if (!response.body) {
124-
throw new Error("No response body");
125-
}
126-
127-
const reader = response.body
128-
.pipeThrough(new TextDecoderStream())
129-
.pipeThrough(new EventSourceParserStream())
130-
.getReader();
131-
132-
while (true) {
133-
const { done, value } = await reader.read();
134-
135-
if (done) break;
136-
137-
await onChunk(safeParseJSON(value.data));
138-
}
121+
}).then((response) => {
122+
if (!response.body) {
123+
throw new Error("No response body");
124+
}
139125

140-
return () => reader.cancel();
126+
return response.body
127+
.pipeThrough(new TextDecoderStream())
128+
.pipeThrough(new EventSourceParserStream())
129+
.pipeThrough(
130+
new TransformStream({
131+
transform(chunk, controller) {
132+
controller.enqueue(safeParseJSON(chunk.data));
133+
},
134+
})
135+
);
136+
});
141137
}
142138
}
143139

@@ -254,13 +250,31 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
254250
this.options.client?.baseUrl
255251
);
256252

257-
await subscription.subscribe(async (chunk) => {
258-
controller.enqueue({
259-
type: streamKey,
260-
chunk: chunk as TStreams[typeof streamKey],
261-
run,
262-
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
263-
});
253+
const stream = await subscription.subscribe();
254+
255+
// Create the pipeline and start it
256+
stream
257+
.pipeThrough(
258+
new TransformStream({
259+
transform(chunk, controller) {
260+
controller.enqueue({
261+
type: streamKey,
262+
chunk: chunk as TStreams[typeof streamKey],
263+
run,
264+
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
265+
},
266+
})
267+
)
268+
.pipeTo(
269+
new WritableStream({
270+
write(chunk) {
271+
controller.enqueue(chunk);
272+
},
273+
})
274+
)
275+
.catch((error) => {
276+
console.error(`Error in stream ${streamKey}:`, error);
277+
});
264278
}
265279
}
266280
}

packages/core/test/runStream.test.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@ import {
99
import type { SubscribeRunRawShape } from "../src/v3/schemas/api.js";
1010

1111
// Test implementations
12+
// Update TestStreamSubscription to return a ReadableStream
1213
class TestStreamSubscription implements StreamSubscription {
1314
constructor(private chunks: unknown[]) {}
1415

15-
async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
16-
for (const chunk of this.chunks) {
17-
await onChunk(chunk);
18-
}
19-
return () => {};
16+
async subscribe(): Promise<ReadableStream<unknown>> {
17+
return new ReadableStream({
18+
start: async (controller) => {
19+
for (const chunk of this.chunks) {
20+
controller.enqueue(chunk);
21+
}
22+
controller.close();
23+
},
24+
});
2025
}
2126
}
2227

28+
// TestStreamSubscriptionFactory can remain the same
2329
class TestStreamSubscriptionFactory implements StreamSubscriptionFactory {
2430
private streams = new Map<string, unknown[]>();
2531

0 commit comments

Comments
 (0)