Skip to content

Commit ffdc3eb

Browse files
committed
Add an example of the Parallelization workflow
1 parent ef1d15b commit ffdc3eb

File tree

3 files changed

+214
-0
lines changed

3 files changed

+214
-0
lines changed

README.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,101 @@ chain = AgentSpec(
195195
)
196196
```
197197

198+
### Workflow: Parallelization
199+
200+
**Parallelization** distributes independent subtasks across multiple agents for concurrent processing.
201+
202+
<p align="center">
203+
<img src="assets/patterns-parallelization.png">
204+
</p>
205+
206+
**When to use this workflow:** Parallelization is effective when the divided subtasks can be parallelized for speed, or when multiple perspectives or attempts are needed for higher confidence results.
207+
208+
**Example** (see [examples/patterns/parallelization.py](examples/patterns/parallelization.py) for a runnable example):
209+
210+
```python
211+
from coagent.agents import Aggregator, ChatAgent, ModelClient, Parallel
212+
from coagent.core import AgentSpec, new
213+
214+
client = ModelClient(...)
215+
216+
customer = AgentSpec(
217+
"customer",
218+
new(
219+
ChatAgent,
220+
system="""\
221+
Customers:
222+
- Price sensitive
223+
- Want better tech
224+
- Environmental concerns\
225+
""",
226+
client=client,
227+
),
228+
)
229+
230+
employee = AgentSpec(
231+
"employee",
232+
new(
233+
ChatAgent,
234+
system="""\
235+
Employees:
236+
- Job security worries
237+
- Need new skills
238+
- Want clear direction\
239+
""",
240+
client=client,
241+
),
242+
)
243+
244+
investor = AgentSpec(
245+
"investor",
246+
new(
247+
ChatAgent,
248+
system="""\
249+
Investors:
250+
- Expect growth
251+
- Want cost control
252+
- Risk concerns\
253+
""",
254+
client=client,
255+
),
256+
)
257+
258+
supplier = AgentSpec(
259+
"supplier",
260+
new(
261+
ChatAgent,
262+
system="""\
263+
Suppliers:
264+
- Capacity constraints
265+
- Price pressures
266+
- Tech transitions\
267+
""",
268+
client=client,
269+
),
270+
)
271+
272+
aggregator = AgentSpec(
273+
"aggregator",
274+
new(Aggregator),
275+
)
276+
277+
parallel = AgentSpec(
278+
"parallel",
279+
new(
280+
Parallel,
281+
"customer",
282+
"employee",
283+
"investor",
284+
"supplier",
285+
aggregator="aggregator",
286+
),
287+
)
288+
```
289+
290+
291+
### Workflow: Routing
292+
198293
TODO
199294

200295

70.1 KB
Loading
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import asyncio
2+
import os
3+
4+
from coagent.agents import (
5+
Aggregator,
6+
AggregationResult,
7+
ChatAgent,
8+
ModelClient,
9+
Parallel,
10+
)
11+
from coagent.agents.messages import ChatMessage
12+
from coagent.core import AgentSpec, new, set_stderr_logger
13+
from coagent.runtimes import LocalRuntime
14+
15+
client = ModelClient(
16+
model=os.getenv("MODEL_NAME"),
17+
api_base=os.getenv("MODEL_API_BASE"),
18+
api_version=os.getenv("MODEL_API_VERSION"),
19+
api_key=os.getenv("MODEL_API_KEY"),
20+
)
21+
22+
customer = AgentSpec(
23+
"customer",
24+
new(
25+
ChatAgent,
26+
system="""\
27+
Customers:
28+
- Price sensitive
29+
- Want better tech
30+
- Environmental concerns\
31+
""",
32+
client=client,
33+
),
34+
)
35+
36+
employee = AgentSpec(
37+
"employee",
38+
new(
39+
ChatAgent,
40+
system="""\
41+
Employees:
42+
- Job security worries
43+
- Need new skills
44+
- Want clear direction\
45+
""",
46+
client=client,
47+
),
48+
)
49+
50+
investor = AgentSpec(
51+
"investor",
52+
new(
53+
ChatAgent,
54+
system="""\
55+
Investors:
56+
- Expect growth
57+
- Want cost control
58+
- Risk concerns\
59+
""",
60+
client=client,
61+
),
62+
)
63+
64+
supplier = AgentSpec(
65+
"supplier",
66+
new(
67+
ChatAgent,
68+
system="""\
69+
Suppliers:
70+
- Capacity constraints
71+
- Price pressures
72+
- Tech transitions\
73+
""",
74+
client=client,
75+
),
76+
)
77+
78+
aggregator = AgentSpec(
79+
"aggregator",
80+
new(Aggregator),
81+
)
82+
83+
parallel = AgentSpec(
84+
"parallel",
85+
new(
86+
Parallel,
87+
"customer",
88+
"employee",
89+
"investor",
90+
"supplier",
91+
aggregator="aggregator",
92+
),
93+
)
94+
95+
96+
async def main():
97+
async with LocalRuntime() as runtime:
98+
for spec in [customer, employee, investor, supplier, aggregator, parallel]:
99+
await runtime.register_spec(spec)
100+
101+
result = await parallel.run(
102+
ChatMessage(
103+
role="user",
104+
content="""\
105+
Analyze how market changes will impact this stakeholder group.
106+
Provide specific impacts and recommended actions.
107+
Format with clear sections and priorities.\
108+
""",
109+
).encode()
110+
)
111+
msg = AggregationResult.decode(result)
112+
for result in msg.results:
113+
x = ChatMessage.decode(result)
114+
print(x.content)
115+
116+
117+
if __name__ == "__main__":
118+
set_stderr_logger()
119+
asyncio.run(main())

0 commit comments

Comments
 (0)