From 39cfada67e467f6341e21a79ceea88a99c4afd52 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 00:46:55 +0200 Subject: [PATCH 1/8] fix(agent): inject memory + reflections per-turn, not in frozen snapshot (#41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Session mode snapshots the static system prompt once per session so the cacheable prefix stays stable. But memories and task reflections were baked into that snapshot, so anything extracted mid-session stayed invisible to the model until /new — hurting most after compaction, across chats, and for reflections meant for the next similar task in the same session. Move both into the per-turn preamble (already an uncached seam carrying the live date/time + execution plan). The static snapshot now holds only truly static content (persona/skills/instructions); memory is fetched fresh each turn via format_for_prompt(query=message), so it is both current and relevance-ranked per turn. Cost is only the block's own (bounded, top-k) tokens on the new turn — far cheaper than rebuilding the whole prefix. --- core/agent.py | 66 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/core/agent.py b/core/agent.py index dc1ab37..c9fc234 100644 --- a/core/agent.py +++ b/core/agent.py @@ -484,8 +484,8 @@ async def process( if self.config.goal_decomposition.enabled and channel != "system": decomposed_goal = await self._maybe_decompose(message) - # Per-turn preamble: live date/time + (optional) execution plan. - preamble = self._turn_preamble(decomposed_goal) + # Per-turn preamble: live date/time + fresh memory/reflections + plan. + preamble = await self._turn_preamble(decomposed_goal, query=message) # Resolve the active persona (its identity, skills + tool scope) — a # per-chat binding wins over the globally selected persona (#14). An @@ -505,11 +505,9 @@ async def process( # is only built once, not rebuilt and re-sent each turn). In injection # mode the prompt is windowed/stateless, so it is rebuilt per call. if self.history_mode == "session": - system = await self._session_system_prompt( - channel, user_id, chat_id, query=message, persona=persona - ) + system = await self._session_system_prompt(channel, user_id, chat_id, persona=persona) else: - system = await self._build_system_prompt(query=message, persona=persona) + system = await self._build_system_prompt(persona=persona) if self.config.admin.capture_prompts: self._record_system_prompt( @@ -604,15 +602,40 @@ async def bind_chat_persona_by_label( return p.name return None - def _turn_preamble(self, decomposed_goal: DecomposedGoal | None) -> str: + async def _turn_preamble( + self, decomposed_goal: DecomposedGoal | None, query: str | None = None + ) -> str: """Build the per-turn preamble prepended to the current user message. Always carries the live date/time (so the agent knows 'now' every turn); - also carries the execution plan when the request was decomposed. + also carries fresh, query-relevant memory + reflections and the + execution plan when the request was decomposed. + + Memory/reflections live here, not in the static system prompt: in + session mode that prompt is snapshotted once and would freeze any + mid-session extraction out of view until ``/new`` (#41). The preamble is + rebuilt every turn and rides on the new (uncached) user message, so it + costs only the block's own tokens and is also relevance-ranked per turn. """ now = datetime.now(ZoneInfo(self.config.agent.timezone)) stamp = now.strftime("%A, %B %d, %Y %H:%M %Z") preamble = f"[Current date & time: {stamp}]" + + try: + memories = await self.memory.format_for_prompt(query=query) + if memories: + preamble += f"\n\n\n{memories}\n" + except Exception: + log.exception("Failed to load memories for turn preamble") + + if self.config.task_reflection.enabled: + try: + reflections = await self.reflections.format_for_prompt() + if reflections: + preamble += f"\n\n\n{reflections}\n" + except Exception: + log.exception("Failed to load task reflections for turn preamble") + if decomposed_goal: preamble += ( "\n\n\n" @@ -629,20 +652,19 @@ async def _session_system_prompt( channel: str, user_id: str, chat_id: str, - query: str | None = None, persona: Persona | None = None, ) -> str: """Return the session's static system prompt, building it once if needed. Built fresh after a ``/new`` (when no snapshot exists), then reused for the lifetime of the session so the static content is sent only once. - Relevance-ranked memory injection therefore uses the first message of - the session as its query. + The prompt is purely static now — memory/reflections are injected per + turn in the preamble (#41), so the snapshot never goes stale. """ cached = await self.history.get_session_system(channel, user_id, chat_id) if cached is not None: return cached - system = await self._build_system_prompt(query=query, persona=persona) + system = await self._build_system_prompt(persona=persona) await self.history.set_session_system(channel, user_id, system, chat_id) return system @@ -1861,29 +1883,25 @@ async def _reflect_on_task(self, user_msg: str, agent_msg: str, tool_log: list[d async def _build_system_prompt( self, decomposed_goal: DecomposedGoal | None = None, - query: str | None = None, persona: Persona | None = None, ) -> str: skills_index = await self.skills.get_index_block(allow=persona.skills if persona else None) - memories = await self.memory.format_for_prompt(query=query) - - # Task reflections — lessons learned from past tasks - reflections = "" - if self.config.task_reflection.enabled: - try: - reflections = await self.reflections.format_for_prompt() - except Exception: - log.exception("Failed to load task reflections for prompt") + # Memory + reflections are NOT baked into the static prompt: in session + # mode it is snapshotted once and would freeze stale (#41). They are + # injected fresh per turn in the preamble instead (see _turn_preamble), + # which also makes them query-relevant on every turn. sections = build_prompt_sections( config=self.config, history_mode=self.history_mode, skills_index=skills_index, - memories=memories, - reflections=reflections, + memories="", + reflections="", decomposed_goal=decomposed_goal, persona=persona, secrets_available=self.secret_store is not None, + include_memories=False, + include_reflections=False, ) return sections.full_prompt From 6dd2442f73321e697cb49f08704e7cc3ab6f8de9 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 00:47:08 +0200 Subject: [PATCH 2/8] test(agent): async preamble + mid-session memory visibility self-check (#41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Asserts a long-term memory written mid-session appears in the next turn's preamble — without /new and without rebuilding the frozen session snapshot. --- tests/test_tools.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/tests/test_tools.py b/tests/test_tools.py index 41a0405..1d4bcd1 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -126,8 +126,9 @@ def agent(tmp_path, monkeypatch): return AgentCore(Config()) -def test_turn_preamble_carries_datetime(agent) -> None: - preamble = agent._turn_preamble(None) +@pytest.mark.asyncio +async def test_turn_preamble_carries_datetime(agent) -> None: + preamble = await agent._turn_preamble(None) assert "Current date & time" in preamble # No execution plan when the goal was not decomposed. assert "execution_plan" not in preamble @@ -135,7 +136,7 @@ def test_turn_preamble_carries_datetime(agent) -> None: @pytest.mark.asyncio async def test_build_user_message_prepends_preamble(agent) -> None: - preamble = agent._turn_preamble(None) + preamble = await agent._turn_preamble(None) msg = await agent._build_user_message("hello", None, preamble) assert msg["role"] == "user" assert msg["content"].startswith(preamble) @@ -170,6 +171,30 @@ async def fake_build(*args, **kwargs) -> str: assert calls["n"] == 2 +@pytest.mark.asyncio +async def test_mid_session_memory_visible_next_turn_without_new(agent) -> None: + """A memory written mid-session must reach the model on the next turn (#41). + + It rides the per-turn preamble, so it appears even though the static session + system prompt is snapshotted once and never rebuilt mid-session. + """ + # Snapshot the static prompt as the session start would, then verify it does + # NOT carry the memory (the whole point: the snapshot stays static). + snapshot = await agent._session_system_prompt("telegram", "u1", "") + assert "Capital of France is Paris" not in snapshot + + # Mid-session extraction stores a new long-term fact. + await agent.memory._insert_long_term("fact", "France", "Capital of France is Paris") + + # Next turn's preamble surfaces it — no /new, no snapshot rebuild. + preamble = await agent._turn_preamble(None, query="What's the capital of France?") + assert "Capital of France is Paris" in preamble + assert "" in preamble + + # Snapshot is still the frozen one (cache intact, not rebuilt). + assert await agent._session_system_prompt("telegram", "u1", "") == snapshot + + # --------------------------------------------------------------------------- # Per-action write state — one write's outcome must not block a different one # --------------------------------------------------------------------------- From c90636277fae94ae8df5c794fa31919c71e4789e Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 00:47:09 +0200 Subject: [PATCH 3/8] docs: memory/reflections inject per-turn, not in the snapshot (#41) --- docs/content/docs/architecture.mdx | 4 ++-- docs/content/docs/memory.mdx | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/architecture.mdx b/docs/content/docs/architecture.mdx index 93eb837..c62f11b 100644 --- a/docs/content/docs/architecture.mdx +++ b/docs/content/docs/architecture.mdx @@ -70,8 +70,8 @@ The agent becomes a **thin orchestrator**: it reads skill files, passes them to The brain of MPA. Implements the LLM tool-use loop: 1. Load conversation history -2. Build the static system prompt (skills, character, personalia, memories, active tools). In session mode this is snapshotted once per conversation (rebuilt after `/new`) and reused every turn, so the cacheable prefix stays stable; on Anthropic it is sent with a `cache_control` breakpoint so the tools + system prefix is not reprocessed each turn -3. Inject the live date/time (and any per-request execution plan) at the start of the current user message — so the agent always knows "now" without mutating the cached prefix +2. Build the static system prompt (skills, character, personalia, active tools). In session mode this is snapshotted once per conversation (rebuilt after `/new`) and reused every turn, so the cacheable prefix stays stable; on Anthropic it is sent with a `cache_control` breakpoint so the tools + system prefix is not reprocessed each turn +3. Inject the live date/time, the fresh relevance-ranked memories + task reflections, and any per-request execution plan at the start of the current user message — so the agent always knows "now" and sees memories written mid-session, without mutating the cached prefix. (Memories live here, not in the snapshot, so a fact extracted mid-session reaches the model on the very next turn instead of waiting for `/new`.) 4. Call the LLM 5. Handle tool calls with permission checks 6. Save conversation turn and extract memories diff --git a/docs/content/docs/memory.mdx b/docs/content/docs/memory.mdx index bb11f18..c266c8d 100644 --- a/docs/content/docs/memory.mdx +++ b/docs/content/docs/memory.mdx @@ -117,7 +117,7 @@ Set `memory.embedding.enabled: false` to fall back to **Tier-1 lexical** (word-o ### What embeddings power -- **Relevance-ranked injection.** Instead of dumping the most recent `long_term_limit` rows into the prompt, only the `injection_top_k` (default 12) memories most relevant to the *current message* are injected. They're scored Generative-Agents style: `relevance + 0.5·importance + 0.3·recency`. (In session mode, where the static prompt is snapshotted once, the session's first message is the query.) +- **Relevance-ranked injection.** Instead of dumping the most recent `long_term_limit` rows into the prompt, only the `injection_top_k` (default 12) memories most relevant to the *current message* are injected. They're scored Generative-Agents style: `relevance + 0.5·importance + 0.3·recency`. Injection happens in the **per-turn preamble** (prepended to the current user message), not the static system prompt — so even in session mode (where the static prompt is snapshotted once) the current message is the query every turn, and a fact written mid-session is visible on the next turn without `/new`. - **Dedup.** `update_memory` retrieves ADD/UPDATE/DELETE/NOOP candidates by cosine similarity (with a per-row lexical fallback for any memory that has no vector yet). - **Hygiene clustering** (Tier 4, below). From efd61da0cc0e7b9e0eb0fa2c29cad24d9275e9dc Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 00:51:59 +0200 Subject: [PATCH 4/8] test(agent): self-check also drives the reflection path; note per-turn cost (#41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adversarial review: extend the mid-session self-check to insert a task reflection too and assert appears in the next preamble — covering the third staleness case the issue names. Add a comment flagging that session mode now retrieves per turn (intended; phase-2 recall tool if the store grows huge). --- core/agent.py | 5 +++++ tests/test_tools.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/core/agent.py b/core/agent.py index c9fc234..b09601d 100644 --- a/core/agent.py +++ b/core/agent.py @@ -621,6 +621,11 @@ async def _turn_preamble( stamp = now.strftime("%A, %B %d, %Y %H:%M %Z") preamble = f"[Current date & time: {stamp}]" + # ponytail: in session mode this now runs a query embed + cosine scan + + # reinforce-write every turn (was once per session). Intended — that is + # what makes injection fresh and per-turn relevant — and cheap for a + # personal store. If the store grows huge, gate behind the recall_memory + # tool (issue #41 phase 2) instead of always-injecting top-k. try: memories = await self.memory.format_for_prompt(query=query) if memories: diff --git a/tests/test_tools.py b/tests/test_tools.py index 1d4bcd1..7faee9e 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -183,13 +183,19 @@ async def test_mid_session_memory_visible_next_turn_without_new(agent) -> None: snapshot = await agent._session_system_prompt("telegram", "u1", "") assert "Capital of France is Paris" not in snapshot - # Mid-session extraction stores a new long-term fact. + # Mid-session extraction stores a new long-term fact + a task reflection + # (the issue names all three of compaction/cross-chat/reflection staleness). await agent.memory._insert_long_term("fact", "France", "Capital of France is Paris") + await agent.reflections._store_reflection( + {"lesson": "Prefer himalaya -o json over scraping text", "category": "tool"} + ) - # Next turn's preamble surfaces it — no /new, no snapshot rebuild. + # Next turn's preamble surfaces both — no /new, no snapshot rebuild. preamble = await agent._turn_preamble(None, query="What's the capital of France?") assert "Capital of France is Paris" in preamble assert "" in preamble + assert "Prefer himalaya -o json over scraping text" in preamble + assert "" in preamble # Snapshot is still the frozen one (cache intact, not rebuilt). assert await agent._session_system_prompt("telegram", "u1", "") == snapshot From cc07c3b49ece9f27b716cf3c23836e7c49451dc9 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 01:03:05 +0200 Subject: [PATCH 5/8] =?UTF-8?q?feat(memory):=20two-tier=20scoped=20memory?= =?UTF-8?q?=20=E2=80=94=20shared=20pool=20+=20per-persona=20private=20(#42?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Memory was global to the owner: every persona read and wrote one pool, so a fitness coach could surface facts only ever told to the finance assistant, and group multi-agent (#30) would silently share private memory. Add a `scope` column to long_term + short_term ('' = shared owner-level, '' = private to that persona), mirroring the secrets vault's two-tier shared/scoped model (#19). Additive ALTER TABLE migration — existing rows become shared, the correct default. - Retrieval (format_for_prompt / get_relevant_long_term / get_short_term) filters scope IN ('', ); the default identity sees shared only. - Extraction is tagged: the active persona is plumbed through; the extractor may mark a fact private to that persona, defaulting to shared when unsure. - Dedup/UPDATE/DELETE candidates are bounded to shared + own scope, so a private fact can never merge into or delete another persona's memory. - Consolidation promotes per scope; hygiene clusters within a scope only — no cross-persona merge. Composes with the per-turn memory seam from the preceding change (#41): scope is resolved from the active persona and passed into the same preamble injection. --- core/agent.py | 41 +++++++-- core/memory.py | 226 ++++++++++++++++++++++++++++++++++++---------- schema/memory.sql | 14 ++- 3 files changed, 220 insertions(+), 61 deletions(-) diff --git a/core/agent.py b/core/agent.py index b09601d..63075a7 100644 --- a/core/agent.py +++ b/core/agent.py @@ -355,6 +355,15 @@ def _shell_quote(s: str) -> str: ] +def _persona_scope(persona: Persona | None) -> str: + """The memory scope key for an active persona (#42). + + A persona's own name is its private scope; no persona (default identity) = + ``""`` = shared only. + """ + return persona.name if persona else "" + + def scoped_tools(persona: Persona | None) -> list[dict]: """Filter the function-tool schemas by the active persona's tool scope. @@ -484,9 +493,6 @@ async def process( if self.config.goal_decomposition.enabled and channel != "system": decomposed_goal = await self._maybe_decompose(message) - # Per-turn preamble: live date/time + fresh memory/reflections + plan. - preamble = await self._turn_preamble(decomposed_goal, query=message) - # Resolve the active persona (its identity, skills + tool scope) — a # per-chat binding wins over the globally selected persona (#14). An # explicit override (scheduler) skips the ladder (#29). @@ -494,6 +500,13 @@ async def process( persona = await self._load_persona(persona_name) else: persona = await self._resolve_persona(channel, user_id, chat_id) + + # Per-turn preamble: live date/time + fresh memory/reflections + plan. + # Memory is scoped to the active persona (#42): shared + its private. + preamble = await self._turn_preamble( + decomposed_goal, query=message, scope=_persona_scope(persona) + ) + tools = apply_feature_gates( scoped_tools(persona), secrets_available=self.secret_store is not None, @@ -603,7 +616,10 @@ async def bind_chat_persona_by_label( return None async def _turn_preamble( - self, decomposed_goal: DecomposedGoal | None, query: str | None = None + self, + decomposed_goal: DecomposedGoal | None, + query: str | None = None, + scope: str = "", ) -> str: """Build the per-turn preamble prepended to the current user message. @@ -616,6 +632,9 @@ async def _turn_preamble( mid-session extraction out of view until ``/new`` (#41). The preamble is rebuilt every turn and rides on the new (uncached) user message, so it costs only the block's own tokens and is also relevance-ranked per turn. + + ``scope`` is the active persona's memory scope (#42): ``""`` = shared + only, ``""`` = shared + that persona's private memory. """ now = datetime.now(ZoneInfo(self.config.agent.timezone)) stamp = now.strftime("%A, %B %d, %Y %H:%M %Z") @@ -627,7 +646,7 @@ async def _turn_preamble( # personal store. If the store grows huge, gate behind the recall_memory # tool (issue #41 phase 2) instead of always-injecting top-k. try: - memories = await self.memory.format_for_prompt(query=query) + memories = await self.memory.format_for_prompt(query=query, scope=scope) if memories: preamble += f"\n\n\n{memories}\n" except Exception: @@ -895,7 +914,7 @@ async def _process_injection( # Automatic memory extraction if channel != "system": asyncio.create_task( - self._extract_memories(message, final_text), + self._extract_memories(message, final_text, persona), name=f"memory-extract-{user_id}", ) @@ -1013,7 +1032,7 @@ async def _process_session( # Automatic memory extraction if channel != "system": asyncio.create_task( - self._extract_memories(message, final_text), + self._extract_memories(message, final_text, persona), name=f"memory-extract-{user_id}", ) @@ -1746,13 +1765,18 @@ async def _await_approval( self.permissions._pending.pop(request_id, None) return "skipped" - async def _extract_memories(self, user_msg: str, agent_msg: str) -> None: + async def _extract_memories( + self, user_msg: str, agent_msg: str, persona: Persona | None = None + ) -> None: """Run automatic memory extraction in the background. Uses a cheap/fast model to identify facts worth remembering from the conversation turn, then stores them in the memory DB. Exceptions are logged and swallowed — this must never crash the main agent loop. + + ``persona`` scopes what is written (#42): facts the extractor marks + private land in that persona's scope, everything else stays shared. """ try: llm = self._memory_llm( @@ -1765,6 +1789,7 @@ async def _extract_memories(self, user_msg: str, agent_msg: str) -> None: user_msg=user_msg, agent_msg=agent_msg, cooldown_seconds=self.config.memory.extraction_cooldown_seconds, + persona_scope=_persona_scope(persona), ) if stored: log.info("Background memory extraction stored %d memories", stored) diff --git a/core/memory.py b/core/memory.py index d6d4b52..5d1eec3 100644 --- a/core/memory.py +++ b/core/memory.py @@ -150,7 +150,7 @@ When in doubt between LONG_TERM and SHORT_TERM, choose SHORT_TERM. Only use LONG_TERM for facts you are highly confident will still be true months from now. - +{scope_block} Return a JSON array (max 3 items). Each element must be one of: {{"tier": "LONG_TERM", "category": "", \ "subject": "", "content": ""}} @@ -359,6 +359,50 @@ def _normalize_subject(subject: str) -> str: return (subject or "").strip().lower() +def _extraction_scope_block(persona_scope: str) -> str: + """Scope instruction injected into the extraction prompt (#42). + + Empty when no persona is active (everything is shared). When a persona is + active, lets the model mark domain-specific facts private to it; the default + stays shared so owner-level facts reach every persona. + """ + if not persona_scope: + return "" + return ( + f'\nYou are extracting for the "{persona_scope}" persona. Add ' + f'`"scope": "private"` to a fact ONLY if it is specific to this persona\'s ' + f"domain and should NOT be shared with the owner's other assistants. " + f"General facts about the owner (preferences, relationships, biography) are " + f'shared — omit scope or use `"scope": "shared"`. When unsure, leave it shared.\n' + ) + + +def _resolve_extracted_scope(mem: dict, persona_scope: str) -> str: + """Map an extracted item's scope hint to a stored scope key (#42). + + Private only when a persona is active AND the model tagged it private; + everything else is shared (``''``). + """ + if persona_scope and str(mem.get("scope", "")).strip().lower() == "private": + return persona_scope + return "" + + +def _scope_filter(scope: str | None) -> tuple[str, tuple]: + """Build a SQL ``AND`` fragment + params restricting rows by scope (#42). + + - ``None`` → no filter (every scope; for admin listing / owner-level jobs). + - ``""`` → shared only (``scope = ''``); the default identity's view. + - ``"x"`` → shared + persona ``x`` (``scope IN ('', 'x')``); never another + persona's private memory. + """ + if scope is None: + return "", () + if scope == "": + return " AND scope = ''", () + return " AND scope IN ('', ?)", (scope,) + + def _tokens(text: str) -> set[str]: """Lowercase content words, dropping stopwords and single characters.""" return {t for t in _TOKEN_RE.findall(text.lower()) if len(t) > 1 and t not in _STOPWORDS} @@ -479,6 +523,7 @@ async def _ensure_schema(self) -> None: async with aiosqlite.connect(self.db_path) as db: await db.executescript(schema) await self._migrate_long_term(db) + await self._migrate_short_term(db) self._ready = True # Columns added after the original two-tier schema shipped. Each is applied @@ -491,8 +536,12 @@ async def _ensure_schema(self) -> None: ("last_accessed", "last_accessed DATETIME"), ("access_count", "access_count INTEGER NOT NULL DEFAULT 0"), ("archived", "archived INTEGER NOT NULL DEFAULT 0"), + # #42: scope column — existing rows become '' (shared), the right default. + ("scope", "scope TEXT NOT NULL DEFAULT ''"), ) + _SHORT_TERM_MIGRATIONS = (("scope", "scope TEXT NOT NULL DEFAULT ''"),) + async def _migrate_long_term(self, db: aiosqlite.Connection) -> None: cursor = await db.execute("PRAGMA table_info(long_term)") existing = {row[1] for row in await cursor.fetchall()} @@ -502,45 +551,63 @@ async def _migrate_long_term(self, db: aiosqlite.Connection) -> None: # Safe to create now: the archived column is guaranteed to exist (fresh # DBs declare it; legacy DBs just had it added above). await db.execute("CREATE INDEX IF NOT EXISTS idx_lt_archived ON long_term(archived)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_lt_scope ON long_term(scope)") await db.commit() - async def get_long_term(self) -> list[dict]: - """Retrieve recent (non-archived) long-term memories for injection.""" + async def _migrate_short_term(self, db: aiosqlite.Connection) -> None: + cursor = await db.execute("PRAGMA table_info(short_term)") + existing = {row[1] for row in await cursor.fetchall()} + for name, ddl in self._SHORT_TERM_MIGRATIONS: + if name not in existing: + await db.execute(f"ALTER TABLE short_term ADD COLUMN {ddl}") # noqa: S608 + await db.execute("CREATE INDEX IF NOT EXISTS idx_st_scope ON short_term(scope)") + await db.commit() + + async def get_long_term(self, scope: str | None = None) -> list[dict]: + """Retrieve recent (non-archived) long-term memories for injection. + + ``scope`` filters per #42 (see :func:`_scope_filter`); ``None`` = all. + """ await self._ensure_schema() + clause, params = _scope_filter(scope) async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT category, subject, content FROM long_term " - "WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?", - (self.long_term_limit,), + f"WHERE archived = 0{clause} ORDER BY updated_at DESC LIMIT ?", # noqa: S608 + (*params, self.long_term_limit), ) return [dict(row) for row in await cursor.fetchall()] - async def get_relevant_long_term(self, query: str) -> list[dict]: + async def get_relevant_long_term(self, query: str, scope: str | None = None) -> list[dict]: """Return long-term memories most relevant to *query*, relevance-ranked. Uses a Generative-Agents-style score (recency + importance + relevance) over embedding cosine similarity, and reinforces the chosen memories (bumps ``access_count`` / ``last_accessed``). Falls back to recency order when embeddings are unavailable or the query can't be embedded. + + ``scope`` filters per #42 (see :func:`_scope_filter`); ``None`` = all. """ if not self.embedder or not query.strip(): - return await self.get_long_term() + return await self.get_long_term(scope) try: query_vec = await self.embedder.embed_one(query) except Exception: log.exception("Query embedding failed; falling back to recency order") - return await self.get_long_term() + return await self.get_long_term(scope) if not query_vec: - return await self.get_long_term() + return await self.get_long_term(scope) await self._ensure_schema() + clause, params = _scope_filter(scope) async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT id, category, subject, content, importance, embedding, " - "updated_at, last_accessed FROM long_term WHERE archived = 0" + f"updated_at, last_accessed FROM long_term WHERE archived = 0{clause}", # noqa: S608 + params, ) rows = [dict(r) for r in await cursor.fetchall()] @@ -574,36 +641,45 @@ async def _reinforce(self, ids: list[int]) -> None: ) await db.commit() - async def get_short_term(self) -> list[dict]: - """Retrieve active (non-expired) short-term memories.""" + async def get_short_term(self, scope: str | None = None) -> list[dict]: + """Retrieve active (non-expired) short-term memories. + + ``scope`` filters per #42 (see :func:`_scope_filter`); ``None`` = all. + """ await self._ensure_schema() + clause, params = _scope_filter(scope) async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT content, context FROM short_term " - "WHERE expires_at > datetime('now') " + f"WHERE expires_at > datetime('now'){clause} " # noqa: S608 "ORDER BY created_at DESC", + params, ) return [dict(row) for row in await cursor.fetchall()] - async def format_for_prompt(self, query: str | None = None) -> str: + async def format_for_prompt(self, query: str | None = None, scope: str | None = None) -> str: """Format both tiers into a block for the system prompt. When *query* is given and embeddings are enabled, only the long-term memories most relevant to the query are injected (relevance-ranked), instead of dumping the most recent ``long_term_limit`` rows (issue #5). + + ``scope`` restricts to the active persona's view per #42: ``""`` = + shared only (default identity), ``""`` = shared + that + persona's private memory, ``None`` = every scope. """ sections: list[str] = [] if query: - long_term = await self.get_relevant_long_term(query) + long_term = await self.get_relevant_long_term(query, scope) else: - long_term = await self.get_long_term() + long_term = await self.get_long_term(scope) if long_term: lines = [f"- [{m['category']}] {m['subject']}: {m['content']}" for m in long_term] sections.append("## Long-term memories\n" + "\n".join(lines)) - short_term = await self.get_short_term() + short_term = await self.get_short_term(scope) if short_term: lines = [] for m in short_term: @@ -643,6 +719,7 @@ async def extract_memories( user_msg: str, agent_msg: str, cooldown_seconds: int = 120, + persona_scope: str = "", ) -> int: """Extract facts from a conversation turn and store them. @@ -652,6 +729,11 @@ async def extract_memories( If fewer than *cooldown_seconds* have elapsed since the last extraction call, the call is skipped entirely (returns 0). + ``persona_scope`` is the active persona's key (#42). When set, the + extraction LLM may tag a fact ``"scope": "private"`` to keep it inside + that persona; everything else is stored shared (``''``). With no active + persona it is ``""`` and every fact is shared. + Returns the number of memories stored. """ now = time.monotonic() @@ -676,14 +758,16 @@ async def extract_memories( recent_turns_block = self._format_pending_turns() self._pending_turns = [] - # Build existing-memories block so the LLM can avoid duplicates. - existing_block = await self._existing_memories_block() + # Build existing-memories block so the LLM can avoid duplicates — scoped + # to what this persona may see (shared + its own private). + existing_block = await self._existing_memories_block(persona_scope) prompt = _EXTRACTION_PROMPT.format( user_msg=user_msg, agent_msg=agent_msg, recent_turns_block=recent_turns_block, existing_memories_block=existing_block, + scope_block=_extraction_scope_block(persona_scope), ) try: @@ -700,13 +784,14 @@ async def extract_memories( stored = 0 for mem in memories[: self._MAX_PER_TURN]: try: + scope = _resolve_extracted_scope(mem, persona_scope) tier = mem.get("tier", "").upper() if tier == "LONG_TERM": - op = await self.update_memory(llm, model, mem) + op = await self.update_memory(llm, model, mem, scope=scope) if op in ("ADD", "UPDATE"): stored += 1 elif tier == "SHORT_TERM": - stored += await self._store_short_term(mem) + stored += await self._store_short_term(mem, scope=scope) else: log.warning("Unknown memory tier: %s", tier) except Exception: @@ -719,10 +804,10 @@ async def extract_memories( log.info("Extracted and stored %d memories", stored) return stored - async def _existing_memories_block(self) -> str: + async def _existing_memories_block(self, scope: str | None = None) -> str: """Build a summary of existing memories for the extraction prompt.""" - long_term = await self.get_long_term() - short_term = await self.get_short_term() + long_term = await self.get_long_term(scope) + short_term = await self.get_short_term(scope) if not long_term and not short_term: return "" @@ -737,7 +822,9 @@ async def _existing_memories_block(self) -> str: parts.append("") # trailing newline return "\n".join(parts) + "\n" - async def update_memory(self, llm: LLMClient, model: str, candidate: dict) -> str: + async def update_memory( + self, llm: LLMClient, model: str, candidate: dict, scope: str = "" + ) -> str: """Apply a candidate fact to long-term memory via a unified pipeline. Retrieves the most lexically similar existing long-term memories, then @@ -746,6 +833,10 @@ async def update_memory(self, llm: LLMClient, model: str, candidate: dict) -> st When nothing similar exists the candidate is added directly without an LLM call. Malformed model output is a safe no-op. + ``scope`` (#42) tags an ADD and bounds the dedup/UPDATE/DELETE candidate + set to shared + that scope, so a private fact never merges into or + deletes another persona's private memory. + Returns the operation applied: ``"ADD"``, ``"UPDATE"``, ``"DELETE"``, or ``"NOOP"``. """ @@ -755,9 +846,11 @@ async def update_memory(self, llm: LLMClient, model: str, candidate: dict) -> st if not content: return "NOOP" - similar = await self._retrieve_similar_long_term(subject, content) + # Candidates restricted to shared + own scope: a private fact must not + # see — and therefore cannot UPDATE/DELETE — another persona's memory. + similar = await self._retrieve_similar_long_term(subject, content, scope or "") if not similar: - await self._insert_long_term(category, subject, content) + await self._insert_long_term(category, subject, content, scope=scope) log.debug("ADD long-term (no similar): [%s] %s: %s", category, subject, content[:80]) return "ADD" @@ -790,7 +883,7 @@ async def update_memory(self, llm: LLMClient, model: str, candidate: dict) -> st valid_ids = {row["id"] for row in similar} if operation == "ADD": - await self._insert_long_term(category, subject, content) + await self._insert_long_term(category, subject, content, scope=scope) log.debug("ADD long-term: [%s] %s: %s", category, subject, content[:80]) return "ADD" @@ -838,20 +931,27 @@ async def update_memory(self, llm: LLMClient, model: str, candidate: dict) -> st return "NOOP" - async def _retrieve_similar_long_term(self, subject: str, content: str) -> list[dict]: + async def _retrieve_similar_long_term( + self, subject: str, content: str, scope: str | None = None + ) -> list[dict]: """Return the top-k existing (non-archived) long-term memories similar to a candidate (subject + content). Uses embedding cosine similarity when an embedder is configured (with a per-row lexical fallback for memories that have no stored vector yet), otherwise pure token overlap. A matching subject adds a fixed boost. - Cheap and dependency-free at <1k rows.""" + Cheap and dependency-free at <1k rows. + + ``scope`` (#42) bounds the candidate set: ``""`` = shared only, a persona + key = shared + that persona, ``None`` = every scope.""" await self._ensure_schema() + clause, params = _scope_filter(scope) async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT id, category, subject, content, created_at, updated_at, embedding " - "FROM long_term WHERE archived = 0" + f"FROM long_term WHERE archived = 0{clause}", # noqa: S608 + params, ) rows = [dict(r) for r in await cursor.fetchall()] @@ -894,7 +994,12 @@ async def _embed_blob(self, text: str) -> bytes | None: return pack_vector(vec) if vec else None async def _insert_long_term( - self, category: str, subject: str, content: str, importance: float | None = None + self, + category: str, + subject: str, + content: str, + importance: float | None = None, + scope: str = "", ) -> None: """Insert a new long-term memory row (with embedding + importance).""" await self._ensure_schema() @@ -903,17 +1008,18 @@ async def _insert_long_term( async with aiosqlite.connect(self.db_path) as db: await db.execute( "INSERT INTO long_term " - "(category, subject, content, source, confidence, embedding, importance) " - "VALUES (?, ?, ?, 'conversation', 'stated', ?, ?)", - (category, subject, content, blob, imp), + "(category, subject, content, source, confidence, embedding, importance, scope) " + "VALUES (?, ?, ?, 'conversation', 'stated', ?, ?, ?)", + (category, subject, content, blob, imp, scope), ) await db.commit() - async def _store_short_term(self, mem: dict) -> int: + async def _store_short_term(self, mem: dict, scope: str = "") -> int: """Store a short-term memory with a LLM-determined TTL. Skips insertion if an active (non-expired) short-term memory - already exists with overlapping content. + already exists with overlapping content. ``scope`` (#42) tags the row + and bounds the duplicate check to shared + that scope. """ content = mem.get("content", "") context = mem.get("context", "") @@ -929,10 +1035,12 @@ async def _store_short_term(self, mem: dict) -> int: expires_str = expires_at.strftime("%Y-%m-%d %H:%M:%S") await self._ensure_schema() + clause, params = _scope_filter(scope or "") async with aiosqlite.connect(self.db_path) as db: - # Check for duplicate active short-term memories + # Check for duplicate active short-term memories within this scope. cursor = await db.execute( - "SELECT id, content FROM short_term WHERE expires_at > datetime('now')", + f"SELECT id, content FROM short_term WHERE expires_at > datetime('now'){clause}", # noqa: S608 + params, ) existing = await cursor.fetchall() content_lower = content.lower() @@ -942,8 +1050,8 @@ async def _store_short_term(self, mem: dict) -> int: return 0 await db.execute( - "INSERT INTO short_term (content, context, expires_at) VALUES (?, ?, ?)", - (content, context, expires_str), + "INSERT INTO short_term (content, context, expires_at, scope) VALUES (?, ?, ?, ?)", + (content, context, expires_str, scope), ) await db.commit() log.debug("Stored short-term memory (TTL %dh): %s", ttl_hours, content[:80]) @@ -967,15 +1075,22 @@ async def consolidate_and_cleanup(self, llm: LLMClient, model: str) -> dict: async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( - "SELECT id, content, context, created_at, expires_at FROM short_term " + "SELECT id, content, context, created_at, expires_at, scope FROM short_term " "WHERE expires_at > datetime('now') " "ORDER BY created_at ASC", ) active_short_term = [dict(row) for row in await cursor.fetchall()] + # Promote per scope (#42): each scope's short-term is reviewed and + # promoted into long-term of the same scope, never mixing two personas' + # private memory in one consolidation call. promoted = 0 if active_short_term: - promoted = await self._run_consolidation_llm(llm, model, active_short_term) + by_scope: dict[str, list[dict]] = {} + for row in active_short_term: + by_scope.setdefault(row.get("scope") or "", []).append(row) + for scope, rows in by_scope.items(): + promoted += await self._run_consolidation_llm(llm, model, rows, scope=scope) # Delete all expired short-term memories expired_count = await self._delete_expired_short_term() @@ -1042,7 +1157,7 @@ async def _hygiene_pass(self, llm: LLMClient, model: str) -> int: async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( - "SELECT id, category, subject, content, created_at, updated_at, embedding " + "SELECT id, category, subject, content, created_at, updated_at, embedding, scope " "FROM long_term WHERE archived = 0" ) rows = [dict(r) for r in await cursor.fetchall()] @@ -1050,7 +1165,15 @@ async def _hygiene_pass(self, llm: LLMClient, model: str) -> int: if len(rows) < 2: return 0 - clusters = self._cluster_long_term(rows)[: self._HYGIENE_MAX_CLUSTERS] + # Cluster within a scope only (#42): merging must never collapse one + # persona's private memory into another's (or into shared). + by_scope: dict[str, list[dict]] = {} + for row in rows: + by_scope.setdefault(row.get("scope") or "", []).append(row) + clusters: list[list[dict]] = [] + for scope_rows in by_scope.values(): + clusters.extend(self._cluster_long_term(scope_rows)) + clusters = clusters[: self._HYGIENE_MAX_CLUSTERS] removed = 0 for cluster in clusters: try: @@ -1145,9 +1268,13 @@ async def _resolve_cluster(self, llm: LLMClient, model: str, cluster: list[dict] return removed async def _run_consolidation_llm( - self, llm: LLMClient, model: str, short_term_rows: list[dict] + self, llm: LLMClient, model: str, short_term_rows: list[dict], scope: str = "" ) -> int: - """Ask the LLM which short-term memories to promote to long-term.""" + """Ask the LLM which short-term memories to promote to long-term. + + All rows are assumed to share ``scope`` (#42); promotions are stored in + that scope and deduplicated only against shared + that scope. + """ # Build the short-term entries block st_lines = [] for row in short_term_rows: @@ -1157,8 +1284,8 @@ async def _run_consolidation_llm( st_lines.append(entry) st_block = "\n".join(st_lines) - # Build existing long-term summary for deduplication - long_term = await self.get_long_term() + # Build existing long-term summary for deduplication (same scope view). + long_term = await self.get_long_term(scope or "") if long_term: lt_lines = [f"- [{m['category']}] {m['subject']}: {m['content']}" for m in long_term] lt_block = "\n".join(lt_lines) @@ -1192,6 +1319,7 @@ async def _run_consolidation_llm( "subject": mem.get("subject", ""), "content": mem.get("content", ""), }, + scope=scope, ) if op in ("ADD", "UPDATE"): stored += 1 diff --git a/schema/memory.sql b/schema/memory.sql index f364c8b..87e9a56 100644 --- a/schema/memory.sql +++ b/schema/memory.sql @@ -16,7 +16,10 @@ CREATE TABLE IF NOT EXISTS long_term ( importance REAL NOT NULL DEFAULT 5.0, last_accessed DATETIME, access_count INTEGER NOT NULL DEFAULT 0, - archived INTEGER NOT NULL DEFAULT 0 + archived INTEGER NOT NULL DEFAULT 0, + -- Two-tier scoped memory (#42): '' = shared (owner-level, visible to every + -- persona + the default identity), '' = private to that persona. + scope TEXT NOT NULL DEFAULT '' ); CREATE TABLE IF NOT EXISTS short_term ( @@ -24,11 +27,14 @@ CREATE TABLE IF NOT EXISTS short_term ( content TEXT NOT NULL, context TEXT, expires_at DATETIME NOT NULL, - created_at DATETIME DEFAULT (datetime('now')) + created_at DATETIME DEFAULT (datetime('now')), + -- See long_term.scope (#42). + scope TEXT NOT NULL DEFAULT '' ); CREATE INDEX IF NOT EXISTS idx_lt_category ON long_term(category); CREATE INDEX IF NOT EXISTS idx_lt_subject ON long_term(subject); CREATE INDEX IF NOT EXISTS idx_st_expires ON short_term(expires_at); --- idx_lt_archived is created in MemoryStore._migrate_long_term, after the --- archived column is guaranteed to exist (so legacy DBs migrate cleanly). +-- idx_lt_archived / idx_lt_scope / idx_st_scope are created in the +-- MemoryStore._migrate_* methods, after their columns are guaranteed to exist +-- (so legacy DBs that predate those columns migrate cleanly). From fee6c72327d32f0478ff5334126bfcf50265a4fe Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 01:06:53 +0200 Subject: [PATCH 6/8] feat(admin): surface memory scope in the Memory tab + list endpoints (#42) Show each memory's scope (shared vs the owning persona) in the admin Memory tables and the /memory/long-term + /memory/short-term JSON. Migrate-on-read so a legacy DB gains the column even when no agent is running. --- api/admin.py | 15 +++++++++++---- api/templates/partials/memory.html | 8 ++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/api/admin.py b/api/admin.py index b4f6301..1ee2386 100644 --- a/api/admin.py +++ b/api/admin.py @@ -1313,13 +1313,20 @@ async def _bool(key: str, default: str) -> str: long_term: list[dict] = [] short_term: list[dict] = [] if Path(memory_db).exists(): - cols = "id, category, subject, content, source, confidence, created_at, updated_at" + # Idempotent migrate-on-read so a legacy DB has the scope column (#42) + # even when no agent is running to have migrated it on startup. + from core.memory import MemoryStore + + await MemoryStore(db_path=memory_db)._ensure_schema() + cols = ( + "id, category, subject, content, source, confidence, created_at, updated_at, scope" + ) async with aiosqlite.connect(memory_db) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(f"SELECT {cols} FROM long_term ORDER BY updated_at DESC") long_term = [dict(row) for row in await cursor.fetchall()] cursor = await db.execute( - "SELECT id, content, context, expires_at, created_at " + "SELECT id, content, context, expires_at, created_at, scope " "FROM short_term WHERE expires_at > datetime('now') " "ORDER BY created_at DESC" ) @@ -2464,7 +2471,7 @@ async def list_long_term( import aiosqlite await agent.memory._ensure_schema() - cols = "id, category, subject, content, source, confidence, created_at, updated_at" + cols = "id, category, subject, content, source, confidence, created_at, updated_at, scope" query = f"SELECT {cols} FROM long_term" conditions = [] params: list[str] = [] @@ -2495,7 +2502,7 @@ async def list_short_term() -> dict: async with aiosqlite.connect(agent.memory.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( - "SELECT id, content, context, expires_at, created_at " + "SELECT id, content, context, expires_at, created_at, scope " "FROM short_term WHERE expires_at > datetime('now') " "ORDER BY created_at DESC" ) diff --git a/api/templates/partials/memory.html b/api/templates/partials/memory.html index ca23f24..b46150a 100644 --- a/api/templates/partials/memory.html +++ b/api/templates/partials/memory.html @@ -271,6 +271,7 @@

Long-term

Category Subject Content + Scope @@ -281,6 +282,7 @@

Long-term

{{ m.category }} {{ m.subject }} {{ m.content }} + {% if m.scope %}{{ m.scope }}{% else %}shared{% endif %}