From 1211d46c32aad42dd23f8e20d086e1339e9a67ce Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 2 Dec 2024 15:09:33 +0000 Subject: [PATCH 1/5] Mutliple streams can be now consumed simultaneously --- packages/core/src/v3/apiClient/runStream.ts | 72 ++++++++++++--------- packages/core/test/runStream.test.ts | 16 +++-- 2 files changed, 54 insertions(+), 34 deletions(-) diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index dd75458a86..e11581bbfc 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -97,7 +97,7 @@ export function runShapeStream( // First, define interfaces for the stream handling export interface StreamSubscription { - subscribe(onChunk: (chunk: unknown) => Promise): Promise<() => void>; + subscribe(): Promise>; } export interface StreamSubscriptionFactory { @@ -111,33 +111,29 @@ export class SSEStreamSubscription implements StreamSubscription { private options: { headers?: Record; signal?: AbortSignal } ) {} - async subscribe(onChunk: (chunk: unknown) => Promise): Promise<() => void> { - const response = await fetch(this.url, { + async subscribe(): Promise> { + return fetch(this.url, { headers: { Accept: "text/event-stream", ...this.options.headers, }, signal: this.options.signal, - }); - - if (!response.body) { - throw new Error("No response body"); - } - - const reader = response.body - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()) - .getReader(); - - while (true) { - const { done, value } = await reader.read(); - - if (done) break; - - await onChunk(safeParseJSON(value.data)); - } + }).then((response) => { + if (!response.body) { + throw new Error("No response body"); + } - return () => reader.cancel(); + return response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(safeParseJSON(chunk.data)); + }, + }) + ); + }); } } @@ -254,13 +250,31 @@ export class RunSubscription { this.options.client?.baseUrl ); - await subscription.subscribe(async (chunk) => { - controller.enqueue({ - type: streamKey, - chunk: chunk as TStreams[typeof streamKey], - run, - } as StreamPartResult, TStreams>); - }); + const stream = await subscription.subscribe(); + + // Create the pipeline and start it + stream + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue({ + type: streamKey, + chunk: chunk as TStreams[typeof streamKey], + run, + } as StreamPartResult, TStreams>); + }, + }) + ) + .pipeTo( + new WritableStream({ + write(chunk) { + controller.enqueue(chunk); + }, + }) + ) + .catch((error) => { + console.error(`Error in stream ${streamKey}:`, error); + }); } } } diff --git a/packages/core/test/runStream.test.ts b/packages/core/test/runStream.test.ts index 7cb1207d13..f97f6645ef 100644 --- a/packages/core/test/runStream.test.ts +++ b/packages/core/test/runStream.test.ts @@ -9,17 +9,23 @@ import { import type { SubscribeRunRawShape } from "../src/v3/schemas/api.js"; // Test implementations +// Update TestStreamSubscription to return a ReadableStream class TestStreamSubscription implements StreamSubscription { constructor(private chunks: unknown[]) {} - async subscribe(onChunk: (chunk: unknown) => Promise): Promise<() => void> { - for (const chunk of this.chunks) { - await onChunk(chunk); - } - return () => {}; + async subscribe(): Promise> { + return new ReadableStream({ + start: async (controller) => { + for (const chunk of this.chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); } } +// TestStreamSubscriptionFactory can remain the same class TestStreamSubscriptionFactory implements StreamSubscriptionFactory { private streams = new Map(); From da2a80b256aa769d522989649156e7196f9c9a69 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 2 Dec 2024 15:14:14 +0000 Subject: [PATCH 2/5] Update prerelease script --- scripts/publish-prerelease.sh | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/scripts/publish-prerelease.sh b/scripts/publish-prerelease.sh index efb019ee8e..ed22ed95b2 100755 --- a/scripts/publish-prerelease.sh +++ b/scripts/publish-prerelease.sh @@ -36,9 +36,17 @@ else fi # Run your commands - +# Run changeset version command and capture its output echo "Running: pnpm exec changeset version --snapshot $version" -pnpm exec changeset version --snapshot $version +if output=$(pnpm exec changeset version --snapshot $version 2>&1); then + if echo "$output" | grep -q "No unreleased changesets found"; then + echo "No unreleased changesets found. Exiting." + exit 0 + fi +else + echo "Error running changeset version command" + exit 1 +fi echo "Running: pnpm run build --filter \"@trigger.dev/*\" --filter \"trigger.dev\"" pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev" From 2940dc033b22805802c224f7a70cf068bfe5d685 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 2 Dec 2024 15:14:49 +0000 Subject: [PATCH 3/5] Add changeset --- .changeset/modern-nails-refuse.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/modern-nails-refuse.md diff --git a/.changeset/modern-nails-refuse.md b/.changeset/modern-nails-refuse.md new file mode 100644 index 0000000000..d8750a0f8b --- /dev/null +++ b/.changeset/modern-nails-refuse.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Multiple streams can now be consumed simultaneously From c5d67b5e641999d31961e27032fbc4711fe0342a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 2 Dec 2024 15:16:36 +0000 Subject: [PATCH 4/5] Make it core --- .changeset/modern-nails-refuse.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/modern-nails-refuse.md b/.changeset/modern-nails-refuse.md index d8750a0f8b..f311a19b0e 100644 --- a/.changeset/modern-nails-refuse.md +++ b/.changeset/modern-nails-refuse.md @@ -1,5 +1,5 @@ --- -"@trigger.dev/sdk": patch +"@trigger.dev/core": patch --- Multiple streams can now be consumed simultaneously From 52b76278f7b8cba381ece4ebb8952eb78f2d3aef Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 2 Dec 2024 15:38:59 +0000 Subject: [PATCH 5/5] Handle API error responses when streaming --- packages/core/src/v3/apiClient/runStream.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index e11581bbfc..9d2ae397d4 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -8,6 +8,7 @@ import { IOPacket, parsePacket, } from "../utils/ioSerialization.js"; +import { ApiError } from "./errors.js"; import { ApiClient } from "./index.js"; import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; @@ -119,6 +120,15 @@ export class SSEStreamSubscription implements StreamSubscription { }, signal: this.options.signal, }).then((response) => { + if (!response.ok) { + throw ApiError.generate( + response.status, + {}, + "Could not subscribe to stream", + Object.fromEntries(response.headers) + ); + } + if (!response.body) { throw new Error("No response body"); }