diff --git a/docs/index.ja.md b/docs/index.ja.md index 82f590f..ef19e9a 100644 --- a/docs/index.ja.md +++ b/docs/index.ja.md @@ -89,7 +89,7 @@ uv run python scripts/elasticsearch_operator.py create-index \ # Azure AI Search make create-ai-search-index -# Azure Cosmos DB +# Azure Cosmos DB make create-cosmosdb-index # またはオペレータースクリプトを直接使用: @@ -129,7 +129,7 @@ make create-cosmosdb-index - `weather_agent.py` — シンプルなツール呼び出しエージェント。モック天気検索ツールを使った基本的な ReAct パターン。主要概念: ツール呼び出し、基本エージェントパターン。 - `multi_agent.py` — マルチエージェント協調。転送機能を使ったエージェント間の引き渡しを実演。主要概念: エージェント協調、ワークフロー転送。 -- `parallel_processor_agent/` — 並列実行タスク分解。目標をタスクに分解し並列処理。主要概念: 並列処理、タスク分解、Send 操作。 +- `parallel_rag_agent/` — 並列実行タスク分解。目標をタスクに分解し並列処理。主要概念: 並列処理、タスク分解、Send 操作。 ### サポートモジュール @@ -287,11 +287,11 @@ uv run python scripts/demo_agents_operator.py multi-agent \ --verbose ``` -- Parallel processor agent(タスク分解): +- Parallel rag agent(タスク分解): ```shell -uv run python scripts/demo_agents_operator.py parallel-processor-agent \ - --goal "ソフトウェア会社立ち上げのための情報収集戦略を計画する" \ +uv run python scripts/demo_agents_operator.py parallel-rag-agent \ + --query "ソフトウェア会社立ち上げのための情報収集戦略を計画する" \ --verbose ``` @@ -377,7 +377,7 @@ make mcp-inspector ## 次のステップ 1. **基本から始める**: `kabuto_helpdesk_agent`の例を実行 -2. **デモエージェントを試す**: `weather_agent`、`multi_agent`、`parallel_processor_agent`でシンプルなパターンを探索 +2. **デモエージェントを試す**: `weather_agent`、`multi_agent`、`parallel_rag_agent`でシンプルなパターンを探索 3. **実装を理解する**: `chat_with_tools_agent`と比較 4. **高度なパターンを探索**: タスク分解器とスーパーバイザーエージェントを試す 5. **独自のものを構築**: このテンプレートをあなたのユースケースの出発点として使用 @@ -404,6 +404,6 @@ uv run python scripts/otel_operator.py run -q "health check" -v 3. **構造化出力エージェント** - フォーマットされたデータの返却(`issue_formatter_agent`) 4. **計画エージェント** - 複雑なタスクの分解(`task_decomposer_agent`) 5. **マルチエージェントシステム** - 複数エージェントの協調(`supervisor_agent`、`multi_agent`) -6. **並列処理** - 同時タスク実行(`parallel_processor_agent`) +6. **並列処理** - 同時タスク実行(`parallel_rag_agent`) 各パターンは、いつどのように使用するかを理解するのに役立つ明確な例と文書で実装されています。 diff --git a/docs/index.md b/docs/index.md index 31015d4..6dcdae6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -89,7 +89,7 @@ uv run python scripts/elasticsearch_operator.py create-index \ # Azure AI Search make create-ai-search-index -# Azure Cosmos DB +# Azure Cosmos DB make create-cosmosdb-index # Or use the operator scripts directly: @@ -129,7 +129,7 @@ Additional simple agents for learning and demonstration: - `weather_agent.py` — Simple tool-calling agent. Basic ReAct pattern with mock weather search tool. Key: tool calling, basic agent pattern. - `multi_agent.py` — Multi-agent coordination. Demonstrates agent-to-agent handoff using transfer functions. Key: agent coordination, workflow transfer. -- `parallel_processor_agent/` — Task decomposition with parallel execution. Breaks down goals into tasks and processes them in parallel. Key: parallel processing, task decomposition, Send operations. +- `parallel_rag_agent/` — Task decomposition with parallel execution. Breaks down querys into tasks and processes them in parallel. Key: parallel processing, task decomposition, Send operations. ### Supporting Modules @@ -287,11 +287,11 @@ uv run python scripts/demo_agents_operator.py multi-agent \ --verbose ``` -- Parallel processor agent (task decomposition): +- Parallel rag agent (task decomposition): ```shell -uv run python scripts/demo_agents_operator.py parallel-processor-agent \ - --goal "Plan information gathering strategy for launching a software company" \ +uv run python scripts/demo_agents_operator.py parallel-rag-agent \ + --query "Plan information gathering strategy for launching a software company" \ --verbose ``` @@ -379,7 +379,7 @@ make mcp-inspector ## Next Steps 1. **Start with the basics**: Run the `kabuto_helpdesk_agent` example -2. **Try demo agents**: Explore the simple patterns with `weather_agent`, `multi_agent`, and `parallel_processor_agent` +2. **Try demo agents**: Explore the simple patterns with `weather_agent`, `multi_agent`, and `parallel_rag_agent` 3. **Understand the implementation**: Compare it with `chat_with_tools_agent` 4. **Explore advanced patterns**: Try the task decomposer and supervisor agents 5. **Build your own**: Use this template as a starting point for your use case @@ -406,6 +406,6 @@ This template demonstrates several proven agent architectures: 3. **Structured Output Agent** - Returning formatted data (`issue_formatter_agent`) 4. **Planning Agent** - Breaking down complex tasks (`task_decomposer_agent`) 5. **Multi-Agent Systems** - Coordinating multiple agents (`supervisor_agent`, `multi_agent`) -6. **Parallel Processing** - Concurrent task execution (`parallel_processor_agent`) +6. **Parallel Processing** - Concurrent task execution (`parallel_rag_agent`) Each pattern is implemented with clear examples and documentation to help you understand when and how to use them. diff --git a/langgraph.json b/langgraph.json index 8282d75..aa1c5cc 100644 --- a/langgraph.json +++ b/langgraph.json @@ -1,10 +1,8 @@ { - "dependencies": [ - "." - ], + "dependencies": ["."], "graphs": { "chat_with_tools_agent": "template_langgraph.agents.chat_with_tools_agent.agent:graph", - "demo_agents_parallel_processor_agent": "template_langgraph.agents.demo_agents.parallel_processor_agent.agent:graph", + "demo_agents_parallel_rag_agent": "template_langgraph.agents.demo_agents.parallel_rag_agent.agent:graph", "demo_agents_multi_agent": "template_langgraph.agents.demo_agents.multi_agent:graph", "demo_agents_weather_agent": "template_langgraph.agents.demo_agents.weather_agent:graph", "image_classifier_agent": "template_langgraph.agents.image_classifier_agent.agent:graph", @@ -15,4 +13,4 @@ "task_decomposer_agent": "template_langgraph.agents.task_decomposer_agent.agent:graph" }, "env": ".env" -} \ No newline at end of file +} diff --git a/scripts/demo_agents_operator.py b/scripts/demo_agents_operator.py index f1047d6..ed11fb1 100644 --- a/scripts/demo_agents_operator.py +++ b/scripts/demo_agents_operator.py @@ -4,7 +4,7 @@ from dotenv import load_dotenv from template_langgraph.agents.demo_agents.multi_agent import graph as multi_agent_graph -from template_langgraph.agents.demo_agents.parallel_processor_agent.agent import graph as parallel_processor_agent_graph +from template_langgraph.agents.demo_agents.parallel_rag_agent.agent import graph as parallel_rag_agent_graph from template_langgraph.agents.demo_agents.weather_agent import graph as weather_agent_graph from template_langgraph.loggers import get_logger @@ -74,12 +74,12 @@ def multi_agent( @app.command() -def parallel_processor_agent( - goal: str = typer.Option( - "ソフトウェアシステム開発会社を立ち上げる戦略を立てるための情報収集をしたい", - "--goal", - "-g", - help="The goal to decompose into tasks", +def parallel_rag_agent( + query: str = typer.Option( + "KABUTO のシステム概要やトラブルシュート事例を多種多様な情報ソースから回答して", + "--query", + "-q", + help="The query to decompose into tasks", ), verbose: bool = typer.Option( False, @@ -91,9 +91,9 @@ def parallel_processor_agent( if verbose: logger.setLevel(logging.DEBUG) - for event in parallel_processor_agent_graph.stream( + for event in parallel_rag_agent_graph.stream( input={ - "goal": goal, + "query": query, }, debug=True, ): diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/agent.py b/template_langgraph/agents/demo_agents/parallel_processor_agent/agent.py deleted file mode 100644 index 435c0f2..0000000 --- a/template_langgraph/agents/demo_agents/parallel_processor_agent/agent.py +++ /dev/null @@ -1,39 +0,0 @@ -from langchain_core.language_models.chat_models import BaseChatModel -from langgraph.graph import StateGraph -from langgraph.graph.state import CompiledStateGraph - -from template_langgraph.agents.demo_agents.parallel_processor_agent.models import ( - ParallelProcessorAgentInputState, - ParallelProcessorAgentOutputState, - ParallelProcessorAgentState, -) -from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.decompose_tasks import DecomposeTasks -from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.run_task import RunTask -from template_langgraph.agents.demo_agents.parallel_processor_agent.nodes.summarize_results import SummarizeResults -from template_langgraph.llms.azure_openais import AzureOpenAiWrapper - - -class ParallelProcessorAgent: - def __init__(self, llm: BaseChatModel): - self.llm = llm - self.decompose_tasks = DecomposeTasks(llm) - self.run_task = RunTask() - self.summarize_results = SummarizeResults() - - def create_graph(self) -> CompiledStateGraph: - workflow = StateGraph( - state_schema=ParallelProcessorAgentState, - input_schema=ParallelProcessorAgentInputState, - output_schema=ParallelProcessorAgentOutputState, - ) - workflow.add_node("decompose_tasks", self.decompose_tasks) - workflow.add_node("run_task", self.run_task) - workflow.add_node("summarize_results", self.summarize_results) - - workflow.add_edge("run_task", "summarize_results") - workflow.set_entry_point("decompose_tasks") - workflow.set_finish_point("summarize_results") - return workflow.compile() - - -graph = ParallelProcessorAgent(AzureOpenAiWrapper().chat_model).create_graph() diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/decompose_tasks.py b/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/decompose_tasks.py deleted file mode 100644 index d56e12d..0000000 --- a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/decompose_tasks.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Literal - -from langchain_core.language_models.chat_models import BaseChatModel -from langgraph.types import Command, Send - -from template_langgraph.agents.demo_agents.parallel_processor_agent.models import ( - Tasks, -) - - -class DecomposeTasks: - def __init__(self, llm: BaseChatModel): - self.llm = llm - - def __call__(self, state: dict) -> Command[Literal["run_task"]]: - goal = state.get("goal", "") - tasks: Tasks = self.llm.with_structured_output(Tasks).invoke( - input=f"Decompose the following goal into tasks: {goal}", - ) - gotos = [] - for task in tasks.tasks: - gotos.append( - Send( - "run_task", - { - "task": task, - }, - ) - ) - - return Command( - goto=gotos, - update={"tasks": tasks}, - ) diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/run_task.py b/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/run_task.py deleted file mode 100644 index f2d1b32..0000000 --- a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/run_task.py +++ /dev/null @@ -1,31 +0,0 @@ -from template_langgraph.agents.demo_agents.parallel_processor_agent.models import ( - Task, - TaskResult, -) -from template_langgraph.loggers import get_logger - -logger = get_logger(__name__) - - -class RunTask: - def __init__(self): - pass - - def __call__(self, state: dict) -> dict: - logger.info(f"Running state... {state}") - task: Task = state.get("task", None) - logger.info(f"Task: {task.model_dump_json(indent=2)}") - - # FIXME: Simulate task processing for now. Replace with actual processing logic e.g. Tool call agent - import time - - time.sleep(3) - result = TaskResult( - task=task, - message="Task completed successfully", - result_code=0, - ) - - return { - "task_results": [result], - } diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/summarize_results.py b/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/summarize_results.py deleted file mode 100644 index c5d7838..0000000 --- a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/summarize_results.py +++ /dev/null @@ -1,19 +0,0 @@ -from template_langgraph.agents.demo_agents.parallel_processor_agent.models import ( - ParallelProcessorAgentState, -) -from template_langgraph.loggers import get_logger - -logger = get_logger(__name__) - - -class SummarizeResults: - def __init__(self): - pass - - def __call__(self, state: ParallelProcessorAgentState) -> dict: - logger.info(f"Summarizing results... {state}") - task_results = state.get("task_results", "") - logger.info(f"Task results: {task_results}") - return { - "summary": task_results.__str__(), - } diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/__init__.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/__init__.py similarity index 100% rename from template_langgraph/agents/demo_agents/parallel_processor_agent/__init__.py rename to template_langgraph/agents/demo_agents/parallel_rag_agent/__init__.py diff --git a/template_langgraph/agents/demo_agents/parallel_rag_agent/agent.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/agent.py new file mode 100644 index 0000000..9828d03 --- /dev/null +++ b/template_langgraph/agents/demo_agents/parallel_rag_agent/agent.py @@ -0,0 +1,54 @@ +from langchain_core.language_models.chat_models import BaseChatModel +from langchain_core.tools.base import BaseTool +from langgraph.graph import StateGraph +from langgraph.graph.state import CompiledStateGraph + +from template_langgraph.agents.demo_agents.parallel_rag_agent.models import ( + ParallelRagAgentInputState, + ParallelRagAgentOutputState, + ParallelRagAgentState, +) +from template_langgraph.agents.demo_agents.parallel_rag_agent.nodes.decompose_tasks import DecomposeTasks +from template_langgraph.agents.demo_agents.parallel_rag_agent.nodes.run_task import RunTask +from template_langgraph.agents.demo_agents.parallel_rag_agent.nodes.summarize_results import SummarizeResults +from template_langgraph.llms.azure_openais import AzureOpenAiWrapper +from template_langgraph.tools.common import get_default_tools + + +class ParallelRagAgent: + def __init__( + self, + llm: BaseChatModel, + tools: list[BaseTool], + ): + self.llm = llm + self.decompose_tasks = DecomposeTasks( + llm=llm, + tools=tools, + ) + self.run_task = RunTask( + llm=llm, + tools=tools, + ) + self.summarize_results = SummarizeResults() + + def create_graph(self) -> CompiledStateGraph: + workflow = StateGraph( + state_schema=ParallelRagAgentState, + input_schema=ParallelRagAgentInputState, + output_schema=ParallelRagAgentOutputState, + ) + workflow.add_node("decompose_tasks", self.decompose_tasks) + workflow.add_node("run_task", self.run_task) + workflow.add_node("summarize_results", self.summarize_results) + + workflow.add_edge("run_task", "summarize_results") + workflow.set_entry_point("decompose_tasks") + workflow.set_finish_point("summarize_results") + return workflow.compile() + + +graph = ParallelRagAgent( + llm=AzureOpenAiWrapper().chat_model, + tools=get_default_tools(), +).create_graph() diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/models.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/models.py similarity index 58% rename from template_langgraph/agents/demo_agents/parallel_processor_agent/models.py rename to template_langgraph/agents/demo_agents/parallel_rag_agent/models.py index 7801f40..24122d3 100644 --- a/template_langgraph/agents/demo_agents/parallel_processor_agent/models.py +++ b/template_langgraph/agents/demo_agents/parallel_rag_agent/models.py @@ -9,8 +9,8 @@ class Task(BaseModel): id: str = Field(description="タスクのID") - description: str = Field(description="タスクの説明") - state: str = Field(description="タスクの状態") + tool_name: str = Field(description="タスクのツール名") + tool_args: dict = Field(description="タスクのツール引数") class Tasks(BaseModel): @@ -23,22 +23,22 @@ class TaskResult(TypedDict): message: str = Field(description="タスクの実行結果メッセージ") -class ParallelProcessorAgentInputState(TypedDict): - goal: str +class ParallelRagAgentInputState(TypedDict): + query: str -class ParallelProcessorAgentProcessingState(TypedDict): +class ParallelRagAgentProcessingState(TypedDict): tasks: Tasks -class ParallelProcessorAgentOutputState(TypedDict): +class ParallelRagAgentOutputState(TypedDict): task_results: Annotated[list[TaskResult], operator.add] summary: str -class ParallelProcessorAgentState( - ParallelProcessorAgentInputState, - ParallelProcessorAgentProcessingState, - ParallelProcessorAgentOutputState, +class ParallelRagAgentState( + ParallelRagAgentInputState, + ParallelRagAgentProcessingState, + ParallelRagAgentOutputState, ): pass diff --git a/template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/__init__.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/__init__.py similarity index 100% rename from template_langgraph/agents/demo_agents/parallel_processor_agent/nodes/__init__.py rename to template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/__init__.py diff --git a/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/decompose_tasks.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/decompose_tasks.py new file mode 100644 index 0000000..489ef4a --- /dev/null +++ b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/decompose_tasks.py @@ -0,0 +1,61 @@ +import json +from typing import Literal + +from langchain_core.language_models.chat_models import BaseChatModel +from langchain_core.messages import AIMessage +from langchain_core.tools.base import BaseTool +from langgraph.types import Command, Send + +from template_langgraph.agents.demo_agents.parallel_rag_agent.models import ( + Task, + Tasks, +) +from template_langgraph.loggers import get_logger + +logger = get_logger(__name__) + + +class DecomposeTasks: + def __init__( + self, + llm: BaseChatModel, + tools: list[BaseTool], + ): + self.llm = llm + self.tools = tools + + def __call__(self, state: dict) -> Command[Literal["run_task"]]: + query = state.get("query", "") + response: AIMessage = self.llm.bind_tools(tools=self.tools).invoke(query) + + logger.info(f"{response}, {type(response)}") + gotos = [] + tasks_list: list[Task] = [] + for tool_call in response.tool_calls: + logger.info(f"name={tool_call['name']}, args={tool_call['args']}") + args = tool_call.get("args", {}) + if isinstance(args, str): + try: + args = json.loads(args) + except Exception: + pass + task = Task( + id=tool_call["id"], + tool_name=tool_call["name"], + tool_args=args, + ) + tasks_list.append(task) + gotos.append( + Send( + "run_task", + { + "task": task, + "query": query, + }, + ) + ) + + return Command( + goto=gotos, + update={"tasks": Tasks(tasks=tasks_list)}, + ) diff --git a/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/run_task.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/run_task.py new file mode 100644 index 0000000..06bcab2 --- /dev/null +++ b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/run_task.py @@ -0,0 +1,57 @@ +import json + +from langchain_core.language_models.chat_models import BaseChatModel +from langchain_core.messages import HumanMessage +from langchain_core.tools.base import BaseTool + +from template_langgraph.agents.demo_agents.parallel_rag_agent.models import ( + Task, + TaskResult, +) +from template_langgraph.loggers import get_logger + +logger = get_logger(__name__) + + +class RunTask: + def __init__( + self, + llm: BaseChatModel, + tools: list[BaseTool], + ): + self.llm = llm + self.tools = tools + self.tools_by_name = {tool.name: tool for tool in tools} + + def __call__(self, state: dict) -> dict: + logger.info(f"Running state... {state}") + task: Task = state.get("task", None) + query: str = state.get("query", None) + logger.info(f"Task: {task.model_dump_json(indent=2)}") + + try: + observation = self.tools_by_name[task.tool_name].invoke(task.tool_args) + except Exception as e: + logger.error(f"Error occurred while invoking tools: {e}") + observation = {"error": str(e)} + + result = self.llm.invoke( + input=[ + HumanMessage(content=query), + HumanMessage( + content=json.dumps(observation.__str__(), ensure_ascii=False), + ), + ], + ) + + logger.info(f"LLM response: {result.model_dump_json(indent=2)}, type: {type(result)}") + + result = TaskResult( + task=task, + result_code=0, + message=result.content, + ) + + return { + "task_results": [result], + } diff --git a/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/summarize_results.py b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/summarize_results.py new file mode 100644 index 0000000..c6aa0f7 --- /dev/null +++ b/template_langgraph/agents/demo_agents/parallel_rag_agent/nodes/summarize_results.py @@ -0,0 +1,23 @@ +from template_langgraph.agents.demo_agents.parallel_rag_agent.models import ( + ParallelRagAgentState, + TaskResult, +) +from template_langgraph.loggers import get_logger + +logger = get_logger(__name__) + + +class SummarizeResults: + def __init__(self): + pass + + def __call__(self, state: ParallelRagAgentState) -> dict: + logger.info(f"Summarizing results... {state}") + task_results: list[TaskResult] = state.get("task_results", []) + summary = "" + for task_result in task_results: + summary += f"Tool: {task_result['task'].tool_name}: {task_result['message']}\n------\n" + logger.info(f"Final summary: {summary}") + return { + "summary": summary, + }