-
Notifications
You must be signed in to change notification settings - Fork 403
Description
Checked other resources
- I added a very descriptive title to this issue.
- I searched the LangGraph.js documentation with the integrated search.
- I used the GitHub search to find a similar question and didn't find it.
- I am sure that this is a bug in LangGraph.js rather than my code.
- The bug is not resolved by updating to the latest stable version of LangGraph (or the specific integration package).
Example Code
Bug Report: StreamMessagesHandler throws ERR_INVALID_STATE when stream is closed
Issue Title
TypeError [ERR_INVALID_STATE]: Controller is already closed in StreamMessagesHandler.handleLLMNewToken
Description
When using graph.stream() with streamMode: ['messages'], a TypeError is thrown if the stream consumer completes before the LLM finishes generating tokens. This happens because IterableReadableWritableStream.push() doesn't check if the controller is already closed before calling controller.enqueue().
Environment
- @langchain/langgraph version: 1.0.7 (also confirmed in 1.0.1)
- @langchain/core version: 1.0.1
- Node.js version: 22.x
- OS: macOS / Linux
Steps to Reproduce
- Create a LangGraph workflow with streaming enabled
- Use
streamMode: ['messages', 'custom'] - Consume the stream with
for await - If the stream consumer completes (break, return, or error) while LLM is still generating tokens, the error occurs
const graph = workflow.compile();
const stream = await graph.stream(initialState, {
streamMode: ['messages', 'custom'],
callbacks: [myCallbackHandler],
});
for await (const [mode, chunk] of stream) {
// Process chunks...
if (someCondition) {
break; // This closes the stream, but LLM may still be generating
}
}
// Error thrown here: TypeError [ERR_INVALID_STATE]: Controller is already closedError Message
Error in handler StreamMessagesHandler, handleLLMNewToken: TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed
Root Cause Analysis
The issue is in src/pregel/stream.ts - the IterableReadableWritableStream class:
// Current implementation (problematic)
push(chunk) {
this.passthroughFn?.(chunk);
this.controller.enqueue(chunk); // ❌ No guard - throws if controller is closed
}
close() {
try {
this.controller.close();
} catch (e) {} finally {
this._closed = true; // This flag exists but is never checked in push()
}
}The _closed flag is set in close() but never checked in push().
Call Stack
1. Stream consumer completes → close() called → controller.close() → _closed = true
2. LLM still generating → handleLLMNewToken() called
3. StreamMessagesHandler._emit() → streamFn() → push()
4. push() → controller.enqueue() → ERR_INVALID_STATE (controller already closed)
Suggested Fix
Add a guard check in the push() method:
// Fixed implementation
push(chunk) {
if (this._closed) return; // ✅ Guard against closed controller
this.passthroughFn?.(chunk);
this.controller.enqueue(chunk);
}Or alternatively, wrap in try-catch:
push(chunk) {
if (this._closed) return;
try {
this.passthroughFn?.(chunk);
this.controller.enqueue(chunk);
} catch (e) {
if ((e as NodeJS.ErrnoException).code === 'ERR_INVALID_STATE') {
this._closed = true;
return;
}
throw e;
}
}Workaround
Until this is fixed, users can catch the error in their code:
try {
for await (const [mode, chunk] of stream) {
// Process chunks...
}
} catch (error) {
if (
error instanceof TypeError &&
(error as NodeJS.ErrnoException).code === 'ERR_INVALID_STATE' &&
error.message.includes('Controller is already closed')
) {
// Gracefully handle - stream was already consumed
console.warn('Stream controller closed before LLM finished');
return;
}
throw error;
}Impact
- Severity: Medium - causes unhandled errors in production
- Frequency: Occurs when stream consumers complete before LLM (common with timeouts, user disconnects, early breaks)
- User Experience: Error logs pollute monitoring, may cause confusion about stream success/failure
Related Files
src/pregel/stream.ts-IterableReadableWritableStreamclasssrc/pregel/messages.ts-StreamMessagesHandlerclasssrc/pregel/index.ts- WhereStreamMessagesHandleris instantiated
Additional Context
This is a race condition between the stream consumer and the LLM token producer. The stream infrastructure should gracefully handle late-arriving tokens after the consumer has finished.
Error Message and Stack Trace (if applicable)
Error in handler StreamMessagesHandler, handleLLMNewToken: TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed
Description
When using graph.stream() with streamMode: ['messages'], a TypeError is thrown if the stream consumer completes before the LLM finishes generating tokens. This happens because IterableReadableWritableStream.push() doesn't check if the controller is already closed before calling controller.enqueue().
System Info
@langchain/langgraph version: 1.0.7 (also confirmed in 1.0.1)
@langchain/core version: 1.0.1
Node.js version: 22.x
OS: macOS 26