Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions hell
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
diff --cc tests/test_tools/test_inbuilt_tools.py
index fce291c,7a9c367..0000000
--- a/tests/test_tools/test_inbuilt_tools.py
+++ b/tests/test_tools/test_inbuilt_tools.py
@@@ -3,9 -3,9 +3,10 @@@ from __future__ import annotation
from types import SimpleNamespace

import pytest
 -from mesa.discrete_space import OrthogonalMooreGrid, OrthogonalVonNeumannGrid
 -from mesa.space import ContinuousSpace, MultiGrid, SingleGrid
 +from mesa.discrete_space import OrthogonalMooreGrid
 +from mesa.experimental.continuous_space import ContinuousSpace
 +
+ 
from mesa_llm.tools.inbuilt_tools import (
move_one_step,
speak_to,
@@@ -155,3 -329,256 +334,257 @@@ def test_move_one_step_on_continuousspa

assert agent.pos == (2.0, 3.0)
assert result == "agent 6 moved to (2.0, 3.0)."
+ 
+ 
+ def test_move_one_step_boundary_on_continuousspace():
+ model = DummyModel()
+ model.grid = None
+ model.space = ContinuousSpace(x_max=10.0, y_max=10.0, torus=False)
+ 
+ agent = DummyAgent(unique_id=30, model=model)
+ model.agents.append(agent)
+ model.space.place_agent(agent, (2.0, 9.0))
+ 
+ result = move_one_step(agent, "North")
+ 
+ assert agent.pos == (2.0, 9.0)
+ assert "boundary" in result.lower()
+ assert "North" in result
+ 
+ 
+ def test_move_one_step_torus_wrap_on_continuousspace():
+ model = DummyModel()
+ model.grid = None
+ model.space = ContinuousSpace(x_max=10.0, y_max=10.0, torus=True)
+ 
+ agent = DummyAgent(unique_id=31, model=model)
+ model.agents.append(agent)
+ model.space.place_agent(agent, (2.0, 9.0))
+ 
+ result = move_one_step(agent, "North")
+ 
+ assert agent.pos == (2.0, 0.0)
+ assert result == "agent 31 moved to (2.0, 0.0)."
+ 
+ 
+ def test_move_one_step_boundary_singlegrid_north():
+ """Agent at top edge of SingleGrid trying to go North gets a clear message."""
+ model = DummyModel()
+ model.grid = SingleGrid(width=5, height=5, torus=False)
+ 
+ agent = DummyAgent(unique_id=20, model=model)
+ model.agents.append(agent)
+ model.grid.place_agent(agent, (2, 4)) # y=4 is the top edge
+ 
+ result = move_one_step(agent, "North")
+ 
+ # agent should not have moved
+ assert agent.pos == (2, 4)
+ assert "boundary" in result.lower()
+ assert "North" in result
+ 
+ 
+ def test_move_one_step_torus_wrap_singlegrid_north():
+ model = DummyModel()
+ model.grid = SingleGrid(width=5, height=5, torus=True)
+ 
+ agent = DummyAgent(unique_id=23, model=model)
+ model.agents.append(agent)
+ model.grid.place_agent(agent, (2, 4))
+ 
+ result = move_one_step(agent, "North")
+ 
+ assert agent.pos == (2, 0)
+ assert result == "agent 23 moved to (2, 0)."
+ 
+ 
+ def test_move_one_step_boundary_multigrid_west():
+ """Agent at left edge of MultiGrid trying to go West gets a clear message."""
+ model = DummyModel()
+ model.grid = MultiGrid(width=5, height=5, torus=False)
+ 
+ agent = DummyAgent(unique_id=21, model=model)
+ model.agents.append(agent)
+ model.grid.place_agent(agent, (0, 2)) # x=0 is the left edge
+ 
+ result = move_one_step(agent, "West")
+ 
+ assert agent.pos == (0, 2)
+ assert "boundary" in result.lower()
+ assert "West" in result
+ 
+ 
+ def test_move_one_step_torus_wrap_multigrid_west():
+ model = DummyModel()
+ model.grid = MultiGrid(width=5, height=5, torus=True)
+ 
+ agent = DummyAgent(unique_id=24, model=model)
+ model.agents.append(agent)
+ model.grid.place_agent(agent, (0, 2))
+ 
+ result = move_one_step(agent, "West")
+ 
+ assert agent.pos == (4, 2)
+ assert result == "agent 24 moved to (4, 2)."
+ 
+ 
+ def test_move_one_step_singlegrid_occupied_target():
+ model = DummyModel()
+ model.grid = SingleGrid(width=5, height=5, torus=False)
+ 
+ moving_agent = DummyAgent(unique_id=25, model=model)
+ blocking_agent = DummyAgent(unique_id=26, model=model)
+ model.agents.extend([moving_agent, blocking_agent])
+ model.grid.place_agent(moving_agent, (2, 2))
+ model.grid.place_agent(blocking_agent, (2, 3))
+ 
+ result = move_one_step(moving_agent, "North")
+ 
+ assert moving_agent.pos == (2, 2)
+ assert blocking_agent.pos == (2, 3)
+ assert "occupied" in result.lower()
+ assert "North" in result
+ 
+ 
+ def test_move_one_step_boundary_orthogonal_grid():
+ """Agent at edge of OrthogonalMooreGrid with no cell in that direction gets a clear message."""
+ 
+ class _DummyOrthogonalGrid(OrthogonalMooreGrid):
+ pass
+ 
+ orth_grid = object.__new__(_DummyOrthogonalGrid)
+ orth_grid.torus = False
+ orth_grid.dimensions = (5, 5)
+ start = (0, 1)
+ start_cell = SimpleNamespace(
+ coordinate=start, agents=[], connections={}, is_full=False
+ )
+ orth_grid._cells = {start: start_cell}
+ 
+ model = DummyModel()
+ model.grid = orth_grid
+ 
+ agent = DummyAgent(unique_id=22, model=model)
+ agent.cell = start_cell
+ model.agents.append(agent)
+ 
+ result = move_one_step(agent, "North")
+ 
+ # cell should be unchanged
+ assert agent.cell is start_cell
+ assert "boundary" in result.lower()
+ assert "North" in result
+ 
+ 
+ def test_move_one_step_boundary_orthogonal_torus_missing_wrapped_cell():
+ class _DummyOrthogonalGrid(OrthogonalMooreGrid):
+ pass
+ 
+ orth_grid = object.__new__(_DummyOrthogonalGrid)
+ orth_grid.torus = True
+ orth_grid.dimensions = (3, 3)
+ start = (0, 0)
+ start_cell = SimpleNamespace(coordinate=start, agents=[], is_full=False)
+ # Wrapped target for North would be (2, 0), but it is intentionally absent.
+ orth_grid._cells = {start: start_cell}
+ 
+ model = DummyModel()
+ model.grid = orth_grid
+ 
+ agent = DummyAgent(unique_id=38, model=model)
+ agent.cell = start_cell
+ model.agents.append(agent)
+ 
+ result = move_one_step(agent, "North")
+ 
+ assert agent.cell is start_cell
+ assert "boundary" in result.lower()
+ assert "North" in result
+ 
+ 
+ def test_move_one_step_full_target_orthogonal_grid():
+ class _DummyOrthogonalGrid(OrthogonalMooreGrid):
+ pass
+ 
+ orth_grid = object.__new__(_DummyOrthogonalGrid)
+ orth_grid.torus = False
+ orth_grid.dimensions = (5, 5)
+ start = (1, 1)
+ end = (0, 1)
+ start_cell = SimpleNamespace(
+ coordinate=start, agents=[], connections={}, is_full=False
+ )
+ full_target_cell = SimpleNamespace(
+ coordinate=end,
+ agents=[SimpleNamespace(unique_id=99)],
+ connections={},
+ is_full=True,
+ )
+ start_cell.connections[(-1, 0)] = full_target_cell
+ orth_grid._cells = {start: start_cell, end: full_target_cell}
+ 
+ model = DummyModel()
+ model.grid = orth_grid
+ 
+ agent = DummyAgent(unique_id=27, model=model)
+ agent.cell = start_cell
+ model.agents.append(agent)
+ 
+ result = move_one_step(agent, "North")
+ 
+ assert agent.cell is start_cell
+ assert "full" in result.lower()
+ assert "North" in result
+ 
+ 
+ def test_move_one_step_diagonal_on_orthogonal_vonneumann_grid():
+ class _DummyOrthogonalVonNeumannGrid(OrthogonalVonNeumannGrid):
+ pass
+ 
+ orth_grid = object.__new__(_DummyOrthogonalVonNeumannGrid)
+ orth_grid.torus = False
+ orth_grid.dimensions = (5, 5)
+ start = (2, 2)
+ end = (1, 3) # NorthEast
+ start_cell = SimpleNamespace(coordinate=start, agents=[], is_full=False)
+ end_cell = SimpleNamespace(coordinate=end, agents=[], is_full=False)
+ orth_grid._cells = {start: start_cell, end: end_cell}
+ 
+ model = DummyModel()
+ model.grid = orth_grid
+ 
+ agent = DummyAgent(unique_id=28, model=model)
+ agent.cell = start_cell
+ model.agents.append(agent)
+ 
+ result = move_one_step(agent, "NorthEast")
+ 
+ assert agent.cell is end_cell
+ assert result == "agent 28 moved to (1, 3)."
+ 
+ 
+ def test_move_one_step_torus_wrap_orthogonal_grid():
+ class _DummyOrthogonalGrid(OrthogonalMooreGrid):
+ pass
+ 
+ orth_grid = object.__new__(_DummyOrthogonalGrid)
+ orth_grid.torus = True
+ orth_grid.dimensions = (3, 3)
+ start = (0, 0)
+ end = (2, 2) # NorthWest wraps on torus
+ start_cell = SimpleNamespace(coordinate=start, agents=[], is_full=False)
+ wrapped_cell = SimpleNamespace(coordinate=end, agents=[], is_full=False)
+ orth_grid._cells = {start: start_cell, end: wrapped_cell}
+ 
+ model = DummyModel()
+ model.grid = orth_grid
+ 
+ agent = DummyAgent(unique_id=29, model=model)
+ agent.cell = start_cell
+ model.agents.append(agent)
+ 
+ result = move_one_step(agent, "NorthWest")
+ 
+ assert agent.cell is wrapped_cell
+ assert result == "agent 29 moved to (2, 2)."
++
49 changes: 23 additions & 26 deletions mesa_llm/llm_agent.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import math

from mesa.agent import Agent
from mesa.discrete_space import (
OrthogonalMooreGrid,
OrthogonalVonNeumannGrid,
)
from mesa.experimental.continuous_space import ContinuousSpace
from mesa.model import Model
from mesa.space import (
ContinuousSpace,
MultiGrid,
SingleGrid,
)

from mesa_llm import Plan
from mesa_llm.memory.st_lt_memory import STLTMemory
Expand Down Expand Up @@ -151,8 +149,8 @@ def _build_observation(self):
"agent_unique_id": self.unique_id,
"system_prompt": self.system_prompt,
"location": (
self.pos
if self.pos is not None
getattr(self, "pos", None)
if getattr(self, "pos", None) is not None
else (
getattr(self, "cell", None).coordinate
if getattr(self, "cell", None) is not None
Expand All @@ -166,31 +164,30 @@ def _build_observation(self):
grid = getattr(self.model, "grid", None)
space = getattr(self.model, "space", None)

if grid and isinstance(grid, SingleGrid | MultiGrid):
neighbors = grid.get_neighbors(
tuple(self.pos),
moore=True,
include_center=False,
radius=self.vision,
)
elif grid and isinstance(
if grid and isinstance(
grid, OrthogonalMooreGrid | OrthogonalVonNeumannGrid
):
agent_cell = next(
(cell for cell in grid.all_cells if self in cell.agents),
None,
)
agent_cell = getattr(self, "cell", None)
if agent_cell:
neighborhood = agent_cell.get_neighborhood(radius=self.vision)
neighbors = [a for cell in neighborhood for a in cell.agents]
neighbors = [
a
for cell in neighborhood
for a in list(cell.agents)
if a is not self
]
else:
neighbors = []

elif space and isinstance(space, ContinuousSpace):
all_nearby = space.get_neighbors(
self.pos, radius=self.vision, include_center=True
)
neighbors = [a for a in all_nearby if a is not self]
neighbors = [
a
for a in self.model.agents
if a is not self
and getattr(a, "pos", None) is not None
and getattr(self, "pos", None) is not None
and math.dist(self.pos, a.pos) <= self.vision
]

else:
# No recognized grid/space type
Expand Down Expand Up @@ -227,7 +224,7 @@ async def agenerate_obs(self) -> Observation:
construction logic, stores it in the agent's memory module using
async memory operations, and returns it as an Observation instance.
"""
step = self.model.steps
step = self.model.step
self_state, local_state = self._build_observation()
await self.memory.aadd_to_memory(
type="observation",
Expand All @@ -245,7 +242,7 @@ def generate_obs(self) -> Observation:
builder, stores the resulting observation in the agent's memory module,
and returns it as an Observation instance.
"""
step = self.model.steps
step = self.model.step
self_state, local_state = self._build_observation()
# Add to memory (memory handles its own display separately)
self.memory.add_to_memory(
Expand Down
4 changes: 2 additions & 2 deletions mesa_llm/recording/record_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def _auto_save():
def step_wrapper(self: "Model", *args, **kwargs): # type: ignore[override]
# Record beginning of step
if hasattr(self, "recorder"):
self.recorder.record_model_event("step_start", {"step": self.steps}) # type: ignore[attr-defined]
self.recorder.record_model_event("step_start", {"step": self.step}) # type: ignore[attr-defined]

# Execute the original step logic
result = original_step(self, *args, **kwargs) # type: ignore[misc]
Expand All @@ -110,7 +110,7 @@ def step_wrapper(self: "Model", *args, **kwargs): # type: ignore[override]
if hasattr(self, "recorder"):
_attach_recorder_to_agents(self, self.recorder) # type: ignore[attr-defined]
# Record end of step after agents have acted
self.recorder.record_model_event("step_end", {"step": self.steps}) # type: ignore[attr-defined]
self.recorder.record_model_event("step_end", {"step": self.step}) # type: ignore[attr-defined]

return result

Expand Down
Loading