diff --git a/.changeset/modern-nails-refuse.md b/.changeset/modern-nails-refuse.md new file mode 100644 index 0000000000..f311a19b0e --- /dev/null +++ b/.changeset/modern-nails-refuse.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Multiple streams can now be consumed simultaneously diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index dd75458a86..9d2ae397d4 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -8,6 +8,7 @@ import { IOPacket, parsePacket, } from "../utils/ioSerialization.js"; +import { ApiError } from "./errors.js"; import { ApiClient } from "./index.js"; import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; @@ -97,7 +98,7 @@ export function runShapeStream( // First, define interfaces for the stream handling export interface StreamSubscription { - subscribe(onChunk: (chunk: unknown) => Promise): Promise<() => void>; + subscribe(): Promise>; } export interface StreamSubscriptionFactory { @@ -111,33 +112,38 @@ export class SSEStreamSubscription implements StreamSubscription { private options: { headers?: Record; signal?: AbortSignal } ) {} - async subscribe(onChunk: (chunk: unknown) => Promise): Promise<() => void> { - const response = await fetch(this.url, { + async subscribe(): Promise> { + return fetch(this.url, { headers: { Accept: "text/event-stream", ...this.options.headers, }, signal: this.options.signal, - }); - - if (!response.body) { - throw new Error("No response body"); - } - - const reader = response.body - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()) - .getReader(); - - while (true) { - const { done, value } = await reader.read(); - - if (done) break; + }).then((response) => { + if (!response.ok) { + throw ApiError.generate( + response.status, + {}, + "Could not subscribe to stream", + Object.fromEntries(response.headers) + ); + } - await onChunk(safeParseJSON(value.data)); - } + if (!response.body) { + throw new Error("No response body"); + } - return () => reader.cancel(); + return response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(safeParseJSON(chunk.data)); + }, + }) + ); + }); } } @@ -254,13 +260,31 @@ export class RunSubscription { this.options.client?.baseUrl ); - await subscription.subscribe(async (chunk) => { - controller.enqueue({ - type: streamKey, - chunk: chunk as TStreams[typeof streamKey], - run, - } as StreamPartResult, TStreams>); - }); + const stream = await subscription.subscribe(); + + // Create the pipeline and start it + stream + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue({ + type: streamKey, + chunk: chunk as TStreams[typeof streamKey], + run, + } as StreamPartResult, TStreams>); + }, + }) + ) + .pipeTo( + new WritableStream({ + write(chunk) { + controller.enqueue(chunk); + }, + }) + ) + .catch((error) => { + console.error(`Error in stream ${streamKey}:`, error); + }); } } } diff --git a/packages/core/test/runStream.test.ts b/packages/core/test/runStream.test.ts index 7cb1207d13..f97f6645ef 100644 --- a/packages/core/test/runStream.test.ts +++ b/packages/core/test/runStream.test.ts @@ -9,17 +9,23 @@ import { import type { SubscribeRunRawShape } from "../src/v3/schemas/api.js"; // Test implementations +// Update TestStreamSubscription to return a ReadableStream class TestStreamSubscription implements StreamSubscription { constructor(private chunks: unknown[]) {} - async subscribe(onChunk: (chunk: unknown) => Promise): Promise<() => void> { - for (const chunk of this.chunks) { - await onChunk(chunk); - } - return () => {}; + async subscribe(): Promise> { + return new ReadableStream({ + start: async (controller) => { + for (const chunk of this.chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); } } +// TestStreamSubscriptionFactory can remain the same class TestStreamSubscriptionFactory implements StreamSubscriptionFactory { private streams = new Map(); diff --git a/scripts/publish-prerelease.sh b/scripts/publish-prerelease.sh index efb019ee8e..ed22ed95b2 100755 --- a/scripts/publish-prerelease.sh +++ b/scripts/publish-prerelease.sh @@ -36,9 +36,17 @@ else fi # Run your commands - +# Run changeset version command and capture its output echo "Running: pnpm exec changeset version --snapshot $version" -pnpm exec changeset version --snapshot $version +if output=$(pnpm exec changeset version --snapshot $version 2>&1); then + if echo "$output" | grep -q "No unreleased changesets found"; then + echo "No unreleased changesets found. Exiting." + exit 0 + fi +else + echo "Error running changeset version command" + exit 1 +fi echo "Running: pnpm run build --filter \"@trigger.dev/*\" --filter \"trigger.dev\"" pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"