Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion mesa_llm/parallel_stepping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ async def step_agents_parallel(agents: list[Agent | LLMAgent]) -> None:
"""Step all agents in parallel using async/await."""
tasks = []
for agent in agents:
# Skip agents that are components of a Meta Agent
if getattr(agent, "is_component", False):
continue

if hasattr(agent, "astep"):
tasks.append(agent.astep())
elif hasattr(agent, "step"):
Expand All @@ -41,6 +45,10 @@ def step_agents_multithreaded(agents: list[Agent | LLMAgent]) -> None:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for agent in agents:
# Skip agents that are components of a Meta Agent
if getattr(agent, "is_component", False):
continue

if hasattr(agent, "astep"):
# run async steps in the event loop in a thread
futures.append(
Expand Down Expand Up @@ -75,6 +83,13 @@ def step_agents_parallel_sync(agents: list[Agent | LLMAgent]) -> None:

# Patch Mesa's shuffle_do for automatic parallel detection
_original_shuffle_do = AgentSet.shuffle_do
try:
from mesa.agentset import _HardKeyAgentSet

_original_hardkey_shuffle_do = _HardKeyAgentSet.shuffle_do
except ImportError:
_HardKeyAgentSet = None
_original_hardkey_shuffle_do = None


def _enhanced_shuffle_do(self, method: str, *args, **kwargs):
Expand All @@ -84,7 +99,11 @@ def _enhanced_shuffle_do(self, method: str, *args, **kwargs):
if hasattr(agent, "model") and getattr(agent.model, "parallel_stepping", False):
step_agents_parallel_sync(list(self))
return
_original_shuffle_do(self, method, *args, **kwargs)

if _HardKeyAgentSet and isinstance(self, _HardKeyAgentSet):
_original_hardkey_shuffle_do(self, method, *args, **kwargs)
else:
_original_shuffle_do(self, method, *args, **kwargs)


def enable_automatic_parallel_stepping(mode: str = "asyncio"):
Expand All @@ -94,11 +113,15 @@ def enable_automatic_parallel_stepping(mode: str = "asyncio"):
raise ValueError("mode must be either 'asyncio' or 'threading'")
_PARALLEL_STEPPING_MODE = mode
AgentSet.shuffle_do = _enhanced_shuffle_do
if _HardKeyAgentSet:
_HardKeyAgentSet.shuffle_do = _enhanced_shuffle_do


def disable_automatic_parallel_stepping():
"""Restore original shuffle_do behavior."""
AgentSet.shuffle_do = _original_shuffle_do
if _HardKeyAgentSet:
_HardKeyAgentSet.shuffle_do = _original_hardkey_shuffle_do


# --- Monkey-patch AgentSet with do_async for async parallel method calls ---
Expand All @@ -114,6 +137,10 @@ def _agentset_do_async(self, method: str, *args, **kwargs):
async def _run():
tasks = []
for agent in self:
# Skip agents that are components of a Meta Agent
if getattr(agent, "is_component", False):
continue

fn = getattr(agent, method, None)
if fn is not None and asyncio.iscoroutinefunction(fn):
tasks.append(fn(*args, **kwargs))
Expand Down
84 changes: 83 additions & 1 deletion tests/test_parallel_stepping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest
from mesa.agent import Agent, AgentSet
from mesa.experimental.meta_agents.meta_agent import MetaAgent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if MetaAgent is documented or if there's a more stable import path
rg -n "MetaAgent" --type py -g '!tests/*' | head -20

Repository: mesa/mesa-llm

Length of output: 39


🏁 Script executed:

# Find where MetaAgent is defined
rg -n "class MetaAgent" --type py

# Check directory structure of meta_agents module
fd -t d "meta_agents" 

# Look for __init__ files that might expose MetaAgent
fd -t f "__init__.py" | xargs rg -l "MetaAgent"

Repository: mesa/mesa-llm

Length of output: 41


🏁 Script executed:

# Simple search for MetaAgent class definition
rg "class MetaAgent" --type py

# Search in the entire codebase for the file
fd "meta_agent.py"

# List the experimental directory if it exists
fd -t d "experimental"

Repository: mesa/mesa-llm

Length of output: 39


🏁 Script executed:

# List main directories
git ls-files | head -50

# Check if experimental directory exists at all
git ls-files | grep -i experimental

# Read the test file to understand its context
head -30 tests/test_parallel_stepping.py

Repository: mesa/mesa-llm

Length of output: 2033


🏁 Script executed:

# Read the full test file to see how MetaAgent is used
cat -n tests/test_parallel_stepping.py | head -100

# Check for mesa version requirements or documentation
fd -t f "requirements.txt|setup.py|pyproject.toml|setup.cfg" -x cat

# Look for any documentation about MetaAgent or experimental APIs
fd -t f "*.md" -x grep -l "MetaAgent\|experimental" 2>/dev/null

Repository: mesa/mesa-llm

Length of output: 6625


🏁 Script executed:

# Read the rest of the test file
tail -n +100 tests/test_parallel_stepping.py | head -100

# Search for any Mesa-related documentation about experimental APIs
find . -name "*.md" -type f -exec grep -l "experimental\|MetaAgent" {} \;

# Check if there are any comments in the codebase about API stability
rg "experimental\|deprecated\|unstable" --type py

Repository: mesa/mesa-llm

Length of output: 2288


Using experimental API in tests—consider version pinning if MetaAgent becomes required.

The import path mesa.experimental.meta_agents.meta_agent.MetaAgent indicates this uses Mesa's experimental namespace. Tests depend on specific MetaAgent features (is_component attribute, remove_constituting_agents() method). With the flexible version constraint mesa[rec]>=3.0, future Mesa releases could relocate or modify this API, breaking these tests. If MetaAgent becomes essential, consider pinning a minimum Mesa version once its API stabilizes, or switch to a stable import path if one becomes available.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_parallel_stepping.py` at line 5, Tests currently import the
experimental MetaAgent API (MetaAgent, and rely on its is_component attribute
and remove_constituting_agents method) which may move or change in future Mesa
releases; update the test to either (a) add a guarded import with a clear
fallback/skip (try importing MetaAgent from
mesa.experimental.meta_agents.meta_agent and skip the tests with a helpful
message if ImportError/AttributeError occurs), or (b) pin a minimum Mesa version
in the test requirements/CI config once the API is required/stable so the import
path remains valid and features (is_component, remove_constituting_agents)
exist.

from mesa.model import Model

from mesa_llm.parallel_stepping import (
Expand All @@ -15,7 +16,7 @@

class DummyModel(Model):
def __init__(self):
super().__init__(seed=42)
super().__init__(rng=42)
self.parallel_stepping = False


Expand Down Expand Up @@ -99,3 +100,84 @@ async def wrapper():
asyncio.run(wrapper())
assert a1.counter == 1
assert a2.counter == 1


@pytest.fixture(autouse=True)
def manage_parallel_stepping_patch():
# Helper to clean up parallel stepping state if tests fail
yield
disable_automatic_parallel_stepping()


# --- Meta Agent Parallel Conflict Tests ---


class ConflictWorker(Agent):
def __init__(self, model):
super().__init__(model)
self.step_count = 0

def step(self):
self.step_count += 1


class ConflictManager(MetaAgent):
def step(self):
for agent in self.agents:
agent.step()


class ConflictBusinessModel(Model):
def __init__(self):
super().__init__(rng=42)
self.parallel_stepping = True
self.worker = ConflictWorker(self)
self.manager = ConflictManager(self, agents={self.worker})

def step(self):
self.agents.shuffle_do("step")


def test_meta_agent_parallel_conflict_fix():
"""
Test that a constituent agent is only stepped ONCE when
parallel stepping is enabled, because it is skipped by
the scheduler and only stepped by its MetaAgent.
"""
enable_automatic_parallel_stepping("asyncio")
model = ConflictBusinessModel()

# Run one step
model.step()

# Assert worker was only stepped once
# If the fix failed, this would be 2
assert model.worker.step_count == 1


def test_meta_agent_multithreaded_conflict_fix():
"""Test same logic but with multithreaded mode."""
disable_automatic_parallel_stepping()
enable_automatic_parallel_stepping("threading")

model = ConflictBusinessModel()
model.step()

assert model.worker.step_count == 1


def test_agent_becomes_independent_again():
"""Test that an agent removed from MetaAgent is stepped by scheduler again."""
enable_automatic_parallel_stepping("asyncio")
model = ConflictBusinessModel()

# 1. Initially it's a component
assert model.worker.is_component is True

# 2. Remove it from manager
model.manager.remove_constituting_agents({model.worker})
assert model.worker.is_component is False

# 3. Model step should now step the worker directly
model.step()
assert model.worker.step_count == 1
Loading