# Streaming Tokens from LangGraph Nodes

## Issue Reference
This document addresses [Issue #1539](https://github.com/langchain-ai/docs/issues/1539) - How to stream individual tokens from LLM calls within LangGraph nodes.

## Problem
With the default stream modes, `graph.astream()` streams the aggregated output of each node, not the raw token-level stream produced during the model invocation. The goal is to stream individual tokens as the LLM generates them.
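
For example, with a node-level mode such as `"updates"`, each yielded item is a node's finished output rather than a stream of tokens. A minimal sketch of that behavior, assuming a compiled `graph` like the one in the complete example at the end of this page:

```python
from langchain_core.messages import HumanMessage

async for update in graph.astream(
    {"messages": [HumanMessage(content="Tell me a joke")]},
    stream_mode="updates",  # node-level streaming: one item per node, not per token
):
    # Each item is a dict keyed by node name, containing that node's full output
    print(update)
```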

## Solution

There are two main approaches to streaming individual tokens from a LangGraph graph:

### Approach 1: Use `stream_mode="messages"`

The `messages` streaming mode allows you to stream LLM tokens as they are generated:

```python
from langchain_core.messages import HumanMessage

# Your graph setup here
graph = ...  # your compiled graph

# Stream with messages mode to get token-by-token output
async for msg, metadata in graph.astream(
    {"messages": [HumanMessage(content="Your query here")]},
    stream_mode="messages",
):
    # msg.content contains the individual token
    # metadata contains information about which node generated it
    if msg.content:
        print(msg.content, end="", flush=True)
```
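
The same mode should also work outside of an async context via the synchronous `stream()` API; a minimal sketch, assuming the same compiled `graph`:

```python
for msg, metadata in graph.stream(
    {"messages": [HumanMessage(content="Your query here")]},
    stream_mode="messages",
):
    # Same (message_chunk, metadata) tuples as the async version
    if msg.content:
        print(msg.content, end="", flush=True)
```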

### Approach 2: Use `astream_events` for Fine-Grained Control

For more detailed control, including filtering by specific nodes or tags, use `astream_events`:

```python
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig

# `llm`, `tools`, and the compiled `graph` are assumed to be defined elsewhere

async def fundamentals_analyst_node(state, config: RunnableConfig):
    """
    Important: Add 'config: RunnableConfig' parameter to your node function
    and pass it to the model invocation for streaming to work.
    """
    prompt = ChatPromptTemplate.from_messages([...])
    chain = prompt | llm.bind_tools(tools)

    # Pass config to enable streaming
    result = await chain.ainvoke(state["messages"], config)

    return {"messages": [result]}

# Stream events from the graph
async for event in graph.astream_events(
    {"messages": [HumanMessage(content="Your query")]},
    version="v2",
):
    # Filter for token chunks
    if event["event"] == "on_chat_model_stream":
        chunk = event["data"]["chunk"]
        if hasattr(chunk, "content") and chunk.content:
            print(chunk.content, end="", flush=True)
```
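
To stream tokens from one node only, you can filter on the event metadata. A minimal sketch, assuming the node was registered in the graph under the name `fundamentals_analyst` and that LangGraph attaches a `langgraph_node` key to the event metadata (the same field used with `stream_mode="messages"` above):

```python
async for event in graph.astream_events(
    {"messages": [HumanMessage(content="Your query")]},
    version="v2",
):
    if (
        event["event"] == "on_chat_model_stream"
        and event.get("metadata", {}).get("langgraph_node") == "fundamentals_analyst"
    ):
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)
```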

### Key Points

1. **Add `config` parameter**: Your node functions must accept a `config: RunnableConfig` parameter and pass it to model invocations
2. **Use `stream_mode="messages"`**: Simplest way to get token-by-token streaming
3. **Use `astream_events`**: For more control over which events to stream
4. **Metadata filtering**: Use metadata fields like `langgraph_node` or custom tags to filter streams from specific nodes (a tag-based sketch follows this list)
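
As an illustration of the tag-based filtering mentioned in point 4, the sketch below attaches a hypothetical `"analyst"` tag to a model with `with_config` and filters events on it; the tag name and model choice are assumptions, and the tagged model is expected to be the one called inside the node you care about:

```python
from langchain_openai import ChatOpenAI

# Hypothetical tag; use this tagged model inside the node whose tokens you want to isolate
tagged_llm = ChatOpenAI(model="gpt-4", streaming=True).with_config(tags=["analyst"])

async for event in graph.astream_events(
    {"messages": [HumanMessage(content="Your query")]},
    version="v2",
):
    # Only print chunks emitted by runnables carrying the "analyst" tag
    if event["event"] == "on_chat_model_stream" and "analyst" in event.get("tags", []):
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)
```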

### Complete Example

```python
import asyncio
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

model = ChatOpenAI(model="gpt-4", streaming=True)  # Enable streaming

# Important: Accept config parameter
async def my_node(state: State, config: RunnableConfig) -> State:
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant."),
        ("placeholder", "{messages}"),
    ])
    chain = prompt | model

    # Pass config to enable streaming
    response = await chain.ainvoke({"messages": state["messages"]}, config)
    return {"messages": [response]}

builder = StateGraph(State)
builder.add_node("assistant", my_node)
builder.set_entry_point("assistant")
builder.add_edge("assistant", END)
graph = builder.compile()

# Stream tokens
async def main():
    async for msg, metadata in graph.astream(
        {"messages": [HumanMessage(content="Tell me a joke")]},
        stream_mode="messages",
    ):
        if msg.content and metadata.get("langgraph_node") == "assistant":
            print(msg.content, end="|", flush=True)

asyncio.run(main())
```
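
Run with an `OPENAI_API_KEY` set; the joke should print incrementally, one chunk at a time separated by `|`, confirming that output arrives token by token rather than as a single aggregated message.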

## Additional Resources

- [LangGraph Streaming Documentation](https://docs.langchain.com/oss/python/langgraph/streaming)
- [LangChain Streaming Guide](https://docs.langchain.com/oss/python/langchain/streaming)

## Related Issues

- [GitHub Issue #137](https://github.com/langchain-ai/langgraph/issues/137)
- [GitHub Discussion #533](https://github.com/langchain-ai/langgraph/discussions/533)