feat(multiagent): Add stream_async #961
src/strands/multiagent/swarm.py
Outdated
# Yield final result (consistent with Agent's AgentResultEvent format)
result = self._build_result()
yield {"result": result}
Yield final result (consistent with Agent's AgentResultEvent format)
Is this an AgentResult as it is for single-agent streaming? If not, can we rename it so that it doesn't conflict with a different type?
src/strands/multiagent/graph.py
Outdated
Yields:
    Dictionary events containing graph execution information including:
    - MultiAgentNodeStartEvent: When a node begins execution
These types aren't exposed to customers, so we should either remove these docs or document the shape of the dictionaries being emitted.
Can you also list the new events in the PR description (similar to #788), along with the signatures of the new APIs being added? This will help the PR be more akin to a spec of what's being proposed.
so we should either remove these docs AND document the shape of the dictionaries being emitted*
I have matched the implementation/documentation to Agent. I will add additional docs as a follow-up CR on the docs repo. I will also update the PR description to explain the events emitted.
src/strands/multiagent/graph.py
Outdated
try:
    event = await asyncio.wait_for(async_generator.__anext__(), timeout=timeout)
    yield event
except StopAsyncIteration:
Is this always thrown at the end and thus part of normal execution?
nit: seems like there might be a more pythonic way to do this without the while:
async with asyncio.timeout(timeout):
    async for event in async_generator:
        yield event
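For reference, a minimal sketch of the per-event timeout pattern the diff uses. The helper name `stream_with_timeout` is hypothetical and this is not the PR's code. It bounds each event separately with `asyncio.wait_for`, and `StopAsyncIteration` is simply how an exhausted async generator signals normal completion. An `asyncio.timeout(...)` block around an `async for` (Python 3.11+) would instead put one deadline on the whole stream, so the two forms are not strictly equivalent.

```python
import asyncio
from typing import Any, AsyncIterator


async def stream_with_timeout(
    source: AsyncIterator[Any], timeout: float
) -> AsyncIterator[Any]:
    """Yield items from `source`, allowing at most `timeout` seconds per item.

    Hypothetical helper for illustration only.
    """
    iterator = source.__aiter__()
    while True:
        try:
            # Each call to __anext__() gets its own fresh deadline.
            item = await asyncio.wait_for(iterator.__anext__(), timeout=timeout)
        except StopAsyncIteration:
            # Raised once when the source is exhausted: normal termination.
            return
        yield item
```

A slow producer surfaces as `asyncio.TimeoutError` from the wrapper, which callers can catch like any other exception.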
src/strands/multiagent/graph.py
Outdated
start_event = MultiAgentNodeStartEvent(
    node_id=node.node_id, node_type="agent" if isinstance(node.executor, Agent) else "multiagent"
)
yield start_event.as_dict()
Could we do this at a higher level instead of in here? That way we can ensure this method is always returning TypedEvents
Same for below
src/strands/multiagent/graph.py
Outdated
wrapped_event = MultiAgentNodeStreamEvent(node.node_id, event)
yield wrapped_event.as_dict()
# Capture the final result event
if "result" in event:
Can we just do an isinstance check here?
Not easily, because agent also translates it to a dict before returning the responses. https://github.com/strands-agents/sdk-python/blob/main/src/strands/agent/agent.py#L591
Is there a reason we decided to go this way instead of returning typed events?
Two reasons why we didn't ship typed events publicly:
- We didn't want to split the world between the old way (dict checking) and new way (classes)
- We weren't sure that classes were the right path forward
(1) is not a good enough reason IMHO and so (2) was stronger. For TypeScript we're thinking it's going to be type: "SomeName" and I think we'd do the same in Python.
I think it's worth revisiting now, however
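To make the `type: "SomeName"` idea concrete, here is a rough sketch of dict events carrying a string discriminator. Every name below (`NodeStartEvent`, `NodeResultEvent`, the `"node_start"` and `"node_result"` tags, `describe`) is invented for illustration and is not part of the SDK. The point is only that events stay plain dicts while consumers branch on one well-known field instead of probing for keys.

```python
from typing import Literal, TypedDict, Union


class NodeStartEvent(TypedDict):
    """Hypothetical event shape: a node has started."""

    type: Literal["node_start"]
    node_id: str


class NodeResultEvent(TypedDict):
    """Hypothetical event shape: a node has produced its result."""

    type: Literal["node_result"]
    node_id: str
    result: str


Event = Union[NodeStartEvent, NodeResultEvent]


def describe(event: Event) -> str:
    """Dispatch on the discriminator rather than on which keys are present."""
    if event["type"] == "node_start":
        return f"start:{event['node_id']}"
    return f"result:{event['node_id']}={event['result']}"
```

With this shape a type checker can narrow the union on `event["type"]`, which key-presence checks such as `"result" in event` do not offer.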
src/strands/multiagent/swarm.py
Outdated
except Exception:
    logger.exception("node=<%s> | node execution failed", current_node.node_id)
except Exception as e:
Why can't we use an exception type for this? This seems hacky
status=Status.EXECUTING,
task=task,
total_nodes=len(self.nodes),
edges=[(edge.from_node, edge.to_node) for edge in self.edges],
Not related to this PR, but why doesn't GraphState take Iterable[GraphEdge] instead of edges: list[Tuple["GraphNode", "GraphNode"]] = field(default_factory=list)?
Did GraphEdge come later?
src/strands/multiagent/swarm.py
Outdated
    self, async_generator: AsyncIterator[dict[str, Any]], timeout: float, timeout_message: str
) -> AsyncIterator[dict[str, Any]]:
    """Wrap an async generator with timeout functionality."""
    while True:
same nit as in graph
src/strands/multiagent/swarm.py
Outdated
    logger.exception("node=<%s> | node execution failed", current_node.node_id)
except Exception as e:
    # Check if this is a timeout exception
    if "timed out after" in str(e):
nit: can we create a variable for this so we don't accidentally change the message in the exception?
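One way to avoid matching on the message text altogether is a dedicated exception class, as suggested earlier in the review. The sketch below is an assumption about how that could look, not the PR's implementation: `NodeTimeoutError`, `run_node`, and `run_and_classify` are all hypothetical names.

```python
import asyncio
from typing import Any, Awaitable


class NodeTimeoutError(Exception):
    """Hypothetical exception raised when a node exceeds its time budget."""

    def __init__(self, node_id: str, timeout: float) -> None:
        super().__init__(f"Node '{node_id}' execution timed out after {timeout}s")
        self.node_id = node_id
        self.timeout = timeout


async def run_node(node_id: str, work: Awaitable[Any], timeout: float) -> Any:
    """Run `work`, converting an asyncio timeout into NodeTimeoutError."""
    try:
        return await asyncio.wait_for(work, timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise NodeTimeoutError(node_id, timeout) from exc


async def run_and_classify(node_id: str, work: Awaitable[Any], timeout: float) -> str:
    """Tell timeouts apart from other failures by type, not by message text."""
    try:
        await run_node(node_id, work, timeout)
    except NodeTimeoutError:
        return "timeout"
    except Exception:
        return "failed"
    return "completed"
```

Because the handler keys off the class, rewording the message cannot silently break timeout detection.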
- Update docstrings to match Agent's minimal style (use dict keys instead of class names)
- Add isinstance checks for result event detection for type safety
- Improve _stream_with_timeout to handle None timeout case
- Add MultiAgentResultEvent for consistency with Agent pattern
- Yield TypedEvent objects internally, convert to dict at API boundary
- All 154 tests passing

- Remove unnecessary asyncio.gather() after event loop completion
- Same issue as tool executor PR strands-agents#954
- By the time loop exits, all tasks have already completed
- Gather was waiting for already-finished tasks (no-op)
- All 154 tests passing
wrapped_event = MultiAgentNodeStreamEvent(node_name, event)
yield wrapped_event
# Capture the final result event
if isinstance(event, dict) and "result" in event:
Is there ever a case where this is not a dict?
Args:
    result: The final result from multi-agent execution (SwarmResult, GraphResult, etc.)
"""
super().__init__({"result": result})
How does the caller differentiate between an AgentResult (using the key result) and a MultiAgent result?
class MultiAgentResultEvent(TypedEvent):
    """Event emitted when multi-agent execution completes with final result."""

    def __init__(self, result: Any) -> None:
Why is this Any? Why is this not typed as GraphResult?
try:
    async for event in graph.stream_async("Test streaming with failure"):
        events.append(event)
    raise AssertionError("Expected an exception")
Why aren't we using pytest-raises?
https://docs.pytest.org/en/stable/reference/reference.html#pytest-raises
events = []
start_time = time.time()
async for event in graph.stream_async("Test parallel streaming"):
    events.append(event)
total_time = time.time() - start_time
We have a helper for event aggregations - can we use that throughout?
Suggested change:
- events = []
- start_time = time.time()
- async for event in graph.stream_async("Test parallel streaming"):
-     events.append(event)
- total_time = time.time() - start_time
+ start_time = time.time()
+ events = await alist(graph.stream_async("Test parallel streaming"))
+ total_time = time.time() - start_time
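For readers unfamiliar with the helper, an `alist`-style function usually just drains an async iterable into a list. The version below is a guess at its shape, not the repository's actual implementation, and `fake_stream` is a made-up stand-in for `graph.stream_async(...)`.

```python
import asyncio
from typing import AsyncIterable, List, TypeVar

T = TypeVar("T")


async def alist(items: AsyncIterable[T]) -> List[T]:
    """Collect every item from an async iterable into a list (assumed shape)."""
    return [item async for item in items]


async def fake_stream(count: int):
    """Hypothetical stand-in for a streaming API; yields simple dict events."""
    for index in range(count):
        yield {"index": index}


# Replaces the manual "events = []" / "async for" / "append" loop.
events = asyncio.run(alist(fake_stream(3)))
```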
coordinator.tool_registry.registry = {"handoff_to_specialist": handoff_to_specialist}

# Collect all streaming events
events = []
Same here for using alist
# Count event categories
node_start_events = [e for e in events if e.get("multi_agent_node_start")]
node_stream_events = [e for e in events if e.get("multi_agent_node_stream")]
custom_events = [e for e in events if e.get("custom_event")]
Everywhere else where we allow custom events, we wrap them rather than allowing passthrough - specifically so that we don't conflict going forward. My gut says that we should be doing that here. Thoughts?
src/strands/multiagent/graph.py
Outdated
wrapped_event = MultiAgentNodeStreamEvent(node.node_id, event)
yield wrapped_event.as_dict()
# Capture the final result event
if "result" in event:
The typed events shape & fields are a blocker for me
{
    "multi_agent_node_stream": True,
    "node_id": node_id,
    **agent_event,  # Forward all original agent event data
Let's nest this instead of combining. Specifically ToolStreamEvent has all data here as a sub-field and I think that's what we should do whenever we wrap things
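A small sketch of why nesting is safer than spreading. The two wrapper functions and the `"event"` sub-field name are assumptions for illustration, not the shape the SDK ended up with. When the inner event carries its own `node_id` (for example, a nested graph), merging silently overwrites the wrapper's value, while nesting preserves both.

```python
from typing import Any, Dict


def wrap_merged(node_id: str, agent_event: Dict[str, Any]) -> Dict[str, Any]:
    """Spread the inner event into the wrapper (the shape in the diff above)."""
    return {"multi_agent_node_stream": True, "node_id": node_id, **agent_event}


def wrap_nested(node_id: str, agent_event: Dict[str, Any]) -> Dict[str, Any]:
    """Keep the inner event under its own key ("event" is a hypothetical name)."""
    return {"multi_agent_node_stream": True, "node_id": node_id, "event": agent_event}


# An inner event that happens to reuse a wrapper key.
inner = {"node_id": "inner-node", "data": "hello"}

merged = wrap_merged("outer-node", inner)
nested = wrap_nested("outer-node", inner)

# In the merged form the inner key wins because it is applied last,
# so the outer node identity is lost.
print(merged["node_id"])
# In the nested form both identities survive.
print(nested["node_id"], nested["event"]["node_id"])
```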
Description
This PR adds streaming support to the Swarm and Graph multi-agent systems, enabling real-time event emission during multi-agent execution. This brings multi-agent systems to feature parity with the single Agent class streaming capabilities.
Key Changes
New Event Types (src/strands/types/_events.py):
- MultiAgentNodeStartEvent: Emitted when a node begins execution
- MultiAgentNodeCompleteEvent: Emitted when a node completes execution
- MultiAgentNodeStreamEvent: Forwards agent events with node context
- MultiAgentHandoffEvent: Emitted during agent handoffs in Swarm (includes from_node, to_node, and message)
Swarm Streaming (src/strands/multiagent/swarm.py):
- stream_async() method that yields events during execution
- invoke_async() to use stream_async() internally (maintains backward compatibility)
Graph Streaming (src/strands/multiagent/graph.py):
- stream_async() method for real-time event streaming
- invoke_async() to consume stream_async() events
Testing:
Benefits
Related Issues
Documentation PR
Type of Change
New feature
Testing
How have you tested the change?
- tests/strands/multiagent/test_swarm.py, tests/strands/multiagent/test_graph.py
- Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
- hatch run prepare
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.