@@ -15,7 +15,12 @@ import {
1515} from "../utils/ioSerialization.js" ;
1616import { ApiError } from "./errors.js" ;
1717import { ApiClient } from "./index.js" ;
18- import { AsyncIterableStream , createAsyncIterableStream , zodShapeStream } from "./stream.js" ;
18+ import {
19+ AsyncIterableStream ,
20+ createAsyncIterableReadable ,
21+ createAsyncIterableStream ,
22+ zodShapeStream ,
23+ } from "./stream.js" ;
1924import { EventSourceParserStream } from "eventsource-parser/stream" ;
2025
2126export type RunShape < TRunTypes extends AnyRunTypes > = TRunTypes extends AnyRunTypes
@@ -82,25 +87,42 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
8287 url : string ,
8388 options ?: RunShapeStreamOptions
8489) : RunSubscription < TRunTypes > {
90+ const abortController = new AbortController ( ) ;
91+
8592 const version1 = new SSEStreamSubscriptionFactory (
8693 getEnvVar ( "TRIGGER_STREAM_URL" , getEnvVar ( "TRIGGER_API_URL" ) ) ?? "https://api.trigger.dev" ,
8794 {
8895 headers : options ?. headers ,
89- signal : options ? .signal ,
96+ signal : abortController . signal ,
9097 }
9198 ) ;
9299
93100 const version2 = new ElectricStreamSubscriptionFactory (
94101 getEnvVar ( "TRIGGER_STREAM_URL" , getEnvVar ( "TRIGGER_API_URL" ) ) ?? "https://api.trigger.dev" ,
95102 {
96103 headers : options ?. headers ,
97- signal : options ? .signal ,
104+ signal : abortController . signal ,
98105 }
99106 ) ;
100107
108+ // If the user supplied AbortSignal is aborted, we should abort the internal controller
109+ options ?. signal ?. addEventListener (
110+ "abort" ,
111+ ( ) => {
112+ if ( ! abortController . signal . aborted ) {
113+ abortController . abort ( ) ;
114+ }
115+ } ,
116+ { once : true }
117+ ) ;
118+
101119 const $options : RunSubscriptionOptions = {
102- runShapeStream : zodShapeStream ( SubscribeRunRawShape , url , options ) ,
120+ runShapeStream : zodShapeStream ( SubscribeRunRawShape , url , {
121+ ...options ,
122+ signal : abortController . signal ,
123+ } ) ,
103124 streamFactory : new VersionedStreamSubscriptionFactory ( version1 , version2 ) ,
125+ abortController,
104126 ...options ,
105127 } ;
106128
@@ -218,11 +240,10 @@ export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFact
218240 throw new Error ( "runId and streamKey are required" ) ;
219241 }
220242
221- const url = `${ baseUrl ?? this . baseUrl } /realtime/v2/streams/${ runId } /${ streamKey } ` ;
222-
223- console . log ( "Creating ElectricStreamSubscription with URL:" , url ) ;
224-
225- return new ElectricStreamSubscription ( url , this . options ) ;
243+ return new ElectricStreamSubscription (
244+ `${ baseUrl ?? this . baseUrl } /realtime/v2/streams/${ runId } /${ streamKey } ` ,
245+ this . options
246+ ) ;
226247 }
227248}
228249
@@ -264,39 +285,48 @@ export interface RunShapeProvider {
264285export type RunSubscriptionOptions = RunShapeStreamOptions & {
265286 runShapeStream : ReadableStream < SubscribeRunRawShape > ;
266287 streamFactory : StreamSubscriptionFactory ;
288+ abortController : AbortController ;
267289} ;
268290
269291export class RunSubscription < TRunTypes extends AnyRunTypes > {
270- private abortController : AbortController ;
271292 private unsubscribeShape ?: ( ) => void ;
272293 private stream : AsyncIterableStream < RunShape < TRunTypes > > ;
273294 private packetCache = new Map < string , any > ( ) ;
274295 private _closeOnComplete : boolean ;
275296 private _isRunComplete = false ;
276297
277298 constructor ( private options : RunSubscriptionOptions ) {
278- this . abortController = new AbortController ( ) ;
279299 this . _closeOnComplete =
280300 typeof options . closeOnComplete === "undefined" ? true : options . closeOnComplete ;
281301
282- this . stream = createAsyncIterableStream ( this . options . runShapeStream , {
283- transform : async ( chunk , controller ) => {
284- const run = await this . transformRunShape ( chunk ) ;
302+ this . stream = createAsyncIterableReadable (
303+ this . options . runShapeStream ,
304+ {
305+ transform : async ( chunk , controller ) => {
306+ const run = await this . transformRunShape ( chunk ) ;
285307
286- controller . enqueue ( run ) ;
308+ controller . enqueue ( run ) ;
287309
288- this . _isRunComplete = ! ! run . finishedAt ;
310+ this . _isRunComplete = ! ! run . finishedAt ;
289311
290- if ( this . _closeOnComplete && this . _isRunComplete && ! this . abortController . signal . aborted ) {
291- this . abortController . abort ( ) ;
292- }
312+ if (
313+ this . _closeOnComplete &&
314+ this . _isRunComplete &&
315+ ! this . options . abortController . signal . aborted
316+ ) {
317+ console . log ( "Closing stream because run is complete" ) ;
318+
319+ this . options . abortController . abort ( ) ;
320+ }
321+ } ,
293322 } ,
294- } ) ;
323+ this . options . abortController . signal
324+ ) ;
295325 }
296326
297327 unsubscribe ( ) : void {
298- if ( ! this . abortController . signal . aborted ) {
299- this . abortController . abort ( ) ;
328+ if ( ! this . options . abortController . signal . aborted ) {
329+ this . options . abortController . abort ( ) ;
300330 }
301331 this . unsubscribeShape ?.( ) ;
302332 }
@@ -315,60 +345,68 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
315345 // Keep track of which streams we've already subscribed to
316346 const activeStreams = new Set < string > ( ) ;
317347
318- return createAsyncIterableStream ( this . stream , {
319- transform : async ( run , controller ) => {
320- controller . enqueue ( {
321- type : "run" ,
322- run,
323- } ) ;
324-
325- // Check for stream metadata
326- if ( run . metadata && "$$streams" in run . metadata && Array . isArray ( run . metadata . $$streams ) ) {
327- for ( const streamKey of run . metadata . $$streams ) {
328- if ( typeof streamKey !== "string" ) {
329- continue ;
330- }
331-
332- if ( ! activeStreams . has ( streamKey ) ) {
333- activeStreams . add ( streamKey ) ;
334-
335- const subscription = this . options . streamFactory . createSubscription (
336- run . metadata ,
337- run . id ,
338- streamKey ,
339- this . options . client ?. baseUrl
340- ) ;
341-
342- const stream = await subscription . subscribe ( ) ;
343-
344- // Create the pipeline and start it
345- stream
346- . pipeThrough (
347- new TransformStream ( {
348- transform ( chunk , controller ) {
349- controller . enqueue ( {
350- type : streamKey ,
351- chunk : chunk as TStreams [ typeof streamKey ] ,
352- run,
353- } as StreamPartResult < RunShape < TRunTypes > , TStreams > ) ;
354- } ,
355- } )
356- )
357- . pipeTo (
358- new WritableStream ( {
359- write ( chunk ) {
360- controller . enqueue ( chunk ) ;
361- } ,
362- } )
363- )
364- . catch ( ( error ) => {
365- console . error ( `Error in stream ${ streamKey } :` , error ) ;
366- } ) ;
348+ return createAsyncIterableReadable (
349+ this . stream ,
350+ {
351+ transform : async ( run , controller ) => {
352+ controller . enqueue ( {
353+ type : "run" ,
354+ run,
355+ } ) ;
356+
357+ // Check for stream metadata
358+ if (
359+ run . metadata &&
360+ "$$streams" in run . metadata &&
361+ Array . isArray ( run . metadata . $$streams )
362+ ) {
363+ for ( const streamKey of run . metadata . $$streams ) {
364+ if ( typeof streamKey !== "string" ) {
365+ continue ;
366+ }
367+
368+ if ( ! activeStreams . has ( streamKey ) ) {
369+ activeStreams . add ( streamKey ) ;
370+
371+ const subscription = this . options . streamFactory . createSubscription (
372+ run . metadata ,
373+ run . id ,
374+ streamKey ,
375+ this . options . client ?. baseUrl
376+ ) ;
377+
378+ const stream = await subscription . subscribe ( ) ;
379+
380+ // Create the pipeline and start it
381+ stream
382+ . pipeThrough (
383+ new TransformStream ( {
384+ transform ( chunk , controller ) {
385+ controller . enqueue ( {
386+ type : streamKey ,
387+ chunk : chunk as TStreams [ typeof streamKey ] ,
388+ run,
389+ } as StreamPartResult < RunShape < TRunTypes > , TStreams > ) ;
390+ } ,
391+ } )
392+ )
393+ . pipeTo (
394+ new WritableStream ( {
395+ write ( chunk ) {
396+ controller . enqueue ( chunk ) ;
397+ } ,
398+ } )
399+ )
400+ . catch ( ( error ) => {
401+ console . error ( `Error in stream ${ streamKey } :` , error ) ;
402+ } ) ;
403+ }
367404 }
368405 }
369- }
406+ } ,
370407 } ,
371- } ) ;
408+ this . options . abortController . signal
409+ ) ;
372410 }
373411
374412 private async transformRunShape ( row : SubscribeRunRawShape ) : Promise < RunShape < TRunTypes > > {
0 commit comments