def parse_structured_output(
    self,
    response,
    response_model: type[BaseModel],
) -> BaseModel:
    """Normalize structured LLM output into the requested pydantic model.

    The first choice of ``response`` is inspected in this order:

    1. ``message.parsed`` that is already a ``response_model`` instance is
       returned unchanged.
    2. Any other non-``None`` ``message.parsed`` value is validated with
       ``response_model.model_validate``.
    3. Otherwise ``message.content`` is validated as JSON with
       ``response_model.model_validate_json``.

    For step 3, string content is stripped of surrounding whitespace and, if
    it is wrapped in a Markdown code fence (three backticks, optionally
    followed by an info string such as ``json``), the fence is removed
    first. Non-string content is handed to pydantic untouched so the
    validation error raised for it is exactly the one pydantic produces.

    Args:
        response: Completion object exposing ``choices[0].message``.
        response_model: Pydantic model class the output must conform to.

    Returns:
        An instance of ``response_model``.

    Raises:
        IndexError: If ``response.choices`` is empty.
        pydantic.ValidationError: If the payload does not satisfy
            ``response_model``.
    """
    message = response.choices[0].message
    parsed = getattr(message, "parsed", None)

    if isinstance(parsed, response_model):
        return parsed

    if parsed is not None:
        return response_model.model_validate(parsed)

    content = message.content
    if isinstance(content, str):
        content = content.strip()
        fence = "```"
        is_fenced = (
            len(content) >= 2 * len(fence)
            and content.startswith(fence)
            and content.endswith(fence)
        )
        if is_fenced:
            inner = content[len(fence) : -len(fence)]
            opening, newline, remainder = inner.partition("\n")
            # The opening fence line carries at most an info string
            # ("json"); drop it unless the JSON itself starts on that line.
            if newline and not opening.lstrip().startswith(("{", "[")):
                inner = remainder
            content = inner.strip()

    return response_model.model_validate_json(content)
from typing import TYPE_CHECKING

from pydantic import BaseModel, Field

from mesa_llm.reasoning.reasoning import Observation, Plan, Reasoning

if TYPE_CHECKING:
    from mesa_llm.llm_agent import LLMAgent


# NOTE: the models below are documented with comments rather than docstrings
# so the JSON schema derived from them (passed to the LLM as
# ``response_format``) stays exactly as declared by the fields.


# One candidate action weighed by the agent, with a score bounded to [0, 1].
class DecisionOption(BaseModel):
    name: str
    description: str
    tradeoffs: list[str]
    score: float = Field(
        ge=0.0,
        le=1.0,
        description="Relative evaluation score for this option in the current context.",
    )


# Full decision artifact requested from the LLM; ``next_action`` is the text
# later handed to the tool-call executor.
class DecisionOutput(BaseModel):
    goal: str
    constraints: list[str]
    known_facts: list[str]
    unknowns: list[str]
    assumptions: list[str]
    options: list[DecisionOption]
    chosen_option: str
    rationale: str
    confidence: float = Field(ge=0.0, le=1.0)
    risks: list[str]
    next_action: str


class DecisionReasoning(Reasoning):
    """
    Structured decision-making reasoning that returns a strict JSON object before
    converting the selected next action into tool calls.

    ``plan`` and ``aplan`` share the same pipeline: build the prompt list,
    request a ``DecisionOutput`` from the LLM with tool calling disabled,
    store the decision in memory, expose the rationale for display, and pass
    ``next_action`` to the tool-call executor.
    """

    def __init__(self, agent: "LLMAgent"):
        super().__init__(agent=agent)

    def get_decision_system_prompt(self) -> str:
        """Return the system prompt describing the required decision schema."""
        return """
        You are an autonomous agent operating within a simulation environment.

        Your task is to analyze your current observation and memory to make a highly structured, optimal decision.
        Do not produce free-form chain-of-thought prose. You must evaluate the situation and return a strict JSON object matching the required schema.

        Your response must include:
        - goal: Your current primary objective within the simulation.
        - constraints: Any rules, resource limits, or environmental boundaries restricting your actions.
        - known_facts: Verified data strictly grounded in your current observation or historical memory.
        - unknowns: Critical missing information required for perfect decision-making.
        - assumptions: Logical inferences made to bridge the gap between known facts and unknowns.
        - options: A list of distinct, executable choices currently available to you. Each must include a name, description, tradeoffs, and a relative evaluation score.
        - chosen_option: The exact name of the best option selected from the list above.
        - rationale: A concise, logical justification for why this option was chosen over the alternatives.
        - confidence: A float between 0.0 and 1.0 representing your certainty in this decision.
        - risks: Potential negative outcomes or failure states associated with the chosen option.
        - next_action: A single, concrete, and strictly formatted executable command.

        Execution Requirements:
        1. Ground all known_facts entirely in the provided observation context. Do not hallucinate simulation state or capabilities.
        2. next_action must strictly match an available execution command. Do not invent tools.
        3. If information is heavily constrained or missing, explicitly reflect this by lowering the confidence score and detailing the danger in risks.
        """

    def get_decision_prompt(self, obs: Observation) -> list[str]:
        """
        Build the context entries (memory, last communication, observation).

        The memory object is probed with ``getattr`` because
        ``get_prompt_ready`` and ``get_communication_history`` are treated as
        optional; a memory lacking them simply contributes nothing.

        Args:
            obs: Current observation; appended when truthy.

        Returns:
            Prompt entries in the order memory, communication, observation.
        """
        prompt_list = []

        get_prompt_ready = getattr(self.agent.memory, "get_prompt_ready", None)
        if callable(get_prompt_ready):
            memory_text = get_prompt_ready()
            # Skip empty memory so no blank entry ends up in the prompt list.
            if memory_text:
                prompt_list.append(memory_text)

        get_communication_history = getattr(
            self.agent.memory, "get_communication_history", None
        )
        last_communication = (
            get_communication_history() if callable(get_communication_history) else ""
        )

        if last_communication:
            prompt_list.append("last communication: \n" + str(last_communication))
        if obs:
            prompt_list.append("current observation: \n" + str(obs))

        return prompt_list

    def _compose_prompt(self, obs: Observation, prompt: str | None) -> list[str]:
        """Return the context entries followed by the explicit or default step prompt."""
        prompt_list = self.get_decision_prompt(obs)

        if prompt is not None:
            prompt_list.append(prompt)
        elif self.agent.step_prompt is not None:
            prompt_list.append(self.agent.step_prompt)
        else:
            raise ValueError("No prompt provided and agent.step_prompt is None.")

        return prompt_list

    def _llm_request(
        self, prompt_list: list[str], selected_tools: list[str] | None
    ) -> dict:
        """Return the keyword arguments shared by ``generate`` and ``agenerate``."""
        return {
            "prompt": prompt_list,
            "tool_schema": self.agent.tool_manager.get_all_tools_schema(
                selected_tools
            ),
            # Tools are described to the model but must not be invoked here;
            # the decision's next_action is executed afterwards.
            "tool_choice": "none",
            "response_format": DecisionOutput,
            "system_prompt": self.get_decision_system_prompt(),
        }

    def _interpret_response(self, rsp) -> dict:
        """Parse the LLM response into a plain ``DecisionOutput`` dictionary."""
        return self.agent.llm.parse_structured_output(rsp, DecisionOutput).model_dump()

    def _expose_rationale(self, decision: dict) -> None:
        """Publish the rationale to the agent's display data when it exists."""
        if hasattr(self.agent, "_step_display_data"):
            self.agent._step_display_data["plan_content"] = decision["rationale"]

    def plan(
        self,
        prompt: str | None = None,
        obs: Observation | None = None,
        ttl: int = 1,
        selected_tools: list[str] | None = None,
    ) -> Plan:
        """
        Plan the next action through a structured decision artifact.

        Args:
            prompt: Step instruction; falls back to ``agent.step_prompt``.
            obs: Observation to reason about; generated from the agent when
                omitted.
            ttl: Forwarded to ``execute_tool_call``.
            selected_tools: Tool names forwarded to the tool manager and to
                ``execute_tool_call``.

        Returns:
            The plan produced by ``execute_tool_call`` for ``next_action``.

        Raises:
            ValueError: If neither ``prompt`` nor ``agent.step_prompt`` is set.
        """
        if obs is None:
            obs = self.agent.generate_obs()

        prompt_list = self._compose_prompt(obs, prompt)
        rsp = self.agent.llm.generate(
            **self._llm_request(prompt_list, selected_tools)
        )

        decision = self._interpret_response(rsp)
        self.agent.memory.add_to_memory(type="decision", content=decision)
        self._expose_rationale(decision)

        return self.execute_tool_call(
            decision["next_action"],
            selected_tools=selected_tools,
            ttl=ttl,
        )

    async def aplan(
        self,
        prompt: str | None = None,
        obs: Observation | None = None,
        ttl: int = 1,
        selected_tools: list[str] | None = None,
    ) -> Plan:
        """
        Asynchronous version of plan() method for parallel planning.

        Takes the same arguments, returns the same kind of plan and raises
        the same ``ValueError`` as ``plan``; observation generation, the LLM
        call, the memory write and tool execution use their awaitable
        counterparts.
        """
        if obs is None:
            obs = await self.agent.agenerate_obs()

        prompt_list = self._compose_prompt(obs, prompt)
        rsp = await self.agent.llm.agenerate(
            **self._llm_request(prompt_list, selected_tools)
        )

        decision = self._interpret_response(rsp)
        await self.agent.memory.aadd_to_memory(type="decision", content=decision)
        self._expose_rationale(decision)

        return await self.aexecute_tool_call(
            decision["next_action"],
            selected_tools=selected_tools,
            ttl=ttl,
        )
def test_parse_structured_output(self):
    """Cover the three sources parse_structured_output can read from."""

    class DummyOutput(BaseModel):
        answer: str

    llm = ModuleLLM(llm_model="openai/gpt-4o")

    # One mocked completion is reused; only its message is mutated per case.
    response = Mock()
    response.choices = [Mock()]
    response.choices[0].message = Mock()
    message = response.choices[0].message

    # Case 1: the provider already returned an instance of the model.
    message.parsed = DummyOutput(answer="parsed")
    result = llm.parse_structured_output(response, DummyOutput)
    assert result.answer == "parsed"

    # Case 2: the provider returned a plain mapping that needs validation.
    message.parsed = {"answer": "dict"}
    result = llm.parse_structured_output(response, DummyOutput)
    assert result.answer == "dict"

    # Case 3: nothing was pre-parsed, so the raw JSON content is used.
    message.parsed = None
    message.content = '{"answer":"json"}'
    result = llm.parse_structured_output(response, DummyOutput)
    assert result.answer == "json"
import asyncio
import json
from unittest.mock import AsyncMock, Mock

import pytest

from mesa_llm.reasoning.decision import (
    DecisionOption,
    DecisionOutput,
    DecisionReasoning,
)
from mesa_llm.reasoning.reasoning import Observation, Plan


def _food_decision() -> DecisionOutput:
    """Decision whose next action is ``move_to_food``."""
    return DecisionOutput(
        goal="Reach food",
        constraints=["One movement per turn"],
        known_facts=["Food is visible nearby"],
        unknowns=["Whether the route stays open"],
        assumptions=["The route remains open this step"],
        options=[
            DecisionOption(
                name="move_to_food",
                description="Move toward visible food",
                tradeoffs=["Fast", "May be contested"],
                score=0.88,
            )
        ],
        chosen_option="move_to_food",
        rationale="It best advances the immediate goal.",
        confidence=0.78,
        risks=["Another agent may reach it first"],
        next_action="move_to_food",
    )


def _wait_decision() -> DecisionOutput:
    """Decision whose next action is ``wait``."""
    return DecisionOutput(
        goal="Hold position",
        constraints=["No safe exit visible"],
        known_facts=["A threat is adjacent"],
        unknowns=["Threat intent"],
        assumptions=["Staying still is safer than moving blindly"],
        options=[
            DecisionOption(
                name="wait",
                description="Hold current position",
                tradeoffs=["No progress", "Reduces exposure"],
                score=0.61,
            )
        ],
        chosen_option="wait",
        rationale="It minimizes immediate danger.",
        confidence=0.53,
        risks=["Threat may approach anyway"],
        next_action="wait",
    )


def _ally_decision() -> DecisionOutput:
    """Decision whose next action is ``move_east``."""
    return DecisionOutput(
        goal="Move closer to ally",
        constraints=["One step per turn"],
        known_facts=["An ally is east of the agent"],
        unknowns=["Whether the east cell is contested"],
        assumptions=["The ally remains in place this step"],
        options=[
            DecisionOption(
                name="move_east",
                description="Move one cell east",
                tradeoffs=["Improves coordination", "May increase exposure"],
                score=0.74,
            )
        ],
        chosen_option="move_east",
        rationale="It improves coordination with acceptable risk.",
        confidence=0.69,
        risks=["The east cell may be occupied"],
        next_action="move_east",
    )


def _as_llm_content(decision: DecisionOutput) -> str:
    """Serialize a decision as the JSON text an LLM message would carry."""
    return json.dumps(decision.model_dump())


def _blank_observation() -> Observation:
    """Return a minimal observation for step 1."""
    return Observation(step=1, self_state={}, local_state={})


def _wire_agent(mock_agent, decision: DecisionOutput) -> None:
    """Attach mocked memory, LLM and tool manager that yield ``decision``."""
    mock_agent.memory = Mock()
    mock_agent.memory.get_prompt_ready.return_value = "memory1"
    mock_agent.memory.get_communication_history.return_value = ""
    mock_agent.memory.add_to_memory = Mock()
    mock_agent.llm = Mock()
    mock_agent.llm.parse_structured_output.return_value = decision
    mock_agent.tool_manager = Mock()
    mock_agent.tool_manager.get_all_tools_schema.return_value = {}
    mock_agent._step_display_data = {}


class TestDecisionModels:
    """Validation behaviour of the decision pydantic models."""

    def test_decision_option_creation(self):
        option = DecisionOption(
            name="move_to_market",
            description="Move toward the market cell.",
            tradeoffs=["Consumes one turn", "May expose the agent"],
            score=0.82,
        )

        assert option.name == "move_to_market"
        assert option.tradeoffs == ["Consumes one turn", "May expose the agent"]
        assert option.score == 0.82

    def test_decision_option_score_constraints(self):
        # A score above the declared upper bound of 1.0 must be rejected.
        with pytest.raises(ValueError):
            DecisionOption(
                name="invalid_option",
                description="Invalid score",
                tradeoffs=["Not allowed"],
                score=1.5,
            )

    def test_decision_output_creation(self):
        output = DecisionOutput(
            goal="Reach a safer location",
            constraints=["Can only move once"],
            known_facts=["An exit is visible to the north"],
            unknowns=["Whether another agent will block the path"],
            assumptions=["The northern cell remains open this turn"],
            options=[
                DecisionOption(
                    name="move_north",
                    description="Move one cell north.",
                    tradeoffs=["Fast", "Could enter conflict"],
                    score=0.9,
                )
            ],
            chosen_option="move_north",
            rationale="It advances the goal with the best visible route.",
            confidence=0.76,
            risks=["The path could become blocked"],
            next_action="move_north",
        )

        assert output.goal == "Reach a safer location"
        assert output.chosen_option == "move_north"
        assert output.confidence == 0.76
        assert output.next_action == "move_north"


class TestDecisionReasoning:
    """Behaviour of DecisionReasoning prompt building and planning."""

    def test_decision_reasoning_initialization(self, mock_agent):
        reasoning = DecisionReasoning(mock_agent)

        assert reasoning.agent == mock_agent

    def test_get_decision_system_prompt(self, mock_agent):
        reasoning = DecisionReasoning(mock_agent)

        prompt = reasoning.get_decision_system_prompt()

        assert "strict JSON object" in prompt
        assert "known_facts" in prompt
        assert "next_action" in prompt

    def test_get_decision_prompt_with_observation(self, mock_agent):
        mock_agent.memory = Mock()
        mock_agent.memory.get_prompt_ready.return_value = "memory1\n\nmemory2"
        mock_agent.memory.get_communication_history.return_value = "communication"

        reasoning = DecisionReasoning(mock_agent)
        prompt_list = reasoning.get_decision_prompt(_blank_observation())

        assert len(prompt_list) >= 2
        assert "last communication" in prompt_list[-2]
        assert "current observation" in prompt_list[-1]

    def test_plan_with_prompt(self, llm_response_factory, mock_agent):
        decision = _food_decision()
        _wire_agent(mock_agent, decision)
        mock_agent.llm.generate.return_value = llm_response_factory(
            content=_as_llm_content(decision)
        )

        mock_plan = Plan(step=1, llm_plan=Mock())
        reasoning = DecisionReasoning(mock_agent)
        reasoning.execute_tool_call = Mock(return_value=mock_plan)

        result = reasoning.plan(obs=_blank_observation(), prompt="Custom prompt")

        assert result == mock_plan
        mock_agent.memory.add_to_memory.assert_called_once()
        mock_agent.llm.generate.assert_called_once()
        assert (
            mock_agent.llm.generate.call_args.kwargs["system_prompt"]
            == reasoning.get_decision_system_prompt()
        )
        mock_agent.llm.parse_structured_output.assert_called_once_with(
            mock_agent.llm.generate.return_value, DecisionOutput
        )
        reasoning.execute_tool_call.assert_called_once_with(
            "move_to_food",
            selected_tools=None,
            ttl=1,
        )
        assert mock_agent._step_display_data["plan_content"] == (
            "It best advances the immediate goal."
        )

    def test_plan_with_selected_tools(self, llm_response_factory, mock_agent):
        decision = _wait_decision()
        mock_agent.step_prompt = "Default step prompt"
        _wire_agent(mock_agent, decision)
        mock_agent.llm.generate.return_value = llm_response_factory(
            content=_as_llm_content(decision)
        )

        mock_plan = Plan(step=1, llm_plan=Mock())
        reasoning = DecisionReasoning(mock_agent)
        reasoning.execute_tool_call = Mock(return_value=mock_plan)

        selected_tools = ["tool1", "tool2"]
        result = reasoning.plan(
            obs=_blank_observation(), ttl=3, selected_tools=selected_tools
        )

        assert result == mock_plan
        mock_agent.tool_manager.get_all_tools_schema.assert_called_with(selected_tools)
        reasoning.execute_tool_call.assert_called_once_with(
            "wait",
            selected_tools=selected_tools,
            ttl=3,
        )

    def test_get_decision_prompt_without_optional_memory_methods(self, mock_agent):
        # spec=[] yields a memory object without any of the optional methods.
        mock_agent.memory = Mock(spec=[])

        reasoning = DecisionReasoning(mock_agent)
        obs = _blank_observation()

        prompt_list = reasoning.get_decision_prompt(obs)

        assert prompt_list == ["current observation: \n" + str(obs)]

    def test_plan_uses_structured_response_normalizer(
        self, llm_response_factory, mock_agent
    ):
        decision = _food_decision()
        _wire_agent(mock_agent, decision)
        # The raw content is irrelevant: the mocked normalizer supplies the result.
        mock_agent.llm.generate.return_value = llm_response_factory(content="ignored")

        mock_plan = Plan(step=1, llm_plan=Mock())
        reasoning = DecisionReasoning(mock_agent)
        reasoning.execute_tool_call = Mock(return_value=mock_plan)

        result = reasoning.plan(obs=_blank_observation(), prompt="Custom prompt")

        assert result == mock_plan
        mock_agent.memory.add_to_memory.assert_called_once_with(
            type="decision", content=decision.model_dump()
        )
        mock_agent.llm.parse_structured_output.assert_called_once_with(
            mock_agent.llm.generate.return_value, DecisionOutput
        )

    def test_plan_no_prompt_error(self, mock_agent):
        mock_agent.step_prompt = None
        mock_agent.memory = Mock()
        mock_agent.memory.get_prompt_ready.return_value = "memory1"
        mock_agent.memory.get_communication_history.return_value = ""

        reasoning = DecisionReasoning(mock_agent)

        with pytest.raises(
            ValueError, match=r"No prompt provided and agent.step_prompt is None"
        ):
            reasoning.plan(obs=_blank_observation())

    def test_aplan_async_version(self, llm_response_factory, mock_agent):
        decision = _ally_decision()
        mock_agent.step_prompt = "Default step prompt"
        _wire_agent(mock_agent, decision)
        mock_agent.memory.aadd_to_memory = AsyncMock()
        mock_agent.llm.agenerate = AsyncMock(
            return_value=llm_response_factory(content=_as_llm_content(decision))
        )

        mock_plan = Plan(step=1, llm_plan=Mock())
        reasoning = DecisionReasoning(mock_agent)
        reasoning.aexecute_tool_call = AsyncMock(return_value=mock_plan)

        result = asyncio.run(reasoning.aplan(obs=_blank_observation(), ttl=4))

        assert result == mock_plan
        mock_agent.llm.agenerate.assert_called_once()
        assert (
            mock_agent.llm.agenerate.call_args.kwargs["system_prompt"]
            == reasoning.get_decision_system_prompt()
        )
        mock_agent.llm.parse_structured_output.assert_called_once_with(
            mock_agent.llm.agenerate.return_value, DecisionOutput
        )
        reasoning.aexecute_tool_call.assert_awaited_once_with(
            "move_east",
            selected_tools=None,
            ttl=4,
        )
        assert mock_agent._step_display_data["plan_content"] == (
            "It improves coordination with acceptable risk."
        )

    def test_aplan_no_prompt_error(self, mock_agent):
        mock_agent.step_prompt = None
        mock_agent.memory = Mock()
        mock_agent.memory.get_prompt_ready.return_value = "memory1"
        mock_agent.memory.get_communication_history.return_value = ""

        reasoning = DecisionReasoning(mock_agent)

        with pytest.raises(
            ValueError, match=r"No prompt provided and agent.step_prompt is None"
        ):
            asyncio.run(reasoning.aplan(obs=_blank_observation()))