Skip to content

Commit 584febd

Browse files
committed
implement parallel agent
1 parent 69c3117 commit 584febd

File tree

11 files changed

+184
-14
lines changed

11 files changed

+184
-14
lines changed

docs/references.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- [Command: A new tool for building multi-agent architectures in LangGraph](https://blog.langchain.com/command-a-new-tool-for-multi-agent-architectures-in-langgraph/)
1313
- [Combine control flow and state updates with Command](https://langchain-ai.github.io/langgraph/how-tos/graph-api/#combine-control-flow-and-state-updates-with-command)
1414
- [Command: a new tool for building multi-agent architectures in LangGraph](https://www.youtube.com/watch?v=6BJDKf90L9A)
15+
- [masamasa59/genai-agent-advanced-book > chapter6](https://github.com/masamasa59/genai-agent-advanced-book/blob/main/chapter6/arxiv_researcher/agent/paper_search_agent.py)
1516

1617
### Sample Codes
1718

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,4 @@ possibly-unbound-attribute = "ignore"
8686
unresolved-attribute = "ignore"
8787
invalid-argument-type = "ignore"
8888
invalid-type-form = "ignore"
89+
invalid-assignment = "ignore"

scripts/demo_agents_operator.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dotenv import load_dotenv
55

66
from template_langgraph.agents.demo_agents.multi_agent import app as multi_agent_app
7-
from template_langgraph.agents.demo_agents.parallel_processor_agent import app as parallel_agent_app
7+
from template_langgraph.agents.demo_agents.parallel_processor_agent.agent import app as parallel_processor_agent_app
88
from template_langgraph.agents.demo_agents.weather_agent import app as weather_agent_app
99
from template_langgraph.loggers import get_logger
1010

@@ -74,7 +74,13 @@ def multi_agent(
7474

7575

7676
@app.command()
77-
def parallel_agent(
77+
def parallel_processor_agent(
78+
goal: str = typer.Option(
79+
"ソフトウェアシステム開発会社を立ち上げる戦略を立てるための情報収集をしたい",
80+
"--goal",
81+
"-g",
82+
help="The goal to decompose into tasks",
83+
),
7884
verbose: bool = typer.Option(
7985
False,
8086
"--verbose",
@@ -85,19 +91,14 @@ def parallel_agent(
8591
if verbose:
8692
logger.setLevel(logging.DEBUG)
8793

88-
parallel_agent_app.invoke(
89-
{
90-
"messages": [
91-
{"role": "user", "content": "Simulate multiple tasks in parallel"},
92-
],
93-
"tasks": [
94-
"Task 1",
95-
"Task 2",
96-
"Task 3",
97-
],
94+
for event in parallel_processor_agent_app.stream(
95+
input={
96+
"goal": goal,
9897
},
9998
debug=True,
100-
)
99+
):
100+
logger.info("-" * 20)
101+
logger.info(f"Event: {event}")
101102

102103

103104
if __name__ == "__main__":

template_langgraph/agents/demo_agents/multi_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from langgraph.graph import END, START, MessagesState, StateGraph
44
from langgraph.types import Command
55

6-
from template_langgraph.agents.demo_agent.weather_agent import app
6+
from template_langgraph.agents.demo_agents.weather_agent import app
77
from template_langgraph.llms.azure_openais import AzureOpenAiWrapper
88
from template_langgraph.loggers import get_logger
99

template_langgraph/agents/demo_agents/parallel_processor_agent/__init__.py

Whitespace-only changes.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from langchain_core.language_models.chat_models import BaseChatModel
2+
from langgraph.graph import StateGraph
3+
from langgraph.graph.state import CompiledStateGraph
4+
5+
from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
6+
ParallelProcessorAgentInputState,
7+
ParallelProcessorAgentOutputState,
8+
ParallelProcessorAgentState,
9+
)
10+
from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.decompose_tasks import DecomposeTasks
11+
from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.run_task import RunTask
12+
from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.summarize_results import SummarizeResults
13+
from template_langgraph.llms.azure_openais import AzureOpenAiWrapper
14+
15+
16+
class ParallelProcessorAgent:
17+
def __init__(self, llm: BaseChatModel):
18+
self.llm = llm
19+
self.decompose_tasks = DecomposeTasks(llm)
20+
self.run_task = RunTask()
21+
self.summarize_results = SummarizeResults()
22+
23+
def create_graph(self) -> CompiledStateGraph:
24+
workflow = StateGraph(
25+
state_schema=ParallelProcessorAgentState,
26+
input_schema=ParallelProcessorAgentInputState,
27+
output_schema=ParallelProcessorAgentOutputState,
28+
)
29+
workflow.add_node("decompose_tasks", self.decompose_tasks)
30+
workflow.add_node("run_task", self.run_task)
31+
workflow.add_node("summarize_results", self.summarize_results)
32+
33+
workflow.add_edge("run_task", "summarize_results")
34+
workflow.set_entry_point("decompose_tasks")
35+
workflow.set_finish_point("summarize_results")
36+
return workflow.compile()
37+
38+
39+
app = ParallelProcessorAgent(AzureOpenAiWrapper().chat_model).create_graph()
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import operator
2+
from typing import (
3+
Annotated,
4+
TypedDict,
5+
)
6+
7+
from pydantic import BaseModel, Field
8+
9+
10+
class Task(BaseModel):
11+
id: str = Field(description="タスクのID")
12+
description: str = Field(description="タスクの説明")
13+
state: str = Field(description="タスクの状態")
14+
15+
16+
class Tasks(BaseModel):
17+
tasks: list[Task] = Field(description="タスクのリスト")
18+
19+
20+
class TaskResult(TypedDict):
21+
task: Task = Field(description="タスクの詳細")
22+
result_code: int = Field(description="タスクの実行結果コード")
23+
message: str = Field(description="タスクの実行結果メッセージ")
24+
25+
26+
class ParallelProcessorAgentInputState(TypedDict):
27+
goal: str
28+
29+
30+
class ParallelProcessorAgentProcessingState(TypedDict):
31+
tasks: Tasks
32+
33+
34+
class ParallelProcessorAgentOutputState(TypedDict):
35+
task_results: Annotated[list[TaskResult], operator.add]
36+
summary: str
37+
38+
39+
class ParallelProcessorAgentState(
40+
ParallelProcessorAgentInputState,
41+
ParallelProcessorAgentProcessingState,
42+
ParallelProcessorAgentOutputState,
43+
):
44+
pass

template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Literal
2+
3+
from langchain_core.language_models.chat_models import BaseChatModel
4+
from langgraph.types import Command, Send
5+
6+
from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
7+
Tasks,
8+
)
9+
10+
11+
class DecomposeTasks:
12+
def __init__(self, llm: BaseChatModel):
13+
self.llm = llm
14+
15+
def __call__(self, state: dict) -> Command[Literal["run_task"]]:
16+
goal = state.get("goal", "")
17+
tasks: Tasks = self.llm.with_structured_output(Tasks).invoke(
18+
input=f"Decompose the following goal into tasks: {goal}",
19+
)
20+
gotos = []
21+
for task in tasks.tasks:
22+
gotos.append(
23+
Send(
24+
"run_task",
25+
{
26+
"task": task,
27+
},
28+
)
29+
)
30+
31+
return Command(
32+
goto=gotos,
33+
update={"tasks": tasks},
34+
)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
2+
Task,
3+
TaskResult,
4+
)
5+
from template_langgraph.loggers import get_logger
6+
7+
logger = get_logger(__name__)
8+
9+
10+
class RunTask:
11+
def __init__(self):
12+
pass
13+
14+
def __call__(self, state: dict) -> dict:
15+
logger.info(f"Running state... {state}")
16+
task: Task = state.get("task", None)
17+
logger.info(f"Task: {task.model_dump_json(indent=2)}")
18+
19+
# FIXME: Simulate task processing for now. Replace with actual processing logic e.g. Tool call agent
20+
import time
21+
22+
time.sleep(3)
23+
result = TaskResult(
24+
task=task,
25+
message="Task completed successfully",
26+
result_code=0,
27+
)
28+
29+
return {
30+
"task_results": [result],
31+
}

0 commit comments

Comments
 (0)