Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mesa_llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)
from .reasoning.reasoning import Observation, Plan
from .recording.record_model import record_model
from .tools.tool_decorator import requires
from .tools.tool_manager import ToolManager

# Enable automatic parallel stepping when mesa_llm is imported
Expand All @@ -20,6 +21,7 @@
"ToolManager",
"enable_automatic_parallel_stepping",
"record_model",
"requires",
"step_agents_parallel",
"step_agents_parallel_sync",
]
Expand Down
75 changes: 57 additions & 18 deletions mesa_llm/llm_agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from mesa.agent import Agent
from mesa.discrete_space import (
OrthogonalMooreGrid,
Expand Down Expand Up @@ -76,6 +78,48 @@ def __init__(

self.internal_state = internal_state

@property
def pos(self) -> Any:
"""Standardized access to the agent's position across all Mesa space types."""
# Check for discrete_space Cell
cell = getattr(self, "cell", None)
if cell is not None and hasattr(cell, "coordinate"):
return cell.coordinate
# Fall back to legacy Agent.pos (stored in __dict__ by our setter)
return self.__dict__.get("pos")

@pos.setter
def pos(self, value: Any):
"""Standardized setter for agent position."""
# For discrete_space, updating agent.cell is the primary way.
# If we are setting pos directly, we try to maintain legacy compatibility.
curr_cell = getattr(self, "cell", None)
if curr_cell is not None and value is None:
# Setting pos to None (e.g. during removal) should also clear cell
self.cell = None

# Always update the underlying attribute for legacy compatibility.
# We use __dict__ because Agent.pos is a simple attribute, not a property,
# so super().pos.fset won't work, and self.pos = value would recurse.
self.__dict__["pos"] = value

def move_to(self, target_coordinates: Any):
"""Standardized movement method for all Mesa space types."""
grid = getattr(self.model, "grid", None)
space = getattr(self.model, "space", None)

if isinstance(grid, SingleGrid | MultiGrid):
grid.move_agent(self, target_coordinates)
elif isinstance(grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid):
cell = grid._cells[tuple(target_coordinates)]
self.cell = cell
elif isinstance(space, ContinuousSpace):
space.move_agent(self, target_coordinates)
else:
raise ValueError(
f"Unsupported environment for move_to. Model has grid={type(grid)}, space={type(space)}"
)

def __str__(self):
return f"LLMAgent {self.unique_id}"

Expand Down Expand Up @@ -150,15 +194,7 @@ def _build_observation(self):
self_state = {
"agent_unique_id": self.unique_id,
"system_prompt": self.system_prompt,
"location": (
self.pos
if self.pos is not None
else (
getattr(self, "cell", None).coordinate
if getattr(self, "cell", None) is not None
else None
)
),
"location": self.pos,
"internal_state": self.internal_state,
}
if self.vision is not None and self.vision > 0:
Expand Down Expand Up @@ -205,16 +241,19 @@ def _build_observation(self):

local_state = {}
for i in neighbors:
# Use i.pos which redirects correctly if the neighbor is also an LLMAgent
# or use the same logic if it's a standard Agent
neighbor_pos = (
i.pos
if i.pos is not None
else (
getattr(i, "cell", None).coordinate
if getattr(i, "cell", None) is not None
else None
)
)
local_state[i.__class__.__name__ + " " + str(i.unique_id)] = {
"position": (
i.pos
if i.pos is not None
else (
getattr(i, "cell", None).coordinate
if getattr(i, "cell", None) is not None
else None
)
),
"position": neighbor_pos,
"internal_state": [
s for s in i.internal_state if not s.startswith("_")
],
Expand Down
64 changes: 23 additions & 41 deletions mesa_llm/tools/inbuilt_tools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING

from mesa.discrete_space import (
OrthogonalMooreGrid,
Expand All @@ -10,7 +10,7 @@
SingleGrid,
)

from mesa_llm.tools.tool_decorator import tool
from mesa_llm.tools.tool_decorator import requires, tool

if TYPE_CHECKING:
from mesa_llm.llm_agent import LLMAgent
Expand Down Expand Up @@ -41,26 +41,18 @@
}


def _get_agent_position(agent: "LLMAgent") -> Any:
"""Return the agent position across Mesa space APIs."""
cell = getattr(agent, "cell", None)
if cell is not None and getattr(cell, "coordinate", None) is not None:
return cell.coordinate

pos = getattr(agent, "pos", None)
if pos is not None:
return pos

position = getattr(agent, "position", None)
if position is not None:
return position

raise ValueError(
"Could not infer agent position from `cell`, `pos`, or `position`."
)


@tool
@requires(
lambda agent: agent.pos is not None,
reason="Agent has no position",
)
@requires(
lambda agent: (
getattr(agent.model, "grid", None) is not None
or getattr(agent.model, "space", None) is not None
),
reason="Model has no grid or space",
)
def move_one_step(agent: "LLMAgent", direction: str) -> str:
"""
Moves agents one step in specified cardinal/diagonal directions (North, South, East, West, NorthEast, NorthWest, SouthEast, SouthWest). Automatically handles different Mesa grid types including SingleGrid, MultiGrid, OrthogonalGrids, and ContinuousSpace.
Expand All @@ -82,7 +74,7 @@ def move_one_step(agent: "LLMAgent", direction: str) -> str:

grid = getattr(agent.model, "grid", None)
if isinstance(grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid):
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move_one_step now unconditionally unpacks agent.pos (e.g., row, col = agent.pos). If agent.pos is None or not a 2-tuple, this will raise a TypeError instead of returning a clear error as before. Since @requires(...) is not enforced anywhere, this needs an explicit runtime check (raise ValueError with a helpful message) or the requirements mechanism needs to be wired into tool execution.

Suggested change
if isinstance(grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid):
if isinstance(grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid):
# Validate agent.pos before unpacking to avoid obscure TypeError.
if not isinstance(agent.pos, (tuple, list)) or len(agent.pos) != 2:
raise ValueError(
"Agent position must be a 2-tuple (row, col) for orthogonal grids; "
f"got: {agent.pos!r}"
)

Copilot uses AI. Check for mistakes.
row, col = _get_agent_position(agent)
row, col = agent.pos
drow, dcol = direction_map_row_col[direction]
new_pos = (row + drow, col + dcol)

Expand Down Expand Up @@ -121,7 +113,7 @@ def move_one_step(agent: "LLMAgent", direction: str) -> str:

if grid_or_space is not None:
dx, dy = direction_map_xy[direction]
x, y = _get_agent_position(agent)
x, y = agent.pos
new_pos = (x + dx, y + dy)

if grid_or_space.torus:
Expand Down Expand Up @@ -151,6 +143,13 @@ def move_one_step(agent: "LLMAgent", direction: str) -> str:


@tool
@requires(
lambda agent: (
getattr(agent.model, "grid", None) is not None
or getattr(agent.model, "space", None) is not None
),
reason="Model has no grid or space",
)
def teleport_to_location(
agent: "LLMAgent",
target_coordinates: list[int | float],
Expand All @@ -167,24 +166,7 @@ def teleport_to_location(

"""
target_coordinates = tuple(target_coordinates)

if isinstance(agent.model.grid, SingleGrid | MultiGrid):
agent.model.grid.move_agent(agent, target_coordinates)

elif isinstance(agent.model.grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid):
cell = agent.model.grid._cells[target_coordinates]
agent.cell = cell

elif isinstance(agent.model.space, ContinuousSpace):
agent.model.space.move_agent(agent, target_coordinates)

else:
raise ValueError(
"Unsupported environment for teleport_to_location. Expected "
"SingleGrid, MultiGrid, OrthogonalMooreGrid, "
"OrthogonalVonNeumannGrid, or ContinuousSpace."
)

agent.move_to(target_coordinates)
return f"agent {agent.unique_id} moved to {target_coordinates}."


Expand Down
47 changes: 47 additions & 0 deletions mesa_llm/tools/tool_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@
_TOOL_CALLBACKS: list[Callable[[Callable], None]] = []


def requires(precondition: Callable[[Any], bool], reason: str | None = None):
"""
Decorator to specify a precondition for a tool.

Args:
precondition: A function that takes an agent and returns True if the tool
is currently feasible for that agent, False otherwise.
reason: Optional human-readable explanation of why the tool is currently
unavailable. Used for logging and differentiated planning.
"""

def decorator(func: Callable):
if not hasattr(func, "__tool_requirements__"):
func.__tool_requirements__ = []
func.__tool_requirements__.append(
{"precondition": precondition, "reason": reason}
)
return func
Comment on lines +34 to +40
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requires stores preconditions on __tool_requirements__, but nothing in the codebase reads/enforces this attribute (e.g., ToolManager executes tools unconditionally). As a result, the new @requires usage in tools is currently a no-op and won't prevent invalid tool execution. Either integrate requirement checking into ToolManager (before calling the tool) and surface the optional reason, or remove requires to avoid a misleading public API.

Copilot uses AI. Check for mistakes.

return decorator


def add_tool_callback(callback: Callable[[Callable], None]):
"""Add a callback to be called when a new tool is registered"""
_TOOL_CALLBACKS.append(callback)
Expand Down Expand Up @@ -91,6 +113,27 @@ def _python_to_json_type(py_type: Any) -> dict[str, Any]:
"type": "array",
"items": _python_to_json_type(item_type),
}
elif base is dict:
# Handle dict[str, int]
if "," in inner_content:
parts = inner_content.split(",")
if len(parts) >= 2:
value_type_str = parts[1].strip()
# We assume key is string for LLM tools
type_mapping = {
"int": int,
"str": str,
"float": float,
"bool": bool,
}
value_type = type_mapping.get(value_type_str, str)
return {
"type": "object",
"additionalProperties": _python_to_json_type(
value_type
),
}
return {"type": "object"}

# Try to get the base type for simple cases
base_type = py_type.split("[")[0].strip()
Expand All @@ -105,6 +148,10 @@ def _python_to_json_type(py_type: Any) -> dict[str, Any]:
}
if base_type in type_mapping:
py_type = type_mapping[base_type]
else:
# If it's a string that doesn't match any known basic type,
# we default to string as per standard LLM tool practices.
return {"type": "string"}

except Exception:
# If parsing fails, default to string
Expand Down
Loading
Loading