First off, I really like the idea of this. It looks like a very useful tool! I had actually started contemplating something similar (from a slightly different angle), but it hasn't come to fruition (and I'm not sure it will, especially given what you have already done ;-)).
Next, I tried to hook it up to a somewhat complex LangChain-based agent and didn't get much in the way of results. The agent is a LangGraph agent that consists of an initial analysis node, which then calls a ReAct agent node that in turn calls other ReAct agents (create_agent) through tool calls. So I am not even sure this should be supported. I see the initial graph but no execution, and of course I don't see the sub-agents.
Next, I tried it with a smaller mock agent that does fan-out and then fan-in. This also does not seem to trace the execution properly.
While testing it, I also seem to get a long delay at the end of execution when using watch. I see the message from the execution of the last node, and then a really long time passes until I get the printout. When watch is not used, the printout at the end of the execution is immediate.
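To put a number on that delay, the call can be timed directly. The following is only a minimal sketch: `timed` is a hypothetical helper of mine (not part of langgraphics or LangGraph), and the demo uses `asyncio.sleep` as a stand-in for `watched.ainvoke(initial_state)`.

```python
import asyncio
import time
from typing import Any, Awaitable


async def timed(label: str, awaitable: Awaitable[Any]) -> tuple[Any, float]:
    """Await `awaitable` and report how long it took to return (hypothetical helper)."""
    start = time.perf_counter()
    result = await awaitable
    elapsed = time.perf_counter() - start
    print(f"{label}: returned after {elapsed:.2f}s")
    return result, elapsed


async def _demo() -> None:
    # Stand-in for `watched.ainvoke(initial_state)`; swap in the real call to measure it.
    await timed("demo", asyncio.sleep(0.1, result="done"))


if __name__ == "__main__":
    asyncio.run(_demo())
```

Running the same measurement once with `watch(compiled)` and once with the plain compiled graph should show whether the extra time is spent inside `ainvoke` itself.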
I include the code, log, and output here in case they are helpful (or in case I am doing something incorrectly).
Last point: have you considered basing the LangGraph integration on callbacks? In a sense this could be less intrusive, as one would not have to wrap the whole graph (though it might be harder to achieve the same functionality; I am not sure).
Dependencies:
langchain-core 1.2.16
langgraph 1.0.9
langgraph-checkpoint 4.0.0
langgraph-prebuilt 1.0.8
langgraph-sdk 0.3.9
langgraphics 0.1.0b2
langsmith 0.7.7
"""
LangGraph fan-out / fan-in demo with langgraphics live visualization.
Graph shape:
START → initial → [fanout_a, fanout_b, fanout_c]* → final → END
* each fan-out node is dispatched with an independent 3/4 probability;
at least one is always guaranteed to run.
Each node sleeps for a random 3-5 seconds before completing.
The final node prints a summary message that is also returned to the caller.
"""
import asyncio
import operator
import random
from datetime import datetime
from typing import Annotated, TypedDict
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from langgraphics import watch
# ── Config ─────────────────────────────────────────────────────────────────
# Set to True → each fan-out node runs with an independent 3/4 probability (≥1 guaranteed)
# Set to False → all three fan-out nodes always run
RANDOM_FANOUT = False
# Set to True → wrap graph with langgraphics for live browser visualization
# Set to False → run without visualization
USE_WATCH = True
FANOUT_NODE_NAMES = ["fanout_a", "fanout_b", "fanout_c"]
FANOUT_CHANCE = 3 / 4 # used only when RANDOM_FANOUT is True
class GraphState(TypedDict):
    """Shared state threaded through the entire graph."""

    # Written by the initial node
    initial_result: str
    # Each fan-out node appends one entry; operator.add merges concurrent writes
    fanout_results: Annotated[list[str], operator.add]
    # Written by the final node and returned to the caller
    final_message: str


# ── Helpers ────────────────────────────────────────────────────────────────
def ts() -> str:
    """Current time as a compact HH:MM:SS.ss string."""
    return datetime.now().strftime("%H:%M:%S.%f")[:-4]
# ── Nodes ──────────────────────────────────────────────────────────────────
async def initial_node(state: GraphState) -> dict:
    """Initial node"""
    delay = random.uniform(3, 5)
    print(f" {ts()} [initial] sleeping {delay:.1f}s …")
    await asyncio.sleep(delay)
    result = f"initial node finished after {delay:.1f}s"
    print(f" {ts()} [initial] ✓ {result}")
    return {"initial_result": result}


async def fanout_a(state: GraphState) -> dict:
    """Fan-out branch A."""
    delay = random.uniform(3, 5)
    print(f" {ts()} [fanout_a] sleeping {delay:.1f}s …")
    await asyncio.sleep(delay)
    result = f"fanout_a finished after {delay:.1f}s"
    print(f" {ts()} [fanout_a] ✓ {result}")
    return {"fanout_results": [result]}


async def fanout_b(state: GraphState) -> dict:
    """Fan-out branch B."""
    delay = random.uniform(3, 5)
    print(f" {ts()} [fanout_b] sleeping {delay:.1f}s …")
    await asyncio.sleep(delay)
    result = f"fanout_b finished after {delay:.1f}s"
    print(f" {ts()} [fanout_b] ✓ {result}")
    return {"fanout_results": [result]}


async def fanout_c(state: GraphState) -> dict:
    """Fan-out branch C."""
    delay = random.uniform(3, 5)
    print(f" {ts()} [fanout_c] sleeping {delay:.1f}s …")
    await asyncio.sleep(delay)
    result = f"fanout_c finished after {delay:.1f}s"
    print(f" {ts()} [fanout_c] ✓ {result}")
    return {"fanout_results": [result]}


async def final_node(state: GraphState) -> dict:
    """Final summarization node"""
    delay = random.uniform(3, 5)
    print(f" {ts()} [final] sleeping {delay:.1f}s …")
    await asyncio.sleep(delay)
    lines = [
        "══════════════════════════════════════════",
        " Graph run complete",
        "══════════════════════════════════════════",
        f" Initial stage : {state['initial_result']}",
        f" Fan-out nodes : {len(state['fanout_results'])} ran",
    ]
    for r in state["fanout_results"]:
        lines.append(f" • {r}")
    lines.append("══════════════════════════════════════════")
    message = "\n".join(lines)
    print(f" {ts()} [final] ✓ done")
    return {"final_message": message}
# ── Routing (initial → fan-out) ────────────────────────────────────────────
def route_fanout(state: GraphState) -> list[Send]:
    """Randomly select which fan-out nodes to run (each has a 3/4 chance).

    At least one node is guaranteed to be dispatched.
    """
    chosen = [n for n in FANOUT_NODE_NAMES if random.random() < FANOUT_CHANCE]
    if not chosen:
        chosen = [random.choice(FANOUT_NODE_NAMES)]
    print(f"\n {ts()} [router] dispatching to: {', '.join(chosen)}")
    return [Send(name, state) for name in chosen]
# ── Graph assembly ─────────────────────────────────────────────────────────
def build_graph() -> StateGraph:
    """Build the execution graph.

    When RANDOM_FANOUT is True, conditional edges (Send) are used so each
    fan-out node fires with an independent 3/4 probability.
    When RANDOM_FANOUT is False, plain edges ensure all three always run.
    """
    graph = StateGraph(GraphState)
    graph.add_node("initial", initial_node)
    graph.add_node("fanout_a", fanout_a)
    graph.add_node("fanout_b", fanout_b)
    graph.add_node("fanout_c", fanout_c)
    graph.add_node("final", final_node)
    graph.add_edge(START, "initial")
    if RANDOM_FANOUT:
        # Each node fires independently with FANOUT_CHANCE probability
        graph.add_conditional_edges(
            "initial",
            route_fanout,
            {n: n for n in FANOUT_NODE_NAMES},
        )
    else:
        # All three nodes always run in parallel
        graph.add_edge("initial", "fanout_a")
        graph.add_edge("initial", "fanout_b")
        graph.add_edge("initial", "fanout_c")
    graph.add_edge("fanout_a", "final")
    graph.add_edge("fanout_b", "final")
    graph.add_edge("fanout_c", "final")
    graph.add_edge("final", END)
    return graph
# ── Entry point ────────────────────────────────────────────────────────────
async def main() -> None:
    """Main"""
    compiled = build_graph().compile()
    watched = watch(compiled) if USE_WATCH else compiled
    initial_state: GraphState = {
        "initial_result": "",
        "fanout_results": [],
        "final_message": "",
    }
    print(f"\n{ts()} Starting graph …\n")
    result = await watched.ainvoke(initial_state)
    # Print the final message produced by the graph
    print(f"\n{ts()} Graph finished\n" + result["final_message"])


if __name__ == "__main__":
    asyncio.run(main())

Trace output (final state). Note:
- all fanout_x nodes were executed, but only one is highlighted (and only that one was green while executing)
- the edges are also marked incorrectly
- note that initial->fanout_b was "activated", and also fanout_c -> final...
Console output:
uv run fanout_demo.py
13:21:53.10 Starting graph …
13:21:53.10 [initial] sleeping 4.6s …
tcgetpgrp failed: Not a tty
127.0.0.1 - - [26/Feb/2026 13:21:54] "GET / HTTP/1.1" 304 -
127.0.0.1 - - [26/Feb/2026 13:21:54] "GET /assets/index-Cv8sy8DZ.js HTTP/1.1" 304 -
127.0.0.1 - - [26/Feb/2026 13:21:55] "GET /assets/index-BaM9nLW5.css HTTP/1.1" 304 -
13:21:57.73 [initial] ✓ initial node finished after 4.6s
13:21:57.73 [fanout_a] sleeping 3.7s …
13:21:57.73 [fanout_b] sleeping 3.6s …
13:21:57.73 [fanout_c] sleeping 4.3s …
127.0.0.1 - - [26/Feb/2026 13:21:58] "GET /icons/chain.svg HTTP/1.1" 304 -
13:22:01.34 [fanout_b] ✓ fanout_b finished after 3.6s
13:22:01.44 [fanout_a] ✓ fanout_a finished after 3.7s
13:22:02.07 [fanout_c] ✓ fanout_c finished after 4.3s
13:22:02.08 [final] sleeping 3.5s …
13:22:05.60 [final] ✓ done
13:23:20.13 Graph finished
══════════════════════════════════════════
Graph run complete
══════════════════════════════════════════
Initial stage : initial node finished after 4.6s
Fan-out nodes : 3 ran
• fanout_a finished after 3.7s
• fanout_b finished after 3.6s
• fanout_c finished after 4.3s
══════════════════════════════════════════
Note the long gap (about 1:15 minutes) between the "[final] ✓ done" line and the final printout.
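For reference, the gap can be computed from the two timestamps in the log above. This is just the arithmetic, using the values copied from the "[final] ✓ done" and "Graph finished" lines:

```python
from datetime import datetime

FMT = "%H:%M:%S.%f"

# Timestamps copied from the log above.
final_done = datetime.strptime("13:22:05.60", FMT)
graph_finished = datetime.strptime("13:23:20.13", FMT)

gap = (graph_finished - final_done).total_seconds()
print(f"gap: {gap:.2f}s")  # prints "gap: 74.53s"
```

That is roughly 74.5 seconds between the last node finishing and `ainvoke` returning.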
I will try to spend more time on it and maybe discover what is going on, but I am not sure how quickly I can get to it. Of course, if I can be of any help with specific questions, tests, etc., let me know!