Skip to content

Commit d60e758

Browse files
committed
fix: address code review issues
- Add buffer cleanup after all consumers complete (memory leak fix) - Use lazy initialization for broadcaster (race condition fix) - Add try-catch in onPreliminaryResult callback (error handling) - Add tests for completion between consumer iterations
1 parent be68a2d commit d60e758

File tree

3 files changed

+94
-13
lines changed

3 files changed

+94
-13
lines changed

src/lib/model-result.ts

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,21 @@ export class ModelResult<TTools extends readonly Tool[]> {
125125
this.options = options;
126126
}
127127

128+
/**
129+
* Get or create the tool event broadcaster (lazy initialization).
130+
* Ensures only one broadcaster exists for the lifetime of this ModelResult.
131+
*/
132+
private ensureBroadcaster(): ToolEventBroadcaster<{
133+
type: 'preliminary_result';
134+
toolCallId: string;
135+
result: InferToolEventsUnion<TTools>;
136+
}> {
137+
if (!this.toolEventBroadcaster) {
138+
this.toolEventBroadcaster = new ToolEventBroadcaster();
139+
}
140+
return this.toolEventBroadcaster;
141+
}
142+
128143
/**
129144
* Type guard to check if a value is a non-streaming response
130145
*/
@@ -332,11 +347,15 @@ export class ModelResult<TTools extends readonly Tool[]> {
332347
// Create callback for real-time preliminary results
333348
const onPreliminaryResult = this.toolEventBroadcaster
334349
? (callId: string, resultValue: unknown) => {
335-
this.toolEventBroadcaster!.push({
336-
type: 'preliminary_result' as const,
337-
toolCallId: callId,
338-
result: resultValue as InferToolEventsUnion<TTools>,
339-
});
350+
try {
351+
this.toolEventBroadcaster!.push({
352+
type: 'preliminary_result' as const,
353+
toolCallId: callId,
354+
result: resultValue as InferToolEventsUnion<TTools>,
355+
});
356+
} catch {
357+
// Don't crash tool execution if broadcasting fails
358+
}
340359
}
341360
: undefined;
342361

@@ -499,13 +518,13 @@ export class ModelResult<TTools extends readonly Tool[]> {
499518
throw new Error('Stream not initialized');
500519
}
501520

502-
// Create broadcaster for real-time tool events
503-
this.toolEventBroadcaster = new ToolEventBroadcaster();
504-
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();
521+
// Get or create broadcaster for real-time tool events (lazy init prevents race conditions)
522+
const broadcaster = this.ensureBroadcaster();
523+
const toolEventConsumer = broadcaster.createConsumer();
505524

506525
// Start tool execution in background (completes broadcaster when done)
507526
const executionPromise = this.executeToolsIfNeeded().finally(() => {
508-
this.toolEventBroadcaster?.complete();
527+
broadcaster.complete();
509528
});
510529

511530
const consumer = this.reusableStream.createConsumer();
@@ -615,13 +634,13 @@ export class ModelResult<TTools extends readonly Tool[]> {
615634
throw new Error('Stream not initialized');
616635
}
617636

618-
// Create broadcaster for real-time tool events
619-
this.toolEventBroadcaster = new ToolEventBroadcaster();
620-
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();
637+
// Get or create broadcaster for real-time tool events (lazy init prevents race conditions)
638+
const broadcaster = this.ensureBroadcaster();
639+
const toolEventConsumer = broadcaster.createConsumer();
621640

622641
// Start tool execution in background (completes broadcaster when done)
623642
const executionPromise = this.executeToolsIfNeeded().finally(() => {
624-
this.toolEventBroadcaster?.complete();
643+
broadcaster.complete();
625644
});
626645

627646
// Yield tool deltas from API stream

src/lib/tool-event-broadcaster.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,25 @@ export class ToolEventBroadcaster<T> {
2828
/**
2929
* Mark the broadcaster as complete - no more events will be pushed.
3030
* Optionally pass an error to signal failure to all consumers.
31+
* Cleans up buffer and consumers after completion.
3132
*/
3233
complete(error?: Error): void {
3334
this.isComplete = true;
3435
this.completionError = error ?? null;
3536
this.notifyWaitingConsumers();
37+
// Schedule cleanup after consumers have processed completion
38+
queueMicrotask(() => this.cleanup());
39+
}
40+
41+
/**
42+
* Clean up resources after all consumers have finished.
43+
* Called automatically after complete(), but can be called manually.
44+
*/
45+
private cleanup(): void {
46+
// Only cleanup if complete and all consumers are done
47+
if (this.isComplete && this.consumers.size === 0) {
48+
this.buffer = [];
49+
}
3650
}
3751

3852
/**
@@ -73,6 +87,7 @@ export class ToolEventBroadcaster<T> {
7387
// If complete and caught up, we're done
7488
if (self.isComplete) {
7589
self.consumers.delete(consumerId);
90+
self.cleanup();
7691
if (self.completionError) {
7792
throw self.completionError;
7893
}
@@ -105,6 +120,7 @@ export class ToolEventBroadcaster<T> {
105120
if (consumer) {
106121
consumer.cancelled = true;
107122
self.consumers.delete(consumerId);
123+
self.cleanup();
108124
}
109125
return { done: true, value: undefined };
110126
},
@@ -114,6 +130,7 @@ export class ToolEventBroadcaster<T> {
114130
if (consumer) {
115131
consumer.cancelled = true;
116132
self.consumers.delete(consumerId);
133+
self.cleanup();
117134
}
118135
throw e;
119136
},

tests/unit/tool-event-broadcaster.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,51 @@ describe('ToolEventBroadcaster', () => {
221221
});
222222
});
223223

224+
describe('completion between iterations', () => {
225+
it('should handle completion between consumer iterations', async () => {
226+
const broadcaster = new ToolEventBroadcaster<number>();
227+
const consumer = broadcaster.createConsumer();
228+
229+
broadcaster.push(1);
230+
const first = await consumer.next();
231+
expect(first.done).toBe(false);
232+
expect(first.value).toBe(1);
233+
234+
// Complete while consumer is between iterations
235+
broadcaster.complete();
236+
237+
const second = await consumer.next();
238+
expect(second.done).toBe(true);
239+
});
240+
241+
it('should handle completion with remaining buffered events', async () => {
242+
const broadcaster = new ToolEventBroadcaster<number>();
243+
const consumer = broadcaster.createConsumer();
244+
245+
broadcaster.push(1);
246+
broadcaster.push(2);
247+
broadcaster.push(3);
248+
249+
// Read first event
250+
const first = await consumer.next();
251+
expect(first.value).toBe(1);
252+
253+
// Complete with events still in buffer
254+
broadcaster.complete();
255+
256+
// Should still get remaining buffered events
257+
const second = await consumer.next();
258+
expect(second.value).toBe(2);
259+
260+
const third = await consumer.next();
261+
expect(third.value).toBe(3);
262+
263+
// Now should be done
264+
const fourth = await consumer.next();
265+
expect(fourth.done).toBe(true);
266+
});
267+
});
268+
224269
describe('typed events', () => {
225270
it('should work with typed tool events', async () => {
226271
type ToolEvent =

0 commit comments

Comments
 (0)