diff --git a/.changeset/hip-cups-wave.md b/.changeset/hip-cups-wave.md new file mode 100644 index 0000000000..c21b94e37b --- /dev/null +++ b/.changeset/hip-cups-wave.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Fix issue where realtime streams would cut off after 5 minutes diff --git a/packages/core/src/v3/runMetadata/metadataStream.ts b/packages/core/src/v3/runMetadata/metadataStream.ts index 0ec263666b..3825408b37 100644 --- a/packages/core/src/v3/runMetadata/metadataStream.ts +++ b/packages/core/src/v3/runMetadata/metadataStream.ts @@ -1,3 +1,7 @@ +import { request as httpsRequest } from "node:https"; +import { request as httpRequest } from "node:http"; +import { URL } from "node:url"; + export type MetadataOptions = { baseUrl: string; runId: string; @@ -7,18 +11,25 @@ export type MetadataOptions = { signal?: AbortSignal; version?: "v1" | "v2"; target?: "self" | "parent" | "root"; + maxRetries?: number; }; export class MetadataStream { private controller = new AbortController(); private serverStream: ReadableStream; private consumerStream: ReadableStream; - private streamPromise: Promise; + private streamPromise: Promise; + private retryCount = 0; + private readonly maxRetries: number; + private currentChunkIndex = 0; + private reader: ReadableStreamDefaultReader; constructor(private options: MetadataOptions) { const [serverStream, consumerStream] = this.createTeeStreams(); this.serverStream = serverStream; this.consumerStream = consumerStream; + this.maxRetries = options.maxRetries ?? 10; + this.reader = this.serverStream.getReader(); this.streamPromise = this.initializeServerStream(); } @@ -26,38 +37,114 @@ export class MetadataStream { private createTeeStreams() { const readableSource = new ReadableStream({ start: async (controller) => { - for await (const value of this.options.source) { - controller.enqueue(value); + try { + for await (const value of this.options.source) { + controller.enqueue(value); + } + controller.close(); + } catch (error) { + controller.error(error); } - - controller.close(); }, }); return readableSource.tee(); } - private initializeServerStream(): Promise { - const serverStream = this.serverStream.pipeThrough( - new TransformStream({ - async transform(chunk, controller) { - controller.enqueue(JSON.stringify(chunk) + "\n"); + private async makeRequest(startFromChunk: number = 0): Promise { + return new Promise((resolve, reject) => { + const url = new URL(this.buildUrl()); + const timeout = 15 * 60 * 1000; // 15 minutes + + const requestFn = url.protocol === "https:" ? httpsRequest : httpRequest; + const req = requestFn({ + method: "POST", + hostname: url.hostname, + port: url.port || (url.protocol === "https:" ? 443 : 80), + path: url.pathname + url.search, + headers: { + ...this.options.headers, + "Content-Type": "application/json", + "X-Resume-From-Chunk": startFromChunk.toString(), }, - }) - ); - - return fetch(this.buildUrl(), { - method: "POST", - headers: this.options.headers ?? {}, - body: serverStream, - signal: this.controller.signal, - // @ts-expect-error - duplex: "half", + timeout, + }); + + req.on("error", (error) => { + reject(error); + }); + + req.on("timeout", () => { + req.destroy(new Error("Request timed out")); + }); + + req.on("response", (res) => { + if (res.statusCode === 408) { + if (this.retryCount < this.maxRetries) { + this.retryCount++; + + resolve(this.makeRequest(this.currentChunkIndex)); + return; + } + reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`)); + return; + } + + if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) { + const error = new Error(`HTTP error! status: ${res.statusCode}`); + reject(error); + return; + } + + res.on("end", () => { + resolve(); + }); + + res.resume(); + }); + + if (this.options.signal) { + this.options.signal.addEventListener("abort", () => { + req.destroy(new Error("Request aborted")); + }); + } + + const processStream = async () => { + try { + while (true) { + const { done, value } = await this.reader.read(); + + if (done) { + req.end(); + break; + } + + const stringified = JSON.stringify(value) + "\n"; + req.write(stringified); + this.currentChunkIndex++; + } + } catch (error) { + req.destroy(error as Error); + } + }; + + processStream().catch((error) => { + reject(error); + }); }); } + private async initializeServerStream(): Promise { + try { + await this.makeRequest(0); + } catch (error) { + this.reader.releaseLock(); + throw error; + } + } + public async wait(): Promise { - return this.streamPromise.then(() => void 0); + return this.streamPromise; } public [Symbol.asyncIterator]() {