Skip to content
162 changes: 128 additions & 34 deletions mesa_llm/memory/episodic_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,60 @@ class EventGrade(BaseModel):
grade: int


def normalize_dict_values(scores: dict, min_target: float, max_target: float) -> dict:
    """
    Min-max scale dictionary values into [min_target, max_target].

    Returns a NEW dict and leaves the input mapping untouched (the previous
    version rebound values inside the caller's dict, a surprising side effect
    for callers that reuse the raw scores).

    When all values are identical the value range is zero, so every key maps
    to the midpoint of the target interval.

    This mirrors the min-max helper used in the Generative Agents reference
    retrieval implementation:
    https://github.com/joonspk-research/generative_agents/blob/main/reverie/backend_server/persona/cognitive_modules/retrieve.py

    Args:
        scores: Mapping of key -> raw numeric score.
        min_target: Lower bound of the target range.
        max_target: Upper bound of the target range.

    Returns:
        A new dict with the same keys and scaled values ({} for empty input).
    """
    if not scores:
        return {}

    vals = scores.values()
    min_val = min(vals)
    max_val = max(vals)
    range_val = max_val - min_val

    if range_val == 0:
        # Degenerate case: all scores equal -> map everything to the midpoint.
        midpoint = min_target + (max_target - min_target) / 2
        return {key: midpoint for key in scores}

    scale = (max_target - min_target) / range_val
    return {key: (val - min_val) * scale + min_target for key, val in scores.items()}


class EpisodicMemory(Memory):
"""
Stores memories based on event importance scoring. Each new memory entry is evaluated by a LLM
for its relevance and importance (1-5 scale) relative to the agent's current task and previous
experiences. Based on a Stanford/DeepMind paper:
[Generative Agents: Interactive Simulacra of Human Behavior](https://arxiv.org/pdf/2304.03442)
Event-level memory with LLM-based importance scoring and recency-aware retrieval.

Credit / references:
- Paper: Generative Agents: Interactive Simulacra of Human Behavior
https://arxiv.org/abs/2304.03442
- Reference retrieval code:
https://github.com/joonspk-research/generative_agents/blob/main/reverie/backend_server/persona/cognitive_modules/retrieve.py

This implementation is inspired by the paper's retrieval scoring design
(component-wise min-max normalization, then weighted combination). It is
not a strict copy of the original code: relevance scoring via embeddings is
not implemented yet, and recency is computed from step age.
"""

def __init__(
self,
agent: "LLMAgent",
llm_model: str | None = None,
display: bool = True,
max_capacity: int = 10,
considered_entries: int = 5,
max_capacity: int = 200,
considered_entries: int = 30,
recency_decay: float = 0.995,
):
"""
Initialize the EpisodicMemory
Expand All @@ -43,6 +82,7 @@ def __init__(
self.max_capacity = max_capacity
self.memory_entries = deque(maxlen=self.max_capacity)
self.considered_entries = considered_entries
self.recency_decay = recency_decay

self.system_prompt = """
You are an assistant that evaluates memory entries on a scale from 1 to 5, based on their importance to a specific problem or task. Your goal is to assign a score that reflects how much each entry contributes to understanding, solving, or advancing the task. Use the following grading scale:
Expand All @@ -60,6 +100,24 @@ def __init__(
Only assess based on the entry's content and its value to the task at hand. Ignore style, grammar, or tone.
"""

def _extract_importance(self, entry) -> int:
"""
Safely extracts importance score regardless of data structure.
Handles:
- Nested: {"msg": {"importance": 5}}
- Flat: {"importance": 5}
"""
if "importance" in entry.content:
val = entry.content["importance"]
return val if isinstance(val, (int, float)) else 1

for value in entry.content.values():
if isinstance(value, dict) and "importance" in value:
val = value["importance"]
return val if isinstance(val, (int, float)) else 1

return 1

def _build_grade_prompt(self, type: str, content: dict) -> str:
"""
This helper assembles a prompt that includes the event type, event content,
Expand Down Expand Up @@ -89,7 +147,7 @@ def grade_event_importance(self, type: str, content: dict) -> float:
prompt = self._build_grade_prompt(type, content)
self.llm.system_prompt = self.system_prompt

rsp = self.agent.llm.generate(
rsp = self.llm.generate(
prompt=prompt,
response_format=EventGrade,
)
Expand All @@ -104,7 +162,7 @@ async def agrade_event_importance(self, type: str, content: dict) -> float:
prompt = self._build_grade_prompt(type, content)
self.llm.system_prompt = self.system_prompt

rsp = await self.agent.llm.agenerate(
rsp = await self.llm.agenerate(
prompt=prompt,
response_format=EventGrade,
)
Expand All @@ -114,30 +172,68 @@ async def agrade_event_importance(self, type: str, content: dict) -> float:

def retrieve_top_k_entries(self, k: int) -> list[MemoryEntry]:
    """
    Retrieve the top-k entries ranked by normalized importance + recency.

    Inspired by the Generative Agents retrieval scoring: each component
    (importance, recency) is min-max normalized to [0, 1] separately and
    then summed. Relevance scoring (embedding similarity with a focal
    query) is not implemented yet.

    Args:
        k: Maximum number of entries to return.

    Returns:
        Up to ``k`` MemoryEntry objects, best-scored first. Ties keep
        insertion order (the sort is stable).
    """
    if not self.memory_entries:
        return []

    entries = list(self.memory_entries)
    current_step = self.agent.model.steps

    # Raw per-entry component scores, keyed by position in `entries`.
    importance_raw = {}
    recency_raw = {}
    for idx, entry in enumerate(entries):
        importance_raw[idx] = self._extract_importance(entry)
        # Exponential decay: newer entries (smaller age) score closer to 1.
        recency_raw[idx] = self.recency_decay ** (current_step - entry.step)

    # Normalize each component independently before combining (per the paper).
    importance_scaled = normalize_dict_values(importance_raw, 0, 1)
    recency_scaled = normalize_dict_values(recency_raw, 0, 1)

    ranked = sorted(
        range(len(entries)),
        key=lambda idx: importance_scaled[idx] + recency_scaled[idx],
        reverse=True,
    )
    return [entries[idx] for idx in ranked[:k]]

def _finalize_entry(self, type: str, graded_content: dict):
    """Wrap graded content in a MemoryEntry stamped with the current model step and store it."""
    entry = MemoryEntry(
        agent=self.agent,
        content={type: graded_content},
        step=self.agent.model.steps,
    )
    self.memory_entries.append(entry)

def add_to_memory(self, type: str, content: dict):
    """
    Grade and persist a new episodic memory entry.

    The event is scored for importance by the LLM, the score is attached
    to a copy of ``content`` (the caller's dict is not mutated), and the
    result is stored as a MemoryEntry for the current step.

    Args:
        type: Event type label (e.g. "observation"), used as the content key.
        content: Event payload to grade and store.
    """
    graded_content = {
        **content,
        "importance": self.grade_event_importance(type, content),
    }
    self._finalize_entry(type, graded_content)

async def aadd_to_memory(self, type: str, content: dict):
    """
    Async version of add_to_memory.

    The event is scored for importance by the LLM (awaited), the score is
    attached to a copy of ``content`` (the caller's dict is not mutated),
    and the result is stored as a MemoryEntry for the current step.

    Args:
        type: Event type label (e.g. "observation"), used as the content key.
        content: Event payload to grade and store.
    """
    graded_content = {
        **content,
        "importance": await self.agrade_event_importance(type, content),
    }
    self._finalize_entry(type, graded_content)

def get_prompt_ready(self) -> str:
return f"Top {self.considered_entries} memory entries:\n\n" + "\n".join(
Expand All @@ -161,20 +257,18 @@ def get_communication_history(self) -> str:

async def aprocess_step(self, pre_step: bool = False):
    """
    Asynchronous version of process_step (no-op for episodic memory).

    EpisodicMemory persists entries at add-time (aadd_to_memory) and does
    not use two-phase pre/post-step buffering, so this hook does nothing.

    Args:
        pre_step: Accepted for interface compatibility; ignored.
    """
    return

def process_step(self, pre_step: bool = False):
    """
    Process-step hook (no-op for episodic memory).

    EpisodicMemory persists entries at add-time (add_to_memory) and does
    not use two-phase pre/post-step buffering, so this hook does nothing.

    Args:
        pre_step: Accepted for interface compatibility; ignored.
    """
    return
40 changes: 24 additions & 16 deletions tests/test_integration/test_memory_reasoning.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,14 @@ def test_plan_records_to_memory(self, monkeypatch):
plan = reasoning.plan(obs=obs)

assert isinstance(plan, Plan)
assert memory.step_content["Observation"]["content"] == str(obs)
assert memory.step_content["Plan"]["content"] == plan_content
assert memory.step_content["Plan-Execution"]["content"] == str(plan)
assert memory.step_content["Observation"]["importance"] == 3
assert memory.step_content["Plan"]["importance"] == 3
assert memory.step_content["Plan-Execution"]["importance"] == 3
entries = list(memory.memory_entries)
assert len(entries) == 3
assert entries[0].content["Observation"]["content"] == str(obs)
assert entries[1].content["Plan"]["content"] == plan_content
assert entries[2].content["Plan-Execution"]["content"] == str(plan)
assert entries[0].content["Observation"]["importance"] == 3
assert entries[1].content["Plan"]["importance"] == 3
assert entries[2].content["Plan-Execution"]["importance"] == 3
assert memory.grade_event_importance.call_count == 3

def test_async_plan_works(self, monkeypatch):
Expand All @@ -301,12 +303,14 @@ def test_async_plan_works(self, monkeypatch):
plan = asyncio.run(reasoning.aplan(obs=obs))

assert isinstance(plan, Plan)
assert memory.step_content["Observation"]["content"] == str(obs)
assert memory.step_content["Plan"]["content"] == plan_content
assert memory.step_content["Plan-Execution"]["content"] == str(plan)
assert memory.step_content["Observation"]["importance"] == 3
assert memory.step_content["Plan"]["importance"] == 3
assert memory.step_content["Plan-Execution"]["importance"] == 3
entries = list(memory.memory_entries)
assert len(entries) == 3
assert entries[0].content["Observation"]["content"] == str(obs)
assert entries[1].content["Plan"]["content"] == plan_content
assert entries[2].content["Plan-Execution"]["content"] == str(plan)
assert entries[0].content["Observation"]["importance"] == 3
assert entries[1].content["Plan"]["importance"] == 3
assert entries[2].content["Plan-Execution"]["importance"] == 3
assert memory.agrade_event_importance.await_count == 3


Expand Down Expand Up @@ -677,8 +681,10 @@ def test_plan_records_to_memory(self, monkeypatch):

plan = reasoning.plan()
assert isinstance(plan, Plan)
assert memory.step_content["plan"]["content"] == plan_content
assert memory.step_content["plan"]["importance"] == 3
entries = list(memory.memory_entries)
assert len(entries) == 1
assert entries[0].content["plan"]["content"] == plan_content
assert entries[0].content["plan"]["importance"] == 3
assert memory.grade_event_importance.call_count == 1
reasoning.execute_tool_call.assert_called_once_with(
plan_content, selected_tools=None, ttl=1
Expand All @@ -699,8 +705,10 @@ def test_async_plan_works(self, monkeypatch):

plan = asyncio.run(reasoning.aplan())
assert isinstance(plan, Plan)
assert memory.step_content["plan"]["content"] == plan_content
assert memory.step_content["plan"]["importance"] == 3
entries = list(memory.memory_entries)
assert len(entries) == 1
assert entries[0].content["plan"]["content"] == plan_content
assert entries[0].content["plan"]["importance"] == 3
assert memory.grade_event_importance.call_count == 1
reasoning.aexecute_tool_call.assert_awaited_once_with(
plan_content, selected_tools=None, ttl=1
Expand Down
Loading
Loading