Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mesa_llm/recording/record_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def init_wrapper(self: "Model", *args, **kwargs): # type: ignore[override]
def _auto_save():
try:
# Avoid creating multiple identical files if already saved manually
if hasattr(self, "recorder") and self.recorder.events:
if hasattr(self, "recorder") and self.recorder.has_recorded_events:
self.save_recording()
except Exception: # pragma: no cover - defensive
logger.exception("SimulationRecorder auto-save failed")
Expand Down
152 changes: 129 additions & 23 deletions mesa_llm/recording/simulation_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
import pickle
import uuid
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import UTC, datetime
from pathlib import Path
Expand Down Expand Up @@ -61,6 +62,8 @@ def __init__(
output_dir: str = "recordings",
record_state_changes: bool = True,
auto_save_interval: int | None = None,
storage_mode: str = "memory",
max_events_in_memory: int | None = None,
):
"""
Initialize the simulation recorder.
Expand All @@ -70,34 +73,114 @@ def __init__(
- **output_dir** (*str*) - Directory for saving recordings (default: "recordings")
- **record_state_changes** (*bool*) - Whether to track agent state changes (default: True)
- **auto_save_interval** (*int | None*) - Automatic save frequency in events (default: None)
- **storage_mode** (*str*) - "memory" to keep all events in memory, or "jsonl" to stream events to disk while retaining only an optional in-memory window
- **max_events_in_memory** (*int | None*) - Maximum number of recent events to retain in memory when using streaming mode. If None, no in-memory event window is kept in streaming mode
"""

self.model = model
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
if storage_mode not in {"memory", "jsonl"}:
raise ValueError("storage_mode must be either 'memory' or 'jsonl'")
if max_events_in_memory is not None and max_events_in_memory < 0:
raise ValueError("max_events_in_memory must be >= 0")

# Recording configuration
self.record_state_changes = record_state_changes
self.auto_save_interval = auto_save_interval
self.storage_mode = storage_mode
self.max_events_in_memory = max_events_in_memory

# Internal state
self.events: list[SimulationEvent] = []
self.simulation_id = str(uuid.uuid4())[:8]
self.start_time = datetime.now(UTC)
self.total_events_recorded = 0

# Agent state tracking for change detection
self.previous_agent_states: dict[int, dict[str, Any]] = {}
self.agent_summaries: dict[int, dict[str, Any]] = defaultdict(
lambda: {
"total_events": 0,
"event_types": set(),
"active_steps": set(),
"first_event": None,
"last_event": None,
}
)
self.unique_agent_ids: set[int] = set()
self.recorded_event_types: set[str] = set()

# Auto-save counter
self.events_since_save = 0

# Optional streaming path for unbounded runs
self.events_stream_path: Path | None = None
if self.storage_mode == "jsonl":
self.events_stream_path = (
self.output_dir / f"simulation_{self.simulation_id}_events.jsonl"
)
self.events_stream_path.touch()

# Initialize simulation metadata
self.simulation_metadata = {
"simulation_id": self.simulation_id,
"start_time": self.start_time.isoformat(),
"model_class": self.model.__class__.__name__,
"storage_mode": self.storage_mode,
}

@property
def has_recorded_events(self) -> bool:
    """Return True once at least one event has been captured."""
    # total_events_recorded is a non-negative running counter, so its
    # truthiness is equivalent to the "> 0" comparison.
    return bool(self.total_events_recorded)

def _serialize_event(self, event: SimulationEvent) -> dict[str, Any]:
    """Render *event* as a plain dict whose values are all JSON-encodable."""
    # asdict() handles every field; only the datetime needs an ISO string.
    return {**asdict(event), "timestamp": event.timestamp.isoformat()}

def _deserialize_event(self, data: dict[str, Any]) -> SimulationEvent:
    """Rebuild a SimulationEvent from its serialized dict form."""
    # Copy the pass-through fields verbatim; only the timestamp needs
    # parsing back from its ISO-8601 string representation.
    kwargs = {
        key: data[key]
        for key in (
            "event_id",
            "step",
            "agent_id",
            "event_type",
            "content",
            "metadata",
        )
    }
    kwargs["timestamp"] = datetime.fromisoformat(data["timestamp"])
    return SimulationEvent(**kwargs)

def _iter_all_events(self):
    """Yield every recorded event, reading from disk in streaming mode."""
    if self.storage_mode != "jsonl":
        # Memory mode: the complete history lives in self.events.
        yield from self.events
        return

    if self.events_stream_path is None:
        # Streaming mode without a stream file: nothing to yield.
        return

    with open(self.events_stream_path) as stream:
        for raw_line in stream:
            # Skip blank lines; each remaining line is one JSON event.
            if raw_line.strip():
                yield self._deserialize_event(json.loads(raw_line))

def _update_agent_summary(self, event: SimulationEvent):
    """Fold *event* into the per-agent rollup so summaries never require the full event history."""
    agent_id = event.agent_id
    if agent_id is None:
        # Model-level events carry no agent attribution; nothing to roll up.
        return

    self.unique_agent_ids.add(agent_id)

    entry = self.agent_summaries[agent_id]
    entry["total_events"] += 1
    entry["event_types"].add(event.event_type)
    entry["active_steps"].add(event.step)

    iso_timestamp = event.timestamp.isoformat()
    # first_event is set exactly once; last_event always tracks the latest.
    if entry["first_event"] is None:
        entry["first_event"] = iso_timestamp
    entry["last_event"] = iso_timestamp

def record_event(
self,
event_type: str,
Expand Down Expand Up @@ -135,7 +218,7 @@ def record_event(
formatted_content = {"data": content}

# Create the event
event_id = f"{self.simulation_id}_{len(self.events):06d}"
event_id = f"{self.simulation_id}_{self.total_events_recorded:06d}"

event = SimulationEvent(
event_id=event_id,
Expand All @@ -147,15 +230,33 @@ def record_event(
metadata=metadata,
)

self.events.append(event)
self.total_events_recorded += 1
self.recorded_event_types.add(event.event_type)
self._update_agent_summary(event)

if self.storage_mode == "jsonl":
if self.events_stream_path is None:
raise RuntimeError("events_stream_path is not initialized")
with open(self.events_stream_path, "a") as f:
json.dump(self._serialize_event(event), f)
f.write("\n")
if self.max_events_in_memory:
self.events.append(event)
if len(self.events) > self.max_events_in_memory:
self.events.pop(0)
else:
self.events.append(event)

self.events_since_save += 1

# Auto-save if configured
if (
self.auto_save_interval
and self.events_since_save >= self.auto_save_interval
):
filename = f"autosave_{self.simulation_id}_{len(self.events)}.json"
filename = (
f"autosave_{self.simulation_id}_{self.total_events_recorded}.json"
)
self.save(filename)
self.events_since_save = 0

Expand All @@ -170,15 +271,19 @@ def record_model_event(self, event_type: str, content: dict[str, Any]):

def get_agent_events(self, agent_id: int) -> list[SimulationEvent]:
"""Get all events for a specific agent."""
return [event for event in self.events if event.agent_id == agent_id]
return [
event for event in self._iter_all_events() if event.agent_id == agent_id
]

def get_events_by_type(self, event_type: str) -> list[SimulationEvent]:
"""Get all events of a specific type."""
return [event for event in self.events if event.event_type == event_type]
return [
event for event in self._iter_all_events() if event.event_type == event_type
]

def get_events_by_step(self, step: int) -> list[SimulationEvent]:
"""Get all events from a specific simulation step."""
return [event for event in self.events if event.step == step]
return [event for event in self._iter_all_events() if event.step == step]

def export_agent_memory(self, agent_id: int) -> dict[str, Any]:
"""Export agent memory state for external analysis."""
Expand Down Expand Up @@ -221,7 +326,7 @@ def save(self, filename: str | None = None, format: str = "json"):
{
"end_time": datetime.now(UTC).isoformat(),
"total_steps": self.model.steps,
"total_events": len(self.events),
"total_events": self.total_events_recorded,
"total_agents": len(self.model.agents),
"duration_minutes": (
datetime.now(UTC) - self.start_time
Expand Down Expand Up @@ -254,25 +359,29 @@ def save(self, filename: str | None = None, format: str = "json"):
)
),
"final_step": self.model.steps,
"total_events": len(self.events),
"total_events": self.total_events_recorded,
},
)

events = list(self._iter_all_events())
self.simulation_metadata["total_events"] = len(events)

# Prepare export data
export_data = {
"metadata": self.simulation_metadata,
"events": [asdict(event) for event in self.events],
"events": [self._serialize_event(event) for event in events],
"agent_summaries": {
agent_id: self.export_agent_memory(agent_id)["summary"]
for agent_id in {
event.agent_id
for event in self.events
if event.agent_id is not None
str(agent_id): {
"total_events": summary["total_events"],
"event_types": sorted(summary["event_types"]),
"active_steps": sorted(summary["active_steps"]),
"first_event": summary["first_event"],
"last_event": summary["last_event"],
}
for agent_id, summary in self.agent_summaries.items()
},
}

# Save based on format
if format == "json":
with open(filepath, "w") as f:
json.dump(export_data, f, indent=2, default=str)
Expand All @@ -285,20 +394,17 @@ def save(self, filename: str | None = None, format: str = "json"):

def get_stats(self) -> dict[str, Any]:
"""Get recording statistics."""
agent_ids = {
event.agent_id for event in self.events if event.agent_id is not None
}

return {
"total_events": len(self.events),
"unique_agents": len(agent_ids),
"event_types": list({event.event_type for event in self.events}),
"total_events": self.total_events_recorded,
"unique_agents": len(self.unique_agent_ids),
"event_types": sorted(self.recorded_event_types),
"simulation_steps": self.model.steps,
"recording_duration_minutes": (
datetime.now(UTC) - self.start_time
).total_seconds()
/ 60,
"events_per_agent": {
agent_id: len(self.get_agent_events(agent_id)) for agent_id in agent_ids
agent_id: summary["total_events"]
for agent_id, summary in self.agent_summaries.items()
},
}
70 changes: 70 additions & 0 deletions tests/test_recording/test_simulation_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ def test_initialization(self, mock_model, temp_dir):
assert recorder.simulation_metadata["model_class"] == "TestModel"
assert "start_time" in recorder.simulation_metadata

def test_initialization_jsonl_mode(self, mock_model, temp_dir):
    """Test recorder initialization in streaming mode."""
    recorder = SimulationRecorder(
        model=mock_model,
        output_dir=str(temp_dir),
        storage_mode="jsonl",
        max_events_in_memory=2,
    )

    # Constructor arguments are stored verbatim on the recorder.
    assert recorder.storage_mode == "jsonl"
    assert recorder.max_events_in_memory == 2
    # No events recorded yet: the in-memory window starts empty.
    assert recorder.events == []
    # The JSONL stream file is created (touched) eagerly at construction.
    assert recorder.events_stream_path == (
        temp_dir / f"simulation_{recorder.simulation_id}_events.jsonl"
    )
    assert recorder.events_stream_path.exists()
    # Storage mode is also surfaced in the exported simulation metadata.
    assert recorder.simulation_metadata["storage_mode"] == "jsonl"

def test_record_event_basic(self, recorder):
"""Test recording a basic event."""
recorder.record_event(
Expand Down Expand Up @@ -278,6 +296,58 @@ def test_save_pickle_format(self, recorder, temp_dir):
assert "events" in data
assert "agent_summaries" in data

def test_streaming_mode_limits_in_memory_events(self, mock_model, temp_dir):
    """Test streaming mode keeps full history while capping in-memory events."""
    recorder = SimulationRecorder(
        model=mock_model,
        output_dir=str(temp_dir),
        storage_mode="jsonl",
        max_events_in_memory=2,
    )

    # Record three events across three steps; the window only holds two.
    mock_model.steps = 1
    recorder.record_event("observation", {"data": "obs1"}, agent_id=123)
    mock_model.steps = 2
    recorder.record_event("action", {"data": "act1"}, agent_id=123)
    mock_model.steps = 3
    recorder.record_event("observation", {"data": "obs2"}, agent_id=456)

    # The running counter sees all events, but memory keeps the newest two.
    assert recorder.total_events_recorded == 3
    assert len(recorder.events) == 2
    assert [event.step for event in recorder.events] == [2, 3]

    # The full history remains recoverable from the JSONL stream on disk,
    # so the query helpers and stats still cover every recorded event.
    all_events = list(recorder._iter_all_events())
    assert len(all_events) == 3
    assert [event.step for event in all_events] == [1, 2, 3]
    assert len(recorder.get_agent_events(123)) == 2
    assert recorder.get_stats()["total_events"] == 3

def test_save_json_format_streaming_mode(self, mock_model, temp_dir):
    """Test saving JSON recording in streaming mode exports full history."""
    recorder = SimulationRecorder(
        model=mock_model,
        output_dir=str(temp_dir),
        storage_mode="jsonl",
        max_events_in_memory=1,
    )
    mock_model.steps = 5
    mock_model.agents = [Mock(), Mock()]
    mock_model.max_steps = 10

    recorder.record_event("test_event", {"data": "test1"}, agent_id=123)
    recorder.record_event("test_event", {"data": "test2"}, agent_id=456)

    filepath = recorder.save(filename="streamed_recording.json", format="json")

    with open(filepath) as f:
        data = json.load(f)

    assert filepath == temp_dir / "streamed_recording.json"
    assert data["metadata"]["storage_mode"] == "jsonl"
    # NOTE(review): only two events were recorded above but three are
    # expected here — save() appears to append a final simulation-end
    # event to the stream; confirm against SimulationRecorder.save().
    assert len(data["events"]) == 3
    assert data["agent_summaries"]["123"]["total_events"] == 1
    assert data["agent_summaries"]["456"]["total_events"] == 1

def test_save_auto_filename(self, recorder, temp_dir):
"""Test auto-generating filename when saving."""
# Add max_steps to mock model
Expand Down