Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/references.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [Command: A new tool for building multi-agent architectures in LangGraph](https://blog.langchain.com/command-a-new-tool-for-multi-agent-architectures-in-langgraph/)
- [Combine control flow and state updates with Command](https://langchain-ai.github.io/langgraph/how-tos/graph-api/#combine-control-flow-and-state-updates-with-command)
- [Command: a new tool for building multi-agent architectures in LangGraph](https://www.youtube.com/watch?v=6BJDKf90L9A)
- [masamasa59/genai-agent-advanced-book > chapter6](https://github.com/masamasa59/genai-agent-advanced-book/blob/main/chapter6/arxiv_researcher/agent/paper_search_agent.py)

### Sample Codes

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ possibly-unbound-attribute = "ignore"
unresolved-attribute = "ignore"
invalid-argument-type = "ignore"
invalid-type-form = "ignore"
invalid-assignment = "ignore"
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import typer
from dotenv import load_dotenv

from template_langgraph.agents.simple_multi_agent.multi_agent import app as multi_agent_app
from template_langgraph.agents.simple_multi_agent.weather_agent import app as weather_agent_app
from template_langgraph.agents.demo_agents.multi_agent import app as multi_agent_app
from template_langgraph.agents.demo_agents.parallel_processor_agent.agent import app as parallel_processor_agent_app
from template_langgraph.agents.demo_agents.weather_agent import app as weather_agent_app
from template_langgraph.loggers import get_logger

app = typer.Typer(
add_completion=False,
help="SimpleMultiAgent CLI",
help="Demo Agents CLI",
)
logger = get_logger(__name__)

Expand Down Expand Up @@ -72,6 +73,34 @@ def multi_agent(
logger.info(response["messages"][-1].content)


@app.command()
def parallel_processor_agent(
    goal: str = typer.Option(
        "ソフトウェアシステム開発会社を立ち上げる戦略を立てるための情報収集をしたい",
        "--goal",
        "-g",
        help="The goal to decompose into tasks",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose output",
    ),
):
    """Run the parallel processor agent: decompose *goal* into tasks, execute
    them in parallel, and log every streamed graph event.

    Fix: graph-level debug output now follows the --verbose flag; it was
    hard-coded to True, which made -v meaningless for the stream output.
    """
    if verbose:
        logger.setLevel(logging.DEBUG)

    # Stream execution events from the compiled parallel processor graph.
    for event in parallel_processor_agent_app.stream(
        input={
            "goal": goal,
        },
        debug=verbose,  # was: debug=True (unconditional)
    ):
        logger.info("-" * 20)
        logger.info(f"Event: {event}")


if __name__ == "__main__":
load_dotenv(
override=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.types import Command

from template_langgraph.agents.simple_multi_agent.weather_agent import app
from template_langgraph.agents.demo_agents.weather_agent import app
from template_langgraph.llms.azure_openais import AzureOpenAiWrapper
from template_langgraph.loggers import get_logger

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from langchain_core.language_models.chat_models import BaseChatModel
from langgraph.graph import StateGraph
from langgraph.graph.state import CompiledStateGraph

from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
ParallelProcessorAgentInputState,
ParallelProcessorAgentOutputState,
ParallelProcessorAgentState,
)
from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.decompose_tasks import DecomposeTasks
from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.run_task import RunTask
from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.summarize_results import SummarizeResults
from template_langgraph.llms.azure_openais import AzureOpenAiWrapper


class ParallelProcessorAgent:
    """Builds a LangGraph workflow that decomposes a goal into tasks, fans
    them out for execution, and summarizes the collected results."""

    def __init__(self, llm: BaseChatModel):
        self.llm = llm
        self.decompose_tasks = DecomposeTasks(llm)
        self.run_task = RunTask()
        self.summarize_results = SummarizeResults()

    def create_graph(self) -> CompiledStateGraph:
        """Assemble and compile the agent's state graph.

        NOTE: there is no static decompose_tasks -> run_task edge; fan-out is
        done dynamically by DecomposeTasks via Command(goto=[Send(...)]).
        """
        graph = StateGraph(
            state_schema=ParallelProcessorAgentState,
            input_schema=ParallelProcessorAgentInputState,
            output_schema=ParallelProcessorAgentOutputState,
        )
        # Register the three processing stages as graph nodes.
        for node_name, node in (
            ("decompose_tasks", self.decompose_tasks),
            ("run_task", self.run_task),
            ("summarize_results", self.summarize_results),
        ):
            graph.add_node(node_name, node)

        graph.set_entry_point("decompose_tasks")
        graph.add_edge("run_task", "summarize_results")
        graph.set_finish_point("summarize_results")
        return graph.compile()


# Module-level compiled graph, exported as `app` for the CLI entry point.
app = ParallelProcessorAgent(AzureOpenAiWrapper().chat_model).create_graph()
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import operator
from typing import (
Annotated,
TypedDict,
)

from pydantic import BaseModel, Field


class Task(BaseModel):
    """A single unit of work produced by decomposing a goal."""

    # NOTE: Field descriptions are intentionally Japanese — they are passed to
    # the LLM via with_structured_output(Tasks) and act as prompt text, so
    # they must not be translated.
    id: str = Field(description="タスクのID")
    description: str = Field(description="タスクの説明")
    state: str = Field(description="タスクの状態")


class Tasks(BaseModel):
    """Structured-output container holding the list of decomposed tasks."""

    tasks: list[Task] = Field(description="タスクのリスト")


class TaskResult(TypedDict):
    """Outcome of executing a single Task.

    Fix: TypedDict fields must be bare annotations — the previous
    ``= Field(...)`` value assignments are invalid on a TypedDict (pydantic's
    ``Field`` only applies to BaseModel) and are rejected by type checkers.
    """

    task: Task  # the task that was executed
    result_code: int  # 0 on success (see RunTask); non-zero presumably means failure — TODO confirm
    message: str  # human-readable execution result message


class ParallelProcessorAgentInputState(TypedDict):
    """Input channel of the graph: the user-supplied goal."""

    goal: str  # free-text goal, forwarded to the LLM for task decomposition


class ParallelProcessorAgentProcessingState(TypedDict):
    """Intermediate channel: the task list produced by the decompose step."""

    tasks: Tasks  # written by DecomposeTasks via Command(update={"tasks": ...})


class ParallelProcessorAgentOutputState(TypedDict):
    """Output channels of the graph."""

    # operator.add reducer: each parallel run_task branch appends its results.
    task_results: Annotated[list[TaskResult], operator.add]
    summary: str  # final textual summary built by SummarizeResults


class ParallelProcessorAgentState(
    ParallelProcessorAgentInputState,
    ParallelProcessorAgentProcessingState,
    ParallelProcessorAgentOutputState,
):
    """Full internal graph state: union of input, processing, and output channels."""

    pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Literal

from langchain_core.language_models.chat_models import BaseChatModel
from langgraph.types import Command, Send

from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
Tasks,
)


class DecomposeTasks:
    """Node that asks the LLM to split a goal into tasks and fans them out."""

    def __init__(self, llm: BaseChatModel):
        self.llm = llm

    def __call__(self, state: dict) -> Command[Literal["run_task"]]:
        """Decompose the goal, then dispatch one run_task branch per task.

        Returns a Command whose ``goto`` is a list of Send objects — this is
        what triggers the parallel run_task executions.
        """
        goal = state.get("goal", "")
        # Structured output guarantees the response parses into a Tasks model.
        tasks: Tasks = self.llm.with_structured_output(Tasks).invoke(
            input=f"Decompose the following goal into tasks: {goal}",
        )
        # One Send per task -> one parallel run_task invocation per task.
        dispatches = [Send("run_task", {"task": task}) for task in tasks.tasks]
        return Command(goto=dispatches, update={"tasks": tasks})
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
Task,
TaskResult,
)
from template_langgraph.loggers import get_logger

logger = get_logger(__name__)


class RunTask:
    """Node that executes one dispatched task and reports a TaskResult."""

    def __init__(self):
        pass

    def __call__(self, state: dict) -> dict:
        """Run the task found in ``state["task"]`` and append its result to
        the ``task_results`` channel (merged via the operator.add reducer)."""
        logger.info(f"Running state... {state}")
        task: Task = state.get("task", None)
        logger.info(f"Task: {task.model_dump_json(indent=2)}")

        # FIXME: placeholder — simulate work with a fixed delay until a real
        # executor (e.g. a tool-calling agent) replaces this.
        import time

        time.sleep(3)

        outcome = TaskResult(
            task=task,
            message="Task completed successfully",
            result_code=0,
        )
        return {"task_results": [outcome]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from template_langgraph.agents.demo_agents.parallel_processor_agent.models import (
ParallelProcessorAgentState,
)
from template_langgraph.loggers import get_logger

logger = get_logger(__name__)


class SummarizeResults:
    """Node that folds all collected task results into the final summary."""

    def __init__(self):
        pass

    def __call__(self, state: ParallelProcessorAgentState) -> dict:
        """Build the ``summary`` output channel from accumulated task results.

        Fixes: the missing-key default is now ``[]`` (the channel is
        ``list[TaskResult]`` with an operator.add reducer — ``""`` was the
        wrong type), and idiomatic ``str(...)`` replaces ``.__str__()``.
        """
        logger.info(f"Summarizing results... {state}")
        task_results = state.get("task_results", [])
        logger.info(f"Task results: {task_results}")
        return {
            "summary": str(task_results),
        }