### Privileged issue

- [x] I am a LangGraph.js maintainer, or was asked directly by a LangGraph.js maintainer to create an issue here.

### Description
When streaming a graph with parallel LLM calls, aborting the stream (or letting it complete) triggers a race condition that logs numerous `TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed` errors. In production scenarios with many parallel tasks/subagents, this can flood the console with 500+ error messages.
### Reproduction
```ts
import { StateGraph } from "@langchain/langgraph";

// Graph with 3 parallel nodes, each making a streaming LLM call
const graph = new StateGraph(StateAnnotation)
  .addNode("node1", node1) // async LLM call
  .addNode("node2", node2) // async LLM call
  .addNode("node3", node3) // async LLM call
  .addEdge("__start__", "node1")
  .addEdge("__start__", "node2")
  .addEdge("__start__", "node3")
  // ... fan-in edges
  .compile();

const abortController = new AbortController();
const stream = await graph.stream(input, {
  signal: abortController.signal,
  streamMode: ["messages", "values"],
});

// Abort mid-stream, while tokens are still in flight
setTimeout(() => abortController.abort(), 500);

for await (const chunk of stream) {
  // Process chunks...
}
```

Output:

```
Error in handler StreamMessagesHandler, handleLLMNewToken: TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed
Error in handler StreamMessagesHandler, handleLLMNewToken: TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed
Error in handler StreamMessagesHandler, handleLLMNewToken: TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed
... (repeated for each in-flight token across all parallel LLM calls)
```
### Root Cause
The issue is in `IterableReadableWritableStream.push()` in `@langchain/langgraph-core/src/pregel/stream.ts`:
```ts
push(chunk: StreamChunk) {
  this.passthroughFn?.(chunk);
  this.controller.enqueue(chunk); // ← Throws if the controller is closed
}
```

Race condition timeline:

```
Model streaming: [sleep] → handleLLMNewToken(token) → check signal.aborted → [sleep] → ...
                                       ↑
User aborts:     ──────────────────────┼─── stream.close() called
                                       │
                                       └── Token in-flight, push() throws
```
1. Parallel LLM calls are streaming tokens via `handleLLMNewToken` callbacks
2. The user aborts (or the stream completes naturally) → `stream.close()` is called
3. `close()` sets `_closed = true` and calls `controller.close()`
4. In-flight LLM calls wake up from their async operations and call `handleLLMNewToken`
5. `StreamMessagesHandler._emit()` calls `streamFn()` → `stream.push()`
6. `push()` calls `controller.enqueue()` on the closed controller → Error
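The failure itself is the standard Web Streams contract, not anything LangGraph-specific. A minimal standalone sketch (assuming Node 18+, where `ReadableStream` is a global) reproduces the exact error without any LangGraph code:

```ts
// Capture the controller the same way IterableReadableWritableStream does,
// then enqueue after close() to trigger ERR_INVALID_STATE.
let controller!: ReadableStreamDefaultController<string>;

const stream = new ReadableStream<string>({
  start(c) {
    controller = c;
  },
});

controller.enqueue("token-1"); // ok: controller is still open
controller.close();            // what stream.close() does on abort/completion

try {
  controller.enqueue("token-2"); // a late, in-flight token
} catch (err) {
  // TypeError [ERR_INVALID_STATE]: Invalid state: Controller is already closed
  console.error(err);
}
```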
The abort signal **is** properly propagated to the models (they do check it and stop), but there is an unavoidable race window between:
- the stream closing (synchronous), and
- the models checking the abort signal (asynchronous, on their next iteration).
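One possible mitigation (a sketch only, not the library's current code) would be to make `push()` tolerant of late chunks: drop them once the `_closed` flag that `close()` already sets is true, and swallow the narrow `ERR_INVALID_STATE` window that remains between the flag check and the enqueue:

```ts
push(chunk: StreamChunk) {
  if (this._closed) return; // drop tokens that arrive after close()
  this.passthroughFn?.(chunk);
  try {
    this.controller.enqueue(chunk);
  } catch (err) {
    // close() can still land between the flag check and enqueue();
    // ignore only that specific error and rethrow anything else.
    if ((err as { code?: string }).code !== "ERR_INVALID_STATE") throw err;
  }
}
```

Silently dropping late tokens should be safe here: once the stream is closed the consumer has stopped reading, so there is no observer left to deliver them to.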