diff --git a/src/oss/langgraph/streaming-tokens-example.md b/src/oss/langgraph/streaming-tokens-example.md
new file mode 100644
index 0000000000..501d5784d0
--- /dev/null
+++ b/src/oss/langgraph/streaming-tokens-example.md
@@ -0,0 +1,126 @@
# Streaming Tokens from LangGraph Nodes

## Issue Reference

This document addresses [Issue #1539](https://github.com/langchain-ai/docs/issues/1539): how to stream individual tokens from LLM calls made within LangGraph nodes.

## Problem

When using `graph.astream()`, you can stream the aggregated output of each node, but not the raw token-level stream produced during the model invocation. The goal is to stream individual tokens as the LLM generates them.

## Solution

There are two main approaches to streaming individual tokens from LangGraph:

### Approach 1: Use `stream_mode="messages"`

The `messages` streaming mode streams LLM tokens as they are generated:

```python
from langchain_core.messages import HumanMessage

# Your graph setup here
graph = ...  # your compiled graph

# Stream with messages mode to get token-by-token output
async for msg, metadata in graph.astream(
    {"messages": [HumanMessage(content="Your query here")]},
    stream_mode="messages",
):
    # msg.content contains the individual token;
    # metadata describes which node generated it
    if msg.content:
        print(msg.content, end="", flush=True)
```

### Approach 2: Use `astream_events` for Fine-Grained Control

For more detailed control, including filtering by specific nodes or tags, use `astream_events`:

```python
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig

# `llm`, `tools`, and `graph` are assumed to be defined elsewhere

async def fundamentals_analyst_node(state, config: RunnableConfig):
    """
    Important: add a 'config: RunnableConfig' parameter to your node function
    and pass it to the model invocation so streaming events propagate.
    """
    prompt = ChatPromptTemplate.from_messages([...])
    chain = prompt | llm.bind_tools(tools)

    # Pass config to enable streaming
    result = await chain.ainvoke(state["messages"], config)

    return {"messages": [result]}

# Stream events from the graph
async for event in graph.astream_events(
    {"messages": [HumanMessage(content="Your query")]},
    version="v2",
):
    # Filter for token chunks
    if event["event"] == "on_chat_model_stream":
        chunk = event["data"]["chunk"]
        if hasattr(chunk, "content") and chunk.content:
            print(chunk.content, end="", flush=True)
```

### Key Points

1. **Add a `config` parameter**: your node functions must accept a `config: RunnableConfig` parameter and pass it to model invocations.
2. **Use `stream_mode="messages"`**: the simplest way to get token-by-token streaming.
3. **Use `astream_events`**: for finer control over which events to stream.
4. **Metadata filtering**: use metadata fields like `langgraph_node` or custom tags to filter streams from specific nodes (see the sketch below).
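Point 4 can be made concrete with a minimal sketch like the one below. It assumes a compiled `graph` whose `"assistant"` node invokes a model tagged via `.with_config(tags=["final_llm"])`; both the node name and the tag are illustrative, not part of the examples above. Events are pre-filtered with `include_tags`, then checked against the `langgraph_node` field that LangGraph records in the event metadata.

```python
# Minimal sketch for key point 4.
# Assumes `graph` is a compiled LangGraph whose "assistant" node calls a model
# tagged with "final_llm", e.g. model = ChatOpenAI(...).with_config(tags=["final_llm"]).
from langchain_core.messages import HumanMessage

async def stream_assistant_tokens(graph, query: str) -> None:
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v2",
        include_tags=["final_llm"],  # pre-filter events to the tagged model
    ):
        if event["event"] != "on_chat_model_stream":
            continue
        # Post-filter on the node name recorded in the event metadata
        if event.get("metadata", {}).get("langgraph_node") != "assistant":
            continue
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)
```

Either filter is usually sufficient on its own; combining them only matters when several nodes share the same tagged model.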
### Complete Example

```python
import asyncio
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

model = ChatOpenAI(model="gpt-4", streaming=True)  # Enable streaming

# Important: accept the config parameter
async def my_node(state: State, config: RunnableConfig) -> State:
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant."),
        ("placeholder", "{messages}"),
    ])
    chain = prompt | model

    # Pass config to enable streaming
    response = await chain.ainvoke({"messages": state["messages"]}, config)
    return {"messages": [response]}

builder = StateGraph(State)
builder.add_node("assistant", my_node)
builder.set_entry_point("assistant")
builder.add_edge("assistant", END)
graph = builder.compile()

async def main() -> None:
    # Stream tokens
    async for msg, metadata in graph.astream(
        {"messages": [HumanMessage(content="Tell me a joke")]},
        stream_mode="messages",
    ):
        if msg.content and metadata.get("langgraph_node") == "assistant":
            print(msg.content, end="|", flush=True)

asyncio.run(main())
```

## Additional Resources

- [LangGraph Streaming Documentation](https://docs.langchain.com/oss/python/langgraph/streaming)
- [LangChain Streaming Guide](https://docs.langchain.com/oss/python/langchain/streaming)

## Related Issues

- [GitHub Issue #137](https://github.com/langchain-ai/langgraph/issues/137)
- [GitHub Discussion #533](https://github.com/langchain-ai/langgraph/discussions/533)