Skip to content

Commit ef1d15b

Browse files
committed
Add Parallel
1 parent 2a51a3c commit ef1d15b

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ An open-source framework for building monolithic or distributed agentic systems,
2020
- [x] Agent Discovery
2121
- [x] Static orchestration
2222
- [x] Sequential
23+
- [x] Parallel
2324
- [x] Dynamic orchestration
2425
- [x] Dynamic Triage
2526
- [x] Handoffs (based on async Swarm)
@@ -194,13 +195,12 @@ chain = AgentSpec(
194195
)
195196
```
196197

197-
### Agents
198-
199198
TODO
200199

201200

202201
## Examples
203202

203+
- [patterns](examples/patterns)
204204
- [ping-pong](examples/ping-pong)
205205
- [stream-ping-pong](examples/stream-ping-pong)
206206
- [discovery](examples/discovery)

coagent/agents/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
from .dynamic_triage import DynamicTriage
44
from .messages import ChatHistory, ChatMessage
55
from .model_client import ModelClient
6+
from .parallel import Aggregator, AggregationResult, Parallel
67
from .sequential import Sequential

coagent/agents/parallel.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
from coagent.core import (
2+
Address,
3+
BaseAgent,
4+
Context,
5+
handler,
6+
GenericMessage,
7+
Message,
8+
RawMessage,
9+
SetReplyAgent,
10+
)
11+
12+
13+
class StartAggregation(Message):
14+
candidates: list[str]
15+
reply_addr: Address
16+
17+
18+
class AggregationStatus(Message):
19+
status: str
20+
21+
@property
22+
def busy(self) -> bool:
23+
return self.status == "busy"
24+
25+
26+
class AggregationResult(Message):
27+
results: list[RawMessage]
28+
29+
30+
class Aggregator(BaseAgent):
31+
def __init__(self):
32+
super().__init__()
33+
34+
self._busy: bool = False
35+
self._data: StartAggregation | None = None
36+
self._result: AggregationResult | None = None
37+
38+
@handler
39+
async def start_aggregation(
40+
self, msg: StartAggregation, ctx: Context
41+
) -> AggregationStatus:
42+
if self._busy:
43+
return AggregationStatus(status="busy")
44+
45+
self._busy = True
46+
self._data = msg
47+
self._result = AggregationResult(results=[])
48+
49+
return AggregationStatus(status="ok")
50+
51+
@handler
52+
async def handle(self, msg: GenericMessage, ctx: Context) -> None:
53+
if not self._busy:
54+
return
55+
56+
self._result.results.append(msg.encode())
57+
58+
if len(self._result.results) == len(self._data.candidates):
59+
if self._data.reply_addr:
60+
await self.channel.publish(self._data.reply_addr, self._result.encode())
61+
self._busy = False
62+
63+
64+
class Parallel(BaseAgent):
65+
"""Parallel is a composite agent that orchestrates its children agents
66+
concurrently and have their outputs aggregated by the given aggregator agent.
67+
"""
68+
69+
def __init__(self, *agent_types: str, aggregator: str = ""):
70+
super().__init__()
71+
self._agent_types = agent_types
72+
self._aggregator_type = aggregator
73+
74+
async def started(self) -> None:
75+
aggregator_addr = Address(name=self._aggregator_type, id=self.address.id)
76+
# Make each agent reply to the aggregator agent.
77+
for agent_type in self._agent_types:
78+
addr = Address(name=agent_type, id=self.address.id)
79+
await self.channel.publish(
80+
addr,
81+
SetReplyAgent(address=aggregator_addr).encode(),
82+
)
83+
84+
@handler
85+
async def handle(self, msg: GenericMessage, ctx: Context) -> None:
86+
if len(self._agent_types) == 0:
87+
return
88+
89+
# Let the aggregator agent reply to the sending agent, if asked.
90+
reply_address = self.reply_address or msg.reply
91+
if reply_address:
92+
# Reset the reply address of the message, since it will be replied by the aggregator agent.
93+
msg.reply = None
94+
95+
result = await self.channel.publish(
96+
Address(name=self._aggregator_type, id=self.address.id),
97+
StartAggregation(
98+
candidates=self._agent_types, reply_addr=reply_address
99+
).encode(),
100+
request=True,
101+
)
102+
status = AggregationStatus.decode(result)
103+
if status.busy:
104+
return # The aggregator agent is busy.
105+
106+
for agent_type in self._agent_types:
107+
addr = Address(name=agent_type, id=self.address.id)
108+
await self.channel.publish(addr, msg.encode())

0 commit comments

Comments
 (0)