From dec642504ba386f2f00dcf8bff7bd64796518851 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 00:51:55 +0200 Subject: [PATCH 01/13] feat(subagents): execution primitive + scope/recursion/budget guardrails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the subagent core (issue #15): a scoped sub-loop the agent delegates to, reached by one primitive (AgentCore.run_subagent) from two paths — the spawn_subagent tool and scheduled 'subagent' jobs. - core/subagents.py: SubagentRun + in-memory SubagentRegistry (list/status/ cancel) and narrow_scope() for inherit-never-widen tool/skill/secret scope. - agent.py: spawn_subagent tool, run_subagent primitive, budgeted/depth-capped sub-loop (system semantics: no decomposition/memory/reflection/prompts), background runs that post results back to the originating chat. - config: SubagentsConfig (enabled, recursion_depth, max_steps, token_budget, max_concurrent). - job_store: 'subagent' type + persona column (additive migration). - scheduler: run_subagent_task handler + routing for scheduled subagent jobs. - permissions: spawn_subagent is an ASK write-action. - admin: spawn_subagent is persona-gateable. --- api/admin.py | 1 + core/agent.py | 338 +++++++++++++++++++++++++++++++++++++++++++- core/config.py | 17 +++ core/job_store.py | 41 ++++-- core/permissions.py | 4 + core/scheduler.py | 72 +++++++++- core/subagents.py | 162 +++++++++++++++++++++ 7 files changed, 621 insertions(+), 14 deletions(-) create mode 100644 core/subagents.py diff --git a/api/admin.py b/api/admin.py index 318be10..c777a2b 100644 --- a/api/admin.py +++ b/api/admin.py @@ -3337,6 +3337,7 @@ def _config_requires_restart(values: dict) -> bool: "web_search", "manage_jobs", "write_artifact", + "spawn_subagent", ] diff --git a/core/agent.py b/core/agent.py index 7b5c5c2..9a2d11a 100644 --- a/core/agent.py +++ b/core/agent.py @@ -32,6 +32,7 @@ from core.scheduler import AgentScheduler from core.secret_store import SecretStore from core.skills import SkillsEngine +from core.subagents import SubagentRegistry, SubagentRun, narrow_scope, short_summary from core.task_reflection import ReflectionStore from core.tools import tool_env from voice.pipeline import VoicePipeline @@ -270,6 +271,50 @@ def _shell_quote(s: str) -> str: "required": ["action"], }, }, + # Subagents (issue #15) — delegate a scoped subtask to a sub-loop. + { + "name": "spawn_subagent", + "description": ( + "Delegate a self-contained subtask to a subagent. The subagent runs " + "the full agent loop under a chosen persona, with a tool/skill/secret " + "scope that is never wider than yours, and returns a structured " + "result. It has NO memory of this conversation — put everything it " + "needs in 'task'.\n" + "Use background=true for long-running work: you get a run id " + "immediately and the result is posted to this chat when done (monitor " + "or cancel it with /jobs or the admin Jobs page). Use background=false " + "(default) to block and get the result back in this turn.\n" + "Subagents are depth-limited and budgeted, so prefer one focused " + "delegation over deep nesting." + ), + "input_schema": { + "type": "object", + "properties": { + "task": { + "type": "string", + "description": ( + "The complete instruction for the subagent. Be specific and " + "self-contained — it cannot see this conversation." + ), + }, + "persona": { + "type": "string", + "description": ( + "Persona name to run as (e.g. 'coding-helper'). Omit to " + "inherit your current identity and scope." + ), + }, + "background": { + "type": "boolean", + "description": ( + "Run asynchronously (default false). True returns a run id " + "now and posts the result back to this chat when done." + ), + }, + }, + "required": ["task"], + }, + }, # Secrets vault (issue #19) — discover + request secrets by NAME only. { "name": "list_secrets", @@ -406,7 +451,11 @@ def scoped_tools(persona: Persona | None) -> list[dict]: def apply_feature_gates( - tools: list[dict], *, secrets_available: bool, artifacts_enabled: bool + tools: list[dict], + *, + secrets_available: bool, + artifacts_enabled: bool, + subagents_enabled: bool = True, ) -> list[dict]: """Drop tools whose backing feature is unavailable/disabled, so the model is never offered a capability it can't use (defence in depth — the tool handlers @@ -416,6 +465,8 @@ def apply_feature_gates( out = [t for t in out if t["name"] not in ("list_secrets", "request_secret")] if not artifacts_enabled: out = [t for t in out if t["name"] != "write_artifact"] + if not subagents_enabled: + out = [t for t in out if t["name"] != "spawn_subagent"] return out @@ -462,6 +513,8 @@ def __init__(self, config: Config, secret_store: SecretStore | None = None): self.voice: VoicePipeline | None = None self.job_store = JobStore(db_path="data/jobs.db") self.scheduler = AgentScheduler(self, self.job_store) + # Live registry of subagent runs (issue #15) — list/status/cancel. + self.subagents = SubagentRegistry() config_db = "data/config.db" self.permissions = PermissionEngine(db_path=config_db) self.prompt_capture: deque[dict[str, str]] = deque(maxlen=20) @@ -545,6 +598,7 @@ async def process( scoped_tools(persona), secrets_available=self.secret_store is not None, artifacts_enabled=self.config.artifacts.enabled, + subagents_enabled=self.config.subagents.enabled, ) # Static system prompt. In session mode it is snapshotted once at the @@ -961,7 +1015,9 @@ async def _process_injection( ) # Agentic loop — keep going while the LLM wants to call tools - request_state = self._new_request_state(persona) + request_state = self._new_request_state( + persona, origin={"channel": channel, "user_id": user_id, "chat_id": chat_id} + ) tool_log: list[dict] = [] while response.tool_calls: await self._batch_approve_writes(response.tool_calls, channel, user_id, request_state) @@ -1064,7 +1120,9 @@ async def _process_session( # Agentic loop — keep going while the LLM wants to call tools new_messages: list[dict] = [] - request_state = self._new_request_state(persona) + request_state = self._new_request_state( + persona, origin={"channel": channel, "user_id": user_id, "chat_id": chat_id} + ) tool_log: list[dict] = [] while response.tool_calls: await self._batch_approve_writes(response.tool_calls, channel, user_id, request_state) @@ -1326,6 +1384,12 @@ async def _execute_tool( ] } + if name == "spawn_subagent": + result = await self._tool_spawn_subagent(params, channel, user_id, request_state) + if is_write_action and self._is_tool_success(result): + executed_writes.add(write_sig) + return result + if name == "request_secret": return await self._tool_request_secret(params, channel, user_id, request_state) @@ -1338,12 +1402,20 @@ async def _execute_tool( return {"error": f"Unknown tool: {name}"} @staticmethod - def _new_request_state(persona: Persona | None = None) -> dict: + def _new_request_state( + persona: Persona | None = None, + *, + depth: int = 0, + origin: dict | None = None, + run_id: str | None = None, + ) -> dict: """Fresh per-turn state tracking write actions and approval decisions. ``allowed_skills`` carries the active persona's skill allowlist so ``load_skill`` can refuse skills outside scope (defence in depth — the - index already hides them). + index already hides them). ``depth``/``origin``/``persona_obj`` carry the + context a ``spawn_subagent`` call needs to narrow scope, cap recursion, + and post a background result back to the originating chat (issue #15). """ return { "executed_writes": set(), @@ -1353,6 +1425,11 @@ def _new_request_state(persona: Persona | None = None) -> dict: # Secret scope for {{secret:}} ACL in run_command (issue #19). "persona_secrets": list(persona.secrets) if persona else [], "persona_name": persona.name if persona else "", + # Subagent plumbing (issue #15). + "persona_obj": persona, + "depth": depth, + "origin": origin or {}, + "run_id": run_id, } @staticmethod @@ -1718,6 +1795,257 @@ async def _tool_recall_memory(self, params: dict, request_state: dict | None = N log.info("Tool call: recall_memory — %r (%d hits)", query, len(memories)) return {"query": query, "count": len(memories), "memories": memories} + # -- Subagents (issue #15) ------------------------------------------------ + + async def _tool_spawn_subagent( + self, params: dict, channel: str, user_id: str, request_state: dict + ) -> dict: + """``spawn_subagent`` tool: delegate a scoped subtask to a sub-loop.""" + task = str(params.get("task", "")).strip() + if not task: + return {"error": "Missing 'task' for spawn_subagent."} + origin = request_state.get("origin") or {} + return await self.run_subagent( + task=task, + persona_name=str(params.get("persona", "")).strip(), + origin_channel=origin.get("channel", channel), + origin_user_id=str(origin.get("user_id", user_id)), + origin_chat_id=str(origin.get("chat_id", "")), + parent_state=request_state, + background=bool(params.get("background", False)), + ) + + async def run_subagent( + self, + *, + task: str, + persona_name: str = "", + origin_channel: str = "", + origin_user_id: str = "", + origin_chat_id: str = "", + parent_state: dict | None = None, + background: bool = False, + ) -> dict: + """Run a subagent — the one primitive behind both the tool and scheduled + ``subagent`` jobs. Scope is narrowed from the caller (inherit-never-widen); + recursion depth and per-run budgets are enforced. + """ + cfg = self.config.subagents + if not cfg.enabled: + return {"error": "Subagents are disabled."} + parent_state = parent_state or {} + parent_depth = int(parent_state.get("depth", 0) or 0) + if parent_depth >= cfg.recursion_depth: + return { + "error": ( + f"Max subagent recursion depth ({cfg.recursion_depth}) reached; " + "do this work directly instead of spawning another subagent." + ) + } + + # Resolve + narrow the persona. A name must exist; with no name the child + # inherits the caller's identity and scope. + if persona_name: + requested = await self._load_persona(persona_name) + if requested is None: + return {"error": f"Persona not found: {persona_name}"} + else: + requested = parent_state.get("persona_obj") + child_persona = self._narrow_persona(requested, parent_state) if requested else None + + run_id = f"sub_{uuid.uuid4().hex[:8]}" + child_state = self._new_request_state( + child_persona, + depth=parent_depth + 1, + origin={ + "channel": origin_channel, + "user_id": origin_user_id, + "chat_id": origin_chat_id, + }, + run_id=run_id, + ) + run = SubagentRun( + run_id=run_id, + persona=child_persona.name if child_persona else "", + task=task, + depth=parent_depth + 1, + background=background, + origin_channel=origin_channel, + origin_user_id=origin_user_id, + origin_chat_id=origin_chat_id, + parent_id=parent_state.get("run_id"), + ) + + if background: + if self.subagents.active_count() >= cfg.max_concurrent: + return { + "error": ( + f"Too many subagents running (max {cfg.max_concurrent}). " + "Wait for one to finish or cancel it via /jobs." + ) + } + self.subagents.register(run) + bg = asyncio.create_task( + self._run_subagent_background(run, child_persona, child_state), + name=f"subagent-{run_id}", + ) + self.subagents.attach_task(run_id, bg) + log.info( + "Spawned background subagent %s (persona=%s)", run_id, run.persona or "default" + ) + return { + "ok": True, + "run_id": run_id, + "background": True, + "status": "running", + "persona": run.persona, + "note": "Running in the background; the result will be posted here when done.", + } + + # Synchronous: run to completion and return the result to the caller. + self.subagents.register(run) + log.info("Running subagent %s (persona=%s)", run_id, run.persona or "default") + try: + text = await self._run_subagent_loop(task, child_persona, child_state, run) + except Exception as exc: + log.exception("Subagent %s failed", run_id) + self.subagents.finish(run_id, "error", error=str(exc)) + return {"error": f"Subagent failed: {exc}", "run_id": run_id} + self.subagents.finish(run_id, "done", result=text) + return { + "ok": True, + "run_id": run_id, + "persona": run.persona, + "summary": short_summary(text), + "result": text, + } + + def _narrow_persona(self, requested: Persona, parent_state: dict) -> Persona: + """Build a child persona whose scopes are a subset of the caller's.""" + parent: Persona | None = parent_state.get("persona_obj") + p_skills = parent.skills if parent else [] + p_tools = parent.tools if parent else [] + p_secrets = parent.secrets if parent else [] + return Persona( + name=requested.name, + agent_name=requested.agent_name, + role=requested.role, + emoji=requested.emoji, + voice=requested.voice, + personalia=requested.personalia, + character=requested.character, + skills=narrow_scope(p_skills, requested.skills), + tools=narrow_scope(p_tools, requested.tools), + secrets=narrow_scope(p_secrets, requested.secrets), + ) + + async def _run_subagent_loop( + self, task: str, child_persona: Persona | None, child_state: dict, run: SubagentRun + ) -> str: + """The subagent's agentic loop — system semantics, budgeted and depth-capped. + + Mirrors the main injection loop but runs from a clean slate (no history), + skips approval/decomposition/memory/reflection (channel='system'), and + stops at the configured step/token budget. + """ + cfg = self.config.subagents + tools = apply_feature_gates( + scoped_tools(child_persona), + secrets_available=self.secret_store is not None, + artifacts_enabled=self.config.artifacts.enabled, + subagents_enabled=cfg.enabled, + ) + # At the depth ceiling a subagent may not spawn further — don't even offer it. + if child_state["depth"] >= cfg.recursion_depth: + tools = [t for t in tools if t["name"] != "spawn_subagent"] + + system = await self._build_system_prompt(query=task, persona=child_persona) + preamble = self._turn_preamble(None) + messages: list[dict] = [await self._build_user_message(task, None, preamble)] + + response = await self.llm.generate( + model=self.config.agent.model, + max_tokens=4096, + system=system, + messages=messages, + tools=cast(Any, tools), + ) + steps = 0 + tokens = self._usage_total(response.usage) + while response.tool_calls and steps < cfg.max_steps and tokens < cfg.token_budget: + steps += 1 + run.progress = f"step {steps}: {', '.join(c.name for c in response.tool_calls)}"[:120] + tool_results = [] + for call in response.tool_calls: + result = await self._execute_tool( + call, "system", run.origin_user_id or "subagent", child_state + ) + tool_results.append( + { + "type": "tool_result", + "tool_use_id": call.id, + "content": json.dumps(result), + } + ) + messages.append(self.llm.assistant_message(response)) + messages.extend(self.llm.tool_result_messages(tool_results)) + response = await self.llm.generate( + model=self.config.agent.model, + max_tokens=4096, + system=system, + messages=messages, + tools=cast(Any, tools), + ) + tokens += self._usage_total(response.usage) + + text = response.text or "" + if response.tool_calls: + text = (text + "\n\n[subagent stopped: reached its step/token budget]").strip() + run.progress = "done" + return text + + async def _run_subagent_background( + self, run: SubagentRun, child_persona: Persona | None, child_state: dict + ) -> None: + """Run a subagent off-turn and post the result back to its origin chat.""" + try: + text = await self._run_subagent_loop(run.task, child_persona, child_state, run) + except asyncio.CancelledError: + self.subagents.finish(run.run_id, "cancelled") + await self._deliver_subagent_result(run, "Subagent was cancelled.") + raise + except Exception as exc: + log.exception("Background subagent %s failed", run.run_id) + self.subagents.finish(run.run_id, "error", error=str(exc)) + await self._deliver_subagent_result(run, f"Subagent failed: {exc}") + return + self.subagents.finish(run.run_id, "done", result=text) + await self._deliver_subagent_result(run, text) + + async def _deliver_subagent_result(self, run: SubagentRun, text: str) -> None: + """Post a background subagent's result to the chat that started it.""" + ch = self.channels.get(run.origin_channel) + if not ch or not run.origin_chat_id: + log.warning( + "Subagent %s result not delivered (channel=%r, chat=%r)", + run.run_id, + run.origin_channel, + run.origin_chat_id, + ) + return + label = run.persona or "default" + try: + await ch.send(run.origin_chat_id, f"🤖 Subagent ({label}) finished:\n\n{text}") + except Exception: + log.exception("Failed to deliver subagent %s result", run.run_id) + + @staticmethod + def _usage_total(usage: dict | None) -> int: + """Best-effort token count for budgeting (0 when the provider omits usage).""" + if not usage: + return 0 + return int(usage.get("input_tokens", 0) or 0) + int(usage.get("output_tokens", 0) or 0) + async def _tool_web_search(self, params: dict) -> dict: """Search the web via Tavily API.""" if not self.search_client: diff --git a/core/config.py b/core/config.py index 9082300..15b2ec6 100644 --- a/core/config.py +++ b/core/config.py @@ -145,6 +145,7 @@ class SchedulerJob(BaseModel): task: str channel: str = "telegram" type: str = "agent" + persona: str = "" # for type="subagent": the persona the run adopts class SchedulerConfig(BaseModel): @@ -302,6 +303,21 @@ class ArtifactsConfig(BaseModel): ttl_hours: int = 168 # 7 days; 0 = keep forever (no auto-cleanup) +class SubagentsConfig(BaseModel): + """Subagents — scoped sub-loops the agent can delegate to (see core/subagents.py). + + Defaults are deliberately conservative so spawning works out of the box + without runaway recursion or cost: a top-level spawn is depth 1, a subagent + spawning a subagent is depth 2, and so on up to ``recursion_depth``. + """ + + enabled: bool = True + recursion_depth: int = 3 # max nesting; spawns are refused beyond this + max_steps: int = 12 # max tool-call rounds per run (hard stop) + token_budget: int = 100_000 # approx token ceiling per run (best-effort) + max_concurrent: int = 3 # max background runs at once + + class Config(BaseModel): agent: AgentConfig = AgentConfig() channels: ChannelsConfig = ChannelsConfig() @@ -320,6 +336,7 @@ class Config(BaseModel): prompt: PromptConfig = PromptConfig() tools: ToolsConfig = ToolsConfig() artifacts: ArtifactsConfig = ArtifactsConfig() + subagents: SubagentsConfig = SubagentsConfig() def load_config(path: str | Path = "config.yml") -> Config: diff --git a/core/job_store.py b/core/job_store.py index cdacad3..1514372 100644 --- a/core/job_store.py +++ b/core/job_store.py @@ -30,13 +30,20 @@ status TEXT NOT NULL DEFAULT 'active', created_by TEXT NOT NULL DEFAULT 'admin', description TEXT NOT NULL DEFAULT '', + persona TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); """ -# Valid values -VALID_TYPES = ("agent", "agent_silent", "system", "memory_consolidation") +# Additive migrations for DBs created before a column existed. Each is a column +# name → ALTER statement; applied only when the column is missing. +_MIGRATIONS = { + "persona": "ALTER TABLE jobs ADD COLUMN persona TEXT NOT NULL DEFAULT ''", +} + +# Valid values. "subagent" runs the spawn_subagent primitive under ``persona``. +VALID_TYPES = ("agent", "agent_silent", "system", "memory_consolidation", "subagent") VALID_SCHEDULES = ("cron", "once") VALID_STATUSES = ("active", "paused", "done", "cancelled") @@ -59,6 +66,12 @@ async def _ensure_schema(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) async with aiosqlite.connect(self.db_path) as db: await db.executescript(_SCHEMA) + cursor = await db.execute("PRAGMA table_info(jobs)") + cols = {row[1] for row in await cursor.fetchall()} + for col, stmt in _MIGRATIONS.items(): + if col not in cols: + await db.execute(stmt) + await db.commit() self._ready = True # -- Sync helpers (for use from non-async contexts like CLI) -- @@ -69,6 +82,11 @@ def _ensure_schema_sync(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(self.db_path) as db: db.executescript(_SCHEMA) + cols = {row[1] for row in db.execute("PRAGMA table_info(jobs)").fetchall()} + for col, stmt in _MIGRATIONS.items(): + if col not in cols: + db.execute(stmt) + db.commit() self._ready = True def list_jobs_sync(self, status: str | None = None, include_done: bool = False) -> list[dict]: @@ -111,6 +129,7 @@ def upsert_job_sync( status: str = "active", created_by: str = "admin", description: str = "", + persona: str = "", ) -> dict: """Synchronous upsert for CLI/admin use.""" self._ensure_schema_sync() @@ -118,8 +137,8 @@ def upsert_job_sync( db.row_factory = sqlite3.Row db.execute( """INSERT INTO jobs (id, type, schedule, cron, run_at, task, channel, - status, created_by, description, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + status, created_by, description, persona, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ON CONFLICT(id) DO UPDATE SET type = excluded.type, schedule = excluded.schedule, @@ -129,6 +148,7 @@ def upsert_job_sync( channel = excluded.channel, status = excluded.status, description = excluded.description, + persona = excluded.persona, updated_at = datetime('now') """, ( @@ -142,6 +162,7 @@ def upsert_job_sync( status, created_by, description, + persona, ), ) db.commit() @@ -200,6 +221,7 @@ async def upsert_job( status: str = "active", created_by: str = "admin", description: str = "", + persona: str = "", ) -> dict: """Insert or update a job. Returns the job dict.""" await self._ensure_schema() @@ -207,8 +229,8 @@ async def upsert_job( db.row_factory = aiosqlite.Row await db.execute( """INSERT INTO jobs (id, type, schedule, cron, run_at, task, channel, - status, created_by, description, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + status, created_by, description, persona, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ON CONFLICT(id) DO UPDATE SET type = excluded.type, schedule = excluded.schedule, @@ -218,6 +240,7 @@ async def upsert_job( channel = excluded.channel, status = excluded.status, description = excluded.description, + persona = excluded.persona, updated_at = datetime('now') """, ( @@ -231,6 +254,7 @@ async def upsert_job( status, created_by, description, + persona, ), ) await db.commit() @@ -271,8 +295,8 @@ async def seed_from_config(self, jobs: list[dict]) -> int: continue await db.execute( """INSERT INTO jobs (id, type, schedule, cron, task, channel, - status, created_by, description) - VALUES (?, ?, 'cron', ?, ?, ?, 'active', 'config', '') + status, created_by, description, persona) + VALUES (?, ?, 'cron', ?, ?, ?, 'active', 'config', '', ?) """, ( job["id"], @@ -280,6 +304,7 @@ async def seed_from_config(self, jobs: list[dict]) -> int: job["cron"], job.get("task", ""), job.get("channel", "telegram"), + job.get("persona", ""), ), ) inserted += 1 diff --git a/core/permissions.py b/core/permissions.py index 00d3863..06a99b3 100644 --- a/core/permissions.py +++ b/core/permissions.py @@ -128,6 +128,9 @@ class PermissionLevel: "run_command:himalaya*move*": "ASK", "schedule_task": "ASK", "manage_jobs": "ASK", + # Delegating to a subagent is approved once per spawn; the subagent then runs + # autonomously within its narrowed scope (system semantics), like a job. + "spawn_subagent": "ASK", # Publishing inline content the agent authored is low-risk and reversible # (TTL cleanup) — no prompt, like web_search / load_skill. But copying an # on-disk file to a public URL can expose data the agent didn't author, so @@ -233,6 +236,7 @@ def is_write_action(self, tool_name: str, params: dict | None = None) -> bool: "create_calendar_event", "schedule_task", "manage_jobs", + "spawn_subagent", }: return True diff --git a/core/scheduler.py b/core/scheduler.py index b1fd0e2..776cf23 100644 --- a/core/scheduler.py +++ b/core/scheduler.py @@ -4,11 +4,12 @@ APScheduler runs in-memory only (no SQLAlchemy jobstore) and is re-synced from the JobStore whenever jobs change. -Three job types: +Job types: - "agent" / "agent_silent": natural-language task -> agent.process() -> send result to channel - "system": raw CLI command -> executor.run_command_trusted() - "memory_consolidation": review short-term memories, promote worthy ones to long-term, delete expired entries (uses a lightweight LLM call) + - "subagent": run agent.run_subagent() under a persona -> send result to channel (issue #15) """ from __future__ import annotations @@ -103,6 +104,46 @@ async def run_agent_task( log.info("One-shot job %r marked as done", job_id) +async def run_subagent_task( + persona: str, + task: str, + channel: str = "telegram", + job_id: str | None = None, +) -> None: + """Fire a scheduled subagent run and deliver its result to the channel.""" + agent = _get_agent_context() + if agent is None: + log.error("Scheduler subagent task dropped; agent not initialized") + return + + owner = _get_owner_chat_id(agent, channel) + log.info("Scheduler running subagent (persona=%s): %s", persona or "default", task[:100]) + try: + result = await agent.run_subagent( + task=task, + persona_name=persona or "", + origin_channel=channel, + origin_user_id=str(owner or "scheduler"), + origin_chat_id=str(owner or ""), + background=False, + ) + text = result.get("result") or result.get("error") or "" + ch = agent.channels.get(channel) + if ch and text and owner: + await ch.send(owner, text) + elif not ch: + log.warning("Scheduler: channel %r not registered, subagent result dropped", channel) + except Exception: + log.exception("Scheduler subagent task failed: %s", task[:100]) + + # Mark one-shot jobs as done + if job_id and agent.job_store: + job = await agent.job_store.get_job(job_id) + if job and job.get("schedule") == "once" and job.get("status") == "active": + await agent.job_store.update_status(job_id, "done") + log.info("One-shot job %r marked as done", job_id) + + async def run_system_command(command: str) -> None: """Execute a raw CLI command (e.g. memory cleanup).""" agent = _get_agent_context() @@ -212,6 +253,7 @@ def _register_job(self, job: dict) -> None: schedule = job.get("schedule", "cron") task = job.get("task", "") channel = job.get("channel", "telegram") + persona = job.get("persona", "") silent = job_type == "agent_silent" try: @@ -253,6 +295,20 @@ def _register_job(self, job: dict) -> None: run_date=run_at, replace_existing=True, ) + elif job_type == "subagent": + self.scheduler.add_job( + run_subagent_task, + "date", + id=job_id, + run_date=run_at, + kwargs={ + "persona": persona, + "task": task, + "channel": channel, + "job_id": job_id, + }, + replace_existing=True, + ) else: self.scheduler.add_job( run_agent_task, @@ -292,6 +348,20 @@ def _register_job(self, job: dict) -> None: replace_existing=True, **cron_kwargs, ) + elif job_type == "subagent": + self.scheduler.add_job( + run_subagent_task, + "cron", + id=job_id, + kwargs={ + "persona": persona, + "task": task, + "channel": channel, + "job_id": job_id, + }, + replace_existing=True, + **cron_kwargs, + ) else: self.scheduler.add_job( run_agent_task, diff --git a/core/subagents.py b/core/subagents.py new file mode 100644 index 0000000..00236a0 --- /dev/null +++ b/core/subagents.py @@ -0,0 +1,162 @@ +"""Subagents — scoped sub-loops the main agent can delegate to (issue #15). + +A *subagent* is one execution primitive (``AgentCore.run_subagent``) reached by +two trigger paths: on demand via the ``spawn_subagent`` tool, or on a schedule +via a ``subagent`` job. Either way it runs the existing agent loop with **system +semantics** (no goal decomposition / memory / reflection / approval prompts) and +a persona whose tool/skill/secret scope is a *subset* of the caller's — never +wider (``narrow_scope``). + +This module holds only the lifecycle bookkeeping: a small in-memory registry of +runs so list / status / cancel work from Telegram and the admin UI. Runs are +ephemeral (an ``asyncio`` task each) — a restart kills them, so an in-memory +registry is the right scope; nothing here needs to survive a reboot. +""" + +from __future__ import annotations + +import asyncio +import time +from collections import OrderedDict +from dataclasses import dataclass, field + +# Keep at most this many finished runs for after-the-fact inspection. +_MAX_FINISHED = 50 + + +def narrow_scope(parent: list[str] | None, child: list[str] | None) -> list[str]: + """Intersect a child scope with the parent's — inherit, never widen. + + The allowlist convention (see :class:`core.personae.Persona`) is ``[]``/``None`` + = *all*. So an empty parent means "no restriction → the child's own scope + applies", an empty child means "unspecified → inherit the parent's", and when + both list names the result is their intersection (the child can never gain a + name the parent lacked). + """ + p = parent or [] + c = child or [] + if not p: + return list(c) + if not c: + return list(p) + return [x for x in c if x in p] + + +@dataclass(slots=True) +class SubagentRun: + """One subagent execution and its live status.""" + + run_id: str + persona: str + task: str + depth: int = 1 + background: bool = False + status: str = "running" # running | done | cancelled | error + progress: str = "" + result: str = "" + error: str = "" + started_at: float = field(default_factory=time.time) + finished_at: float = 0.0 + origin_channel: str = "" + origin_user_id: str = "" + origin_chat_id: str = "" + parent_id: str | None = None + _task: asyncio.Task | None = field(default=None, repr=False, compare=False) + + @property + def elapsed(self) -> float: + return (self.finished_at or time.time()) - self.started_at + + @property + def elapsed_str(self) -> str: + secs = int(self.elapsed) + if secs < 60: + return f"{secs}s" + return f"{secs // 60}m {secs % 60}s" + + def to_dict(self) -> dict: + return { + "run_id": self.run_id, + "persona": self.persona, + "task": self.task, + "depth": self.depth, + "background": self.background, + "status": self.status, + "progress": self.progress, + "result": self.result, + "error": self.error, + "elapsed": round(self.elapsed, 1), + "elapsed_str": self.elapsed_str, + "parent_id": self.parent_id, + } + + +class SubagentRegistry: + """In-memory registry of subagent runs for list / status / cancel.""" + + def __init__(self) -> None: + self._runs: OrderedDict[str, SubagentRun] = OrderedDict() + + def register(self, run: SubagentRun) -> None: + self._runs[run.run_id] = run + self._trim() + + def attach_task(self, run_id: str, task: asyncio.Task) -> None: + run = self._runs.get(run_id) + if run: + run._task = task + + def get(self, run_id: str) -> SubagentRun | None: + return self._runs.get(run_id) + + def active_count(self) -> int: + return sum(1 for r in self._runs.values() if r.status == "running") + + def list_runs(self, active_only: bool = False) -> list[SubagentRun]: + runs = [r for r in self._runs.values() if not active_only or r.status == "running"] + return sorted(runs, key=lambda r: r.started_at, reverse=True) + + def finish(self, run_id: str, status: str, *, result: str = "", error: str = "") -> None: + run = self._runs.get(run_id) + if not run: + return + run.status = status + run.result = result + run.error = error + run.finished_at = time.time() + self._trim() + + def cancel(self, run_id: str) -> bool: + """Request cancellation of a running subagent. Returns False if it is not + running (unknown id or already finished).""" + run = self._runs.get(run_id) + if not run or run.status != "running": + return False + if run._task and not run._task.done(): + run._task.cancel() + # The task's CancelledError handler flips status to "cancelled"; set it + # eagerly too so a sync caller / the next poll sees it immediately. + run.status = "cancelled" + run.finished_at = time.time() + return True + + def _trim(self) -> None: + """Drop the oldest finished runs once we exceed the cap (running runs are + always kept).""" + finished = [rid for rid, r in self._runs.items() if r.status != "running"] + excess = len(finished) - _MAX_FINISHED + for rid in finished[:excess] if excess > 0 else []: + self._runs.pop(rid, None) + + +def short_summary(text: str, limit: int = 280) -> str: + """A one-glance summary of a subagent's result — first non-empty line, capped. + + ponytail: a truncation, not an LLM call. Add a summariser model only if the + plain preview proves too lossy in practice. + """ + for line in (text or "").splitlines(): + line = line.strip() + if line: + return line[:limit] + ("…" if len(line) > limit else "") + return (text or "").strip()[:limit] From 1c2776e776d185ccb4ea3f850cf2620888856df8 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 00:54:28 +0200 Subject: [PATCH 02/13] feat(subagents): Telegram /jobs + cancel, admin runs view, scheduled-job form - Telegram: /jobs lists active subagent runs with inline Cancel buttons; the command is registered ahead of the text handler so it isn't sent to the agent. - Admin: Jobs tab gains a responsive subagent-runs card grid (status, persona, elapsed, progress, cancel) polled every 3s; /partials/subagent-runs + /subagents/cancel routes. - Admin Jobs form: 'subagent' job type + persona field, validated and persisted. --- api/admin.py | 45 ++++++++++++++++++++++- api/templates/partials/jobs.html | 22 ++++++++++- api/templates/partials/subagent_runs.html | 35 ++++++++++++++++++ channels/telegram.py | 43 +++++++++++++++++++++- 4 files changed, 140 insertions(+), 5 deletions(-) create mode 100644 api/templates/partials/subagent_runs.html diff --git a/api/admin.py b/api/admin.py index c777a2b..2e06e99 100644 --- a/api/admin.py +++ b/api/admin.py @@ -1423,6 +1423,7 @@ def _get_jobs_list() -> list[dict]: "run_at": j.get("run_at", ""), "description": j.get("description", ""), "created_by": j.get("created_by", ""), + "persona": j.get("persona", ""), } ) return jobs @@ -1451,6 +1452,7 @@ async def upsert_job(request: Request) -> HTMLResponse: task = str(body.get("task", "")).strip() channel = str(body.get("channel", "telegram")).strip() description = str(body.get("description", "")).strip() + persona = str(body.get("persona", "")).strip() if not job_id: raise HTTPException(400, "Job ID is required") @@ -1462,10 +1464,10 @@ async def upsert_job(request: Request) -> HTMLResponse: else: if not run_at: raise HTTPException(400, "Run-at datetime is required for one-time jobs") - if job_type not in ("agent", "agent_silent", "system", "memory_consolidation"): + if job_type not in ("agent", "agent_silent", "system", "memory_consolidation", "subagent"): raise HTTPException(400, f"Invalid job type: {job_type}") if job_type != "memory_consolidation" and not task: - raise HTTPException(400, "Task is required for agent/system jobs") + raise HTTPException(400, "Task is required for agent/system/subagent jobs") if schedule == "cron": from core.scheduler import _parse_cron @@ -1494,6 +1496,7 @@ async def upsert_job(request: Request) -> HTMLResponse: status="active", created_by="admin", description=description, + persona=persona, ) # Sync with APScheduler if the agent is running @@ -1588,6 +1591,44 @@ async def run_job_now(request: Request) -> HTMLResponse: "for output" ) + # ── Subagent runs (issue #15) ────────────────────────────────────── + + def _subagent_runs() -> list: + """Live subagent runs from the running agent (newest first).""" + agent = agent_state.agent + if not agent: + return [] + return agent.subagents.list_runs() + + @app.get("/partials/subagent-runs", dependencies=[Depends(auth)]) + async def partial_subagent_runs() -> HTMLResponse: + """Subagent runs card grid — polled by the Jobs tab for live status.""" + return _render_partial( + "partials/subagent_runs.html", + runs=_subagent_runs(), + agent_running=agent_state.agent is not None, + ) + + @app.post("/subagents/cancel", dependencies=[Depends(auth)]) + async def cancel_subagent(request: Request) -> HTMLResponse: + """Cancel a running subagent. Returns the refreshed runs partial.""" + agent = agent_state.agent + if not agent: + raise HTTPException(503, "Agent not running") + content_type = request.headers.get("content-type", "") + if "application/x-www-form-urlencoded" in content_type: + body = await request.form() + else: + body = await request.json() + run_id = str(body.get("run_id", "")).strip() + if not run_id: + raise HTTPException(400, "Missing 'run_id' in request body") + agent.subagents.cancel(run_id) + log.info("Subagent %r cancelled via admin", run_id) + return _render_partial( + "partials/subagent_runs.html", runs=_subagent_runs(), agent_running=True + ) + # ── Config API ───────────────────────────────────────────────────── @app.get("/config", dependencies=[Depends(auth)]) diff --git a/api/templates/partials/jobs.html b/api/templates/partials/jobs.html index 8c01560..d515cbd 100644 --- a/api/templates/partials/jobs.html +++ b/api/templates/partials/jobs.html @@ -10,6 +10,15 @@ {% endif %} + {# Subagent runs (issue #15) — live status, polled every 3s #} +
+

Subagent runs

+
+

Loading…

+
+
+ {# Jobs table #}
@@ -119,9 +128,16 @@

agent — natural language task + +
+ + + Persona the subagent adopts (blank = default identity) +
+
+ +

+ Let the assistant delegate a scoped subtask to a sub-loop (the + spawn_subagent tool) under a chosen persona, on demand or + scheduled. A child's tools/skills/secrets are a subset of the caller's — + never wider. When disabled, the tool is hidden from the assistant (and from + the Personae tool scope). Assign it per persona in the + Personae editor; monitor and cancel runs in the + Jobs tab. +

+
+
+ + +

Max nesting; a top-level spawn is depth 1.

+
+
+ + +

Tool-call rounds per run — a hard stop.

+
+
+ + +

Approx token ceiling per run (best-effort).

+
+
+ + +

Background runs allowed at once.

+
+
+
+ +
+ + + {# GitHub CLI (gh) #}
diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx index 7a6a529..e7096af 100644 --- a/docs/content/docs/subagents.mdx +++ b/docs/content/docs/subagents.mdx @@ -76,19 +76,22 @@ the subagent, so a blocked action stays blocked at any depth. ## Guardrails -Sensible defaults keep delegation safe and bounded — tune them under -`subagents` in `config.yml`: +Sensible defaults keep delegation safe and bounded. Edit them in the admin +**Tools** tab (the Subagents card — the master switch and every +limit, applied live without a restart), or under `subagents` in `config.yml`: | Setting | Default | What it does | |---------|---------|--------------| -| `enabled` | `true` | Master switch; off removes the `spawn_subagent` tool | +| `enabled` | `true` | Master switch; off hides the `spawn_subagent` tool everywhere | | `recursion_depth` | `3` | Max nesting (a top-level spawn is depth 1) | | `max_steps` | `12` | Max tool-call rounds per run — a hard stop | | `token_budget` | `100000` | Approx token ceiling per run (best-effort) | | `max_concurrent` | `3` | Max background runs at once | -A run that hits its step or token budget stops and returns what it has, with a -note that the budget was reached. +When disabled, the tool is hidden from the assistant and from the Personae tool +scope. Assign it per persona in the **Personae** editor. A run that hits its step +or token budget stops and returns what it has, with a note that the budget was +reached. ## Monitoring & cancelling diff --git a/tests/test_subagents.py b/tests/test_subagents.py index e588bc5..28164f3 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -259,6 +259,27 @@ def test_narrow_persona_intersects_scopes(agent) -> None: # --------------------------------------------------------------------------- +def test_disabled_drops_spawn_subagent_from_llm_tools() -> None: + from core.agent import TOOLS, apply_feature_gates + + def names(ts): + return {t["name"] for t in ts} + + base = dict(secrets_available=True, artifacts_enabled=True) + assert "spawn_subagent" in names(apply_feature_gates(TOOLS, **base, subagents_enabled=True)) + assert "spawn_subagent" not in names( + apply_feature_gates(TOOLS, **base, subagents_enabled=False) + ) + + +def test_disabled_hides_spawn_subagent_from_persona_scope() -> None: + from api.admin import GATEABLE_TOOLS, gateable_tools_for + + assert "spawn_subagent" in gateable_tools_for(True) + assert set(gateable_tools_for(True)) == set(GATEABLE_TOOLS) + assert "spawn_subagent" not in gateable_tools_for(True, subagents_enabled=False) + + def test_subagent_is_valid_job_type() -> None: assert "subagent" in VALID_TYPES From 51dc9f28fb76fd24e4751ad3ebde0940b85fb498 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 23:09:14 +0200 Subject: [PATCH 07/13] fix(subagents): show background run status to the spawning agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A background subagent's result is delivered to the chat out-of-band (a direct ch.send), so it never entered the spawning agent's context — the agent couldn't tell a finished run from a pending one and would confidently claim runs were 'still running' (it even reached for manage_jobs, which only knows scheduled jobs, not the in-memory subagent registry). Fix: each turn, inject a block into the user-message preamble listing this chat's runs — running ones every turn, a finished one once with its result summary (SubagentRegistry.updates_for, gated to the chat's channel+chat_id). No new tool, no history-alternation hazard; the preamble is sent to the model but not persisted. The background-spawn return note now tells the agent results auto-post and it needn't relay them. Tests: updates_for chat-scoping + report-finish-once; _subagent_status_note. --- core/agent.py | 42 ++++++++++++++++++++++++++++++++++++++++- core/subagents.py | 21 +++++++++++++++++++++ tests/test_subagents.py | 38 +++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/core/agent.py b/core/agent.py index efdcfab..e34d3e1 100644 --- a/core/agent.py +++ b/core/agent.py @@ -593,6 +593,13 @@ async def process( persona=persona, session_key=session_key, ) + # Append the status of still-running background subagents from this chat, + # so the agent always knows what is pending (their results are folded into + # the conversation history when they finish). (#15) + if channel != "system": + note = self._subagent_status_note(channel, chat_id) + if note: + preamble = f"{preamble}\n\n{note}" tools = apply_feature_gates( scoped_tools(persona), @@ -816,6 +823,35 @@ async def _skills_block_in_history(self, session_key: tuple[str, str, str], bloc return True return False + def _subagent_status_note(self, channel: str, chat_id: str) -> str: + """Status of background subagents spawned from this chat, for the preamble. + + Running runs appear every turn; a finished run appears once (the turn + after it completes) so the agent learns the outcome without re-announcing + it forever. Their results are also delivered to the chat directly. (#15) + """ + runs = self.subagents.updates_for(channel, chat_id) + if not runs: + return "" + lines = [] + for r in runs: + who = f"- [{r.run_id}] {r.persona or 'default'} — {r.status} ({r.elapsed_str})" + if r.status == "running": + lines.append(f"{who}; {r.progress}" if r.progress else who) + else: + lines.append(f"{who}: {short_summary(r.result or r.error or '', 200)}") + body = "\n".join(lines) + return ( + "\n" + "Background subagents you spawned from this chat. Their results are " + "posted to this chat directly as they finish, so you do not need to " + "relay them.\n" + f"{body}\n" + "Trust this status: never say a subagent is still running if it shows " + "done, cancelled, or error here.\n" + "" + ) + async def _session_system_prompt( self, channel: str, @@ -1904,7 +1940,11 @@ async def run_subagent( "background": True, "status": "running", "persona": run.persona, - "note": "Running in the background; the result will be posted here when done.", + "note": ( + "Running in the background; its result is posted to this chat " + "automatically when done — you don't relay it. Each later turn " + "shows this run's status until it finishes." + ), } # Synchronous: run to completion and return the result to the caller. diff --git a/core/subagents.py b/core/subagents.py index 1797607..e494f9d 100644 --- a/core/subagents.py +++ b/core/subagents.py @@ -60,6 +60,9 @@ class SubagentRun: origin_channel: str = "" origin_user_id: str = "" origin_chat_id: str = "" + # True once a *finished* run has been surfaced to the spawning agent's + # context, so a completion is reported to it exactly once (see updates_for). + notified: bool = False _task: asyncio.Task | None = field(default=None, repr=False, compare=False) @property @@ -99,6 +102,24 @@ def list_runs(self, active_only: bool = False) -> list[SubagentRun]: runs = [r for r in self._runs.values() if not active_only or r.status == "running"] return sorted(runs, key=lambda r: r.started_at, reverse=True) + def updates_for(self, channel: str, chat_id: str) -> list[SubagentRun]: + """Runs for one chat the spawning agent should be reminded of: every run + still in flight, plus any that finished since it was last reminded. + + A finished run is returned once and then marked ``notified`` so the agent + is told of its completion exactly once; running runs are returned every + turn so the agent never claims a still-pending run is done.""" + out = [] + for r in self._runs.values(): + if r.origin_channel != channel or r.origin_chat_id != chat_id: + continue + if r.status == "running": + out.append(r) + elif not r.notified: + r.notified = True + out.append(r) + return sorted(out, key=lambda r: r.started_at) + def finish(self, run_id: str, status: str, *, result: str = "", error: str = "") -> bool: """Move a *running* run to a terminal state. Returns False (a no-op) when the run is unknown or already finished, so a terminal state is sticky — diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 28164f3..1dc695c 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -85,6 +85,23 @@ def test_short_summary_first_nonempty_line_capped() -> None: assert len(short_summary("x" * 400)) == 281 # 280 + ellipsis +def test_updates_for_filters_by_chat_and_reports_finish_once() -> None: + reg = SubagentRegistry() + here = SubagentRun(run_id="a", persona="", task="t", origin_channel="tg", origin_chat_id="1") + other = SubagentRun(run_id="b", persona="", task="t", origin_channel="tg", origin_chat_id="2") + reg.register(here) + reg.register(other) + + # Running runs appear every turn; the other chat's run is never included. + assert [r.run_id for r in reg.updates_for("tg", "1")] == ["a"] + assert [r.run_id for r in reg.updates_for("tg", "1")] == ["a"] + + # After it finishes it appears exactly once more, then never again. + reg.finish("a", "done", result="answer") + assert [r.run_id for r in reg.updates_for("tg", "1")] == ["a"] + assert reg.updates_for("tg", "1") == [] + + # --------------------------------------------------------------------------- # AgentCore.run_subagent — built with a scripted fake LLM (no network) # --------------------------------------------------------------------------- @@ -254,6 +271,27 @@ def test_narrow_persona_intersects_scopes(agent) -> None: assert child.secrets == [] # 'y' not in parent's ['x'] +def test_subagent_status_note_reports_this_chats_runs(agent) -> None: + agent.subagents.register( + SubagentRun( + run_id="r1", + persona="coding-helper", + task="t", + origin_channel="repl", + origin_chat_id="repl", + progress="step 2", + ) + ) + agent.subagents.finish("r1", "done", result="the iPhone 17e is CHF 599") + + note = agent._subagent_status_note("repl", "repl") + assert "r1" in note and "done" in note and "CHF 599" in note + # Scoped to the chat: a different chat sees nothing. + assert agent._subagent_status_note("repl", "other-chat") == "" + # Reported once: the next turn no longer repeats the finished run. + assert agent._subagent_status_note("repl", "repl") == "" + + # --------------------------------------------------------------------------- # Scheduled subagent jobs # --------------------------------------------------------------------------- From ba73aa192c7b1ce27602bd2382d481ce78fb6eea Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Sun, 28 Jun 2026 23:28:12 +0200 Subject: [PATCH 08/13] fix(subagents): fold background results into the conversation, not just out-of-band MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A background subagent's result was only ch.send to the chat — the spawning agent never saw it, so it couldn't reason about or recall it. Now the result is ALSO recorded into the originating chat's history as an assistant turn, so it becomes a first-class part of the conversation the agent reads on every later turn (and the agent's memory matches what the user saw). - history: append_to_last_turn / append_to_last_session_message merge the result into the trailing assistant turn, preserving strict user/assistant alternation for providers that require it (both injection and session modes); a fresh assistant turn is added only when the last turn isn't the assistant's. - _deliver_subagent_result now delivers AND records. - The turn preamble now lists only *running* runs (status awareness while pending); finished runs live in history instead of an ephemeral once-only note. Tests cover the merge (alternation kept), history persistence, and running-only preamble. 481 passing. --- core/agent.py | 67 ++++++++++++++++++++++----------- core/history.py | 66 ++++++++++++++++++++++++++++++++ core/subagents.py | 31 ++++++--------- docs/content/docs/subagents.mdx | 5 ++- tests/test_subagents.py | 52 +++++++++++++++++++------ 5 files changed, 169 insertions(+), 52 deletions(-) diff --git a/core/agent.py b/core/agent.py index e34d3e1..4068624 100644 --- a/core/agent.py +++ b/core/agent.py @@ -824,31 +824,26 @@ async def _skills_block_in_history(self, session_key: tuple[str, str, str], bloc return False def _subagent_status_note(self, channel: str, chat_id: str) -> str: - """Status of background subagents spawned from this chat, for the preamble. - - Running runs appear every turn; a finished run appears once (the turn - after it completes) so the agent learns the outcome without re-announcing - it forever. Their results are also delivered to the chat directly. (#15) + """List background subagents from this chat that are *still running*, for + the turn preamble — so the agent knows what is pending. A run's result is + folded into the chat history when it finishes, so completed runs are part + of the conversation itself and need no mention here. (#15) """ - runs = self.subagents.updates_for(channel, chat_id) + runs = self.subagents.running_for(channel, chat_id) if not runs: return "" lines = [] for r in runs: - who = f"- [{r.run_id}] {r.persona or 'default'} — {r.status} ({r.elapsed_str})" - if r.status == "running": - lines.append(f"{who}; {r.progress}" if r.progress else who) - else: - lines.append(f"{who}: {short_summary(r.result or r.error or '', 200)}") + who = f"- [{r.run_id}] {r.persona or 'default'} — running ({r.elapsed_str})" + lines.append(f"{who}; {r.progress}" if r.progress else who) body = "\n".join(lines) return ( "\n" - "Background subagents you spawned from this chat. Their results are " - "posted to this chat directly as they finish, so you do not need to " - "relay them.\n" + "Background subagents you spawned from this chat are still running. " + "Their results are posted to this chat (and added to this conversation) " + "automatically when each finishes, so you needn't relay them.\n" f"{body}\n" - "Trust this status: never say a subagent is still running if it shows " - "done, cancelled, or error here.\n" + "Don't claim one of these is finished until its result appears above.\n" "" ) @@ -2070,21 +2065,51 @@ async def _run_subagent_background( await self._deliver_subagent_result(run, text) async def _deliver_subagent_result(self, run: SubagentRun, text: str) -> None: - """Post a background subagent's result to the chat that started it.""" + """Post a background subagent's result to the chat that started it AND fold + it into that chat's history, so the spawning agent both shows it to the + user now and remembers it natively on later turns (#15).""" + label = run.persona or "default" + framed = f"🤖 Subagent ({label}) finished:\n\n{text}" ch = self.channels.get(run.origin_channel) - if not ch or not run.origin_chat_id: + if ch and run.origin_chat_id: + try: + await ch.send(run.origin_chat_id, framed) + except Exception: + log.exception("Failed to deliver subagent %s result", run.run_id) + else: log.warning( "Subagent %s result not delivered (channel=%r, chat=%r)", run.run_id, run.origin_channel, run.origin_chat_id, ) + await self._record_subagent_in_history(run, framed) + + async def _record_subagent_in_history(self, run: SubagentRun, framed: str) -> None: + """Record a background subagent's result as an assistant turn in the + originating chat — merged into the trailing assistant turn so replayed + history stays strictly alternating for providers that require it.""" + channel, user_id, chat_id = run.origin_channel, run.origin_user_id, run.origin_chat_id + # Only real user chats have a history; skip system/scheduler-origin runs. + if not chat_id or channel == "system": return - label = run.persona or "default" try: - await ch.send(run.origin_chat_id, f"🤖 Subagent ({label}) finished:\n\n{text}") + if self.history_mode == "session": + merged = await self.history.append_to_last_session_message( + channel, user_id, f"\n\n{framed}", chat_id + ) + if not merged: + await self.history.append_session_message( + channel, user_id, {"role": "assistant", "content": framed}, chat_id + ) + else: + merged = await self.history.append_to_last_turn( + channel, user_id, "assistant", f"\n\n{framed}", chat_id + ) + if not merged: + await self.history.add_turn(channel, user_id, "assistant", framed, chat_id) except Exception: - log.exception("Failed to deliver subagent %s result", run.run_id) + log.exception("Failed to record subagent %s result in history", run.run_id) @staticmethod def _usage_total(usage: dict | None) -> int: diff --git a/core/history.py b/core/history.py index 9648b07..cf8e95a 100644 --- a/core/history.py +++ b/core/history.py @@ -281,6 +281,72 @@ async def replace_session( ) await db.commit() + async def append_to_last_turn( + self, channel: str, user_id: str, role: str, suffix: str, chat_id: str = "" + ) -> bool: + """Append ``suffix`` to the most recent turn iff it has ``role`` and text + content. Returns False when there is no such turn (caller adds a fresh one). + + Used to fold an out-of-band assistant message (a background subagent's + result) into the trailing assistant turn so the replayed history keeps + strict user/assistant alternation. (injection mode) + """ + await self._ensure_schema() + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute( + "SELECT id, role, content FROM conversation_turns " + "WHERE channel = ? AND user_id = ? AND chat_id = ? ORDER BY id DESC LIMIT 1", + (channel, user_id, chat_id), + ) + row = await cursor.fetchone() + if not row or row["role"] != role: + return False + content = json.loads(row["content"]) + if not isinstance(content, str): + return False + await db.execute( + "UPDATE conversation_turns SET content = ? WHERE id = ?", + (json.dumps(content + suffix), row["id"]), + ) + await db.commit() + return True + + async def append_to_last_session_message( + self, channel: str, user_id: str, suffix: str, chat_id: str = "", role: str = "assistant" + ) -> bool: + """Session-mode counterpart of :meth:`append_to_last_turn`: fold ``suffix`` + into the last session message iff it has ``role``. Returns False otherwise.""" + await self._ensure_schema() + key = (channel, user_id, chat_id) + if key not in self._sessions: + await self.get_session(channel, user_id, chat_id) + session = self._sessions[key] + if not session or session[-1].get("role") != role: + return False + msg = session[-1] + content = msg.get("content") + if isinstance(content, str): + msg["content"] = content + suffix + elif isinstance(content, list): + content.append({"type": "text", "text": suffix}) + else: + return False + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute( + "SELECT id FROM session_messages " + "WHERE channel = ? AND user_id = ? AND chat_id = ? ORDER BY id DESC LIMIT 1", + (channel, user_id, chat_id), + ) + row = await cursor.fetchone() + if row: + await db.execute( + "UPDATE session_messages SET message = ? WHERE id = ?", + (json.dumps(msg), row[0]), + ) + await db.commit() + return True + async def clear_session(self, channel: str, user_id: str, chat_id: str = "") -> None: """Clear just the sticky session for a (channel, user_id, chat_id) triple.""" await self._ensure_schema() diff --git a/core/subagents.py b/core/subagents.py index e494f9d..325dcb9 100644 --- a/core/subagents.py +++ b/core/subagents.py @@ -60,9 +60,6 @@ class SubagentRun: origin_channel: str = "" origin_user_id: str = "" origin_chat_id: str = "" - # True once a *finished* run has been surfaced to the spawning agent's - # context, so a completion is reported to it exactly once (see updates_for). - notified: bool = False _task: asyncio.Task | None = field(default=None, repr=False, compare=False) @property @@ -102,22 +99,18 @@ def list_runs(self, active_only: bool = False) -> list[SubagentRun]: runs = [r for r in self._runs.values() if not active_only or r.status == "running"] return sorted(runs, key=lambda r: r.started_at, reverse=True) - def updates_for(self, channel: str, chat_id: str) -> list[SubagentRun]: - """Runs for one chat the spawning agent should be reminded of: every run - still in flight, plus any that finished since it was last reminded. - - A finished run is returned once and then marked ``notified`` so the agent - is told of its completion exactly once; running runs are returned every - turn so the agent never claims a still-pending run is done.""" - out = [] - for r in self._runs.values(): - if r.origin_channel != channel or r.origin_chat_id != chat_id: - continue - if r.status == "running": - out.append(r) - elif not r.notified: - r.notified = True - out.append(r) + def running_for(self, channel: str, chat_id: str) -> list[SubagentRun]: + """Still-running background runs spawned from one chat, oldest first. + + Surfaced in the spawning agent's turn preamble so it always knows what is + pending. (Finished runs are folded into the chat history instead, so the + agent remembers their results natively rather than as ephemeral status.) + """ + out = [ + r + for r in self._runs.values() + if r.status == "running" and r.origin_channel == channel and r.origin_chat_id == chat_id + ] return sorted(out, key=lambda r: r.started_at) def finish(self, run_id: str, status: str, *, result: str = "", error: str = "") -> bool: diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx index e7096af..446e7b6 100644 --- a/docs/content/docs/subagents.mdx +++ b/docs/content/docs/subagents.mdx @@ -30,7 +30,10 @@ The agent decides to delegate and calls the tool. Two modes: in the same turn. - **Background** (`background: true`): the tool returns a run id immediately and the subagent runs off-turn. When it finishes, the result is **posted back to - the chat it was started from**. + the chat it was started from** *and* folded into that conversation's history — + so the spawning agent both shows it to you now and remembers it on later turns. + While a background run is in flight, its status is shown to the agent each turn + so it never mistakes a pending run for a finished one. ```jsonc // spawn_subagent tool input diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 1dc695c..5a48a12 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -85,7 +85,7 @@ def test_short_summary_first_nonempty_line_capped() -> None: assert len(short_summary("x" * 400)) == 281 # 280 + ellipsis -def test_updates_for_filters_by_chat_and_reports_finish_once() -> None: +def test_running_for_filters_by_chat_and_drops_finished() -> None: reg = SubagentRegistry() here = SubagentRun(run_id="a", persona="", task="t", origin_channel="tg", origin_chat_id="1") other = SubagentRun(run_id="b", persona="", task="t", origin_channel="tg", origin_chat_id="2") @@ -93,13 +93,12 @@ def test_updates_for_filters_by_chat_and_reports_finish_once() -> None: reg.register(other) # Running runs appear every turn; the other chat's run is never included. - assert [r.run_id for r in reg.updates_for("tg", "1")] == ["a"] - assert [r.run_id for r in reg.updates_for("tg", "1")] == ["a"] + assert [r.run_id for r in reg.running_for("tg", "1")] == ["a"] + assert [r.run_id for r in reg.running_for("tg", "1")] == ["a"] - # After it finishes it appears exactly once more, then never again. + # Once finished it drops out of the running list (its result goes to history). reg.finish("a", "done", result="answer") - assert [r.run_id for r in reg.updates_for("tg", "1")] == ["a"] - assert reg.updates_for("tg", "1") == [] + assert reg.running_for("tg", "1") == [] # --------------------------------------------------------------------------- @@ -204,9 +203,14 @@ async def test_run_subagent_background_reports_to_origin(agent) -> None: agent.channels["telegram"] = channel agent.llm = _ScriptedLLM([LLMResponse(text="bg result", tool_calls=[])]) + # The spawning turn already left a user+assistant pair in history. + await agent.history.add_turn("telegram", "u1", "user", "do it async", "555") + await agent.history.add_turn("telegram", "u1", "assistant", "started in the background", "555") + result = await agent.run_subagent( task="async work", origin_channel="telegram", + origin_user_id="u1", origin_chat_id="555", background=True, ) @@ -217,10 +221,16 @@ async def test_run_subagent_background_reports_to_origin(agent) -> None: await run._task # let the background loop finish assert run.status == "done" + # Delivered to the chat... channel.send.assert_awaited_once() sent_chat, sent_text = channel.send.await_args.args assert sent_chat == "555" assert "bg result" in sent_text + # ...and folded into the trailing assistant turn (history stays alternating). + turns = await agent.history.get_messages("telegram", "u1", "555") + assert [t["role"] for t in turns] == ["user", "assistant"] + assert "started in the background" in str(turns[-1]["content"]) + assert "bg result" in str(turns[-1]["content"]) @pytest.mark.asyncio @@ -271,7 +281,7 @@ def test_narrow_persona_intersects_scopes(agent) -> None: assert child.secrets == [] # 'y' not in parent's ['x'] -def test_subagent_status_note_reports_this_chats_runs(agent) -> None: +def test_subagent_status_note_lists_only_running_runs(agent) -> None: agent.subagents.register( SubagentRun( run_id="r1", @@ -282,16 +292,36 @@ def test_subagent_status_note_reports_this_chats_runs(agent) -> None: progress="step 2", ) ) - agent.subagents.finish("r1", "done", result="the iPhone 17e is CHF 599") - note = agent._subagent_status_note("repl", "repl") - assert "r1" in note and "done" in note and "CHF 599" in note + assert "r1" in note and "running" in note and "step 2" in note # Scoped to the chat: a different chat sees nothing. assert agent._subagent_status_note("repl", "other-chat") == "" - # Reported once: the next turn no longer repeats the finished run. + + # Once finished it leaves the preamble (its result is in history instead). + agent.subagents.finish("r1", "done", result="the iPhone 17e is CHF 599") assert agent._subagent_status_note("repl", "repl") == "" +@pytest.mark.asyncio +async def test_record_subagent_in_history_merges_into_assistant_turn(agent) -> None: + run = SubagentRun( + run_id="r9", + persona="coding-helper", + task="t", + origin_channel="repl", + origin_user_id="repl", + origin_chat_id="repl", + ) + # No prior assistant turn → a fresh assistant turn is added. + await agent.history.add_turn("repl", "repl", "user", "hello", "repl") + await agent.history.add_turn("repl", "repl", "assistant", "on it", "repl") + await agent._record_subagent_in_history(run, "🤖 done: CHF 599") + + turns = await agent.history.get_messages("repl", "repl", "repl") + assert [t["role"] for t in turns] == ["user", "assistant"] # merged, not appended + assert "CHF 599" in str(turns[-1]["content"]) + + # --------------------------------------------------------------------------- # Scheduled subagent jobs # --------------------------------------------------------------------------- From 9c26cbc7ec5ee69d0d25d761f47b14104c67ede6 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Mon, 29 Jun 2026 00:04:21 +0200 Subject: [PATCH 09/13] feat(subagents): caller-sized runs + file handoff to the spawning agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The spawning agent now sizes each subagent run to the job and gets its files back: - spawn_subagent gains max_steps, token_budget, and thinking_effort. max_steps/token_budget default to the configured value and are clamped to it as a ceiling (resolve_cap) — the agent may dial a run *down* but never past the guardrail; token_budget has a 1000 floor. thinking_effort (off|low|medium|high) maps to a thinking level, omit to inherit the caller's (normalize_effort); an effort-scoped _background_llm clone runs the loop. - Subagents share the agent's cwd/filesystem, so any file they write is already on disk where the parent can reach it. FILE_HANDOFF_INSTRUCTION makes the subagent report absolute paths in its result, which the existing result-folding carries into the parent's history. - Persona stays inherited by default (run as the caller itself); made explicit in the tool schema and docs. resolve_cap degrades non-numeric / infinite input to the ceiling rather than raising. Tests cover clamping, effort mapping/scoping, the file- handoff system suffix, and the caller-as-limiter path. --- core/agent.py | 98 ++++++++++++++++++++++++------ core/subagents.py | 53 ++++++++++++++++ docs/content/docs/subagents.mdx | 22 ++++++- tests/test_subagents.py | 103 +++++++++++++++++++++++++++++++- 4 files changed, 255 insertions(+), 21 deletions(-) diff --git a/core/agent.py b/core/agent.py index 4068624..13c394f 100644 --- a/core/agent.py +++ b/core/agent.py @@ -32,7 +32,15 @@ from core.scheduler import AgentScheduler from core.secret_store import SecretStore from core.skills import SkillsEngine -from core.subagents import SubagentRegistry, SubagentRun, narrow_scope, short_summary +from core.subagents import ( + FILE_HANDOFF_INSTRUCTION, + SubagentRegistry, + SubagentRun, + narrow_scope, + normalize_effort, + resolve_cap, + short_summary, +) from core.task_reflection import ReflectionStore from core.tools import tool_env from voice.pipeline import VoicePipeline @@ -276,16 +284,26 @@ def _shell_quote(s: str) -> str: "name": "spawn_subagent", "description": ( "Delegate a self-contained subtask to a subagent. The subagent runs " - "the full agent loop under a chosen persona, with a tool/skill/secret " - "scope that is never wider than yours, and returns a structured " - "result. It has NO memory of this conversation — put everything it " - "needs in 'task'.\n" + "the full agent loop under a persona, with a tool/skill/secret scope " + "that is never wider than yours, and returns a structured result. It " + "has NO memory of this conversation — put everything it needs in " + "'task'.\n" + "Persona: omit 'persona' to run the subagent as YOURSELF — same " + "identity, tools, and scope. That is the default; only name a persona " + "when the task clearly calls for a different specialist.\n" + "Sizing: by default the subagent runs at the configured ceilings. Size " + "it to the job with 'max_steps', 'token_budget', and 'thinking_effort' " + "— smaller for a quick lookup, larger / 'high' effort for hard " + "multi-step work. Requested values are capped at the configured maxima.\n" + "Files: you share a filesystem with the subagent, so it reports the " + "absolute paths of any files it creates in its result — you can then " + "read or send them.\n" "Use background=true for long-running work: you get a run id " "immediately and the result is posted to this chat when done (monitor " "or cancel it with /jobs or the admin Jobs page). Use background=false " "(default) to block and get the result back in this turn.\n" - "Subagents are depth-limited and budgeted, so prefer one focused " - "delegation over deep nesting." + "Subagents are depth-limited, so prefer one focused delegation over " + "deep nesting." ), "input_schema": { "type": "object", @@ -300,8 +318,33 @@ def _shell_quote(s: str) -> str: "persona": { "type": "string", "description": ( - "Persona name to run as (e.g. 'coding-helper'). Omit to " - "inherit your current identity and scope." + "Persona name to run as (e.g. 'coding-helper'). Omit to run " + "as yourself — same identity, tools, and scope. This is the " + "default; use it unless the task needs a different specialist." + ), + }, + "max_steps": { + "type": "integer", + "description": ( + "Tool-call rounds the subagent may run before a hard stop. " + "Omit for the configured default; capped at the maximum. " + "Lower it for quick tasks, raise it for thorough ones." + ), + }, + "token_budget": { + "type": "integer", + "description": ( + "Approximate token ceiling for the whole run (minimum 1000). " + "Omit for the configured default; capped at the maximum." + ), + }, + "thinking_effort": { + "type": "string", + "enum": ["off", "low", "medium", "high"], + "description": ( + "How hard the subagent reasons each step. Omit to inherit " + "your own level. Use 'high' for tricky reasoning, 'off'/'low' " + "for simple mechanical work." ), }, "background": { @@ -1850,6 +1893,9 @@ async def _tool_spawn_subagent( origin_chat_id=str(origin.get("chat_id", "")), parent_state=request_state, background=bool(params.get("background", False)), + max_steps=params.get("max_steps"), + token_budget=params.get("token_budget"), + thinking_effort=params.get("thinking_effort"), ) async def run_subagent( @@ -1862,10 +1908,17 @@ async def run_subagent( origin_chat_id: str = "", parent_state: dict | None = None, background: bool = False, + max_steps: object = None, + token_budget: object = None, + thinking_effort: str | None = None, ) -> dict: """Run a subagent — the one primitive behind both the tool and scheduled ``subagent`` jobs. Scope is narrowed from the caller (inherit-never-widen); recursion depth and per-run budgets are enforced. + + ``max_steps`` / ``token_budget`` / ``thinking_effort`` let the caller size + the run; each defaults to the configured value and is clamped to it as a + ceiling (``thinking_effort`` defaults to inheriting the caller's level). """ cfg = self.config.subagents if not cfg.enabled: @@ -1907,6 +1960,9 @@ async def run_subagent( task=task, depth=parent_depth + 1, background=background, + max_steps=resolve_cap(max_steps, cfg.max_steps), + token_budget=resolve_cap(token_budget, cfg.token_budget, floor=1000), + effort=normalize_effort(thinking_effort), origin_channel=origin_channel, origin_user_id=origin_user_id, origin_chat_id=origin_chat_id, @@ -1986,7 +2042,7 @@ async def _run_subagent_loop( Mirrors the main injection loop but runs from a clean slate (no history), skips approval/decomposition/memory/reflection (channel='system'), and - stops at the configured step/token budget. + stops at this run's step/token budget (sized by the spawning agent). """ cfg = self.config.subagents tools = apply_feature_gates( @@ -1999,11 +2055,19 @@ async def _run_subagent_loop( if child_state["depth"] >= cfg.recursion_depth: tools = [t for t in tools if t["name"] != "spawn_subagent"] - system = await self._build_system_prompt(query=task, persona=child_persona) - preamble = self._turn_preamble(None) + system = await self._build_system_prompt(persona=child_persona) + system = f"{system}\n\n{FILE_HANDOFF_INSTRUCTION}" + # Memory/reflections inject per-turn via the preamble (#41), scoped to the + # child persona (#42); query=task keeps the injection relevant. + preamble = await self._turn_preamble(None, query=task, scope=_persona_scope(child_persona)) messages: list[dict] = [await self._build_user_message(task, None, preamble)] - response = await self.llm.generate( + # effort None = inherit the main client's level; otherwise an effort-scoped + # clone (same provider/connection, overridden thinking level). + llm = self.llm + if run.effort is not None: + llm = self._background_llm(self.llm.provider, run.effort) + response = await llm.generate( model=self.config.agent.model, max_tokens=4096, system=system, @@ -2012,7 +2076,7 @@ async def _run_subagent_loop( ) steps = 0 tokens = self._usage_total(response.usage) - while response.tool_calls and steps < cfg.max_steps and tokens < cfg.token_budget: + while response.tool_calls and steps < run.max_steps and tokens < run.token_budget: steps += 1 run.progress = f"step {steps}: {', '.join(c.name for c in response.tool_calls)}"[:120] tool_results = [] @@ -2027,9 +2091,9 @@ async def _run_subagent_loop( "content": json.dumps(result), } ) - messages.append(self.llm.assistant_message(response)) - messages.extend(self.llm.tool_result_messages(tool_results)) - response = await self.llm.generate( + messages.append(llm.assistant_message(response)) + messages.extend(llm.tool_result_messages(tool_results)) + response = await llm.generate( model=self.config.agent.model, max_tokens=4096, system=system, diff --git a/core/subagents.py b/core/subagents.py index 325dcb9..1369d53 100644 --- a/core/subagents.py +++ b/core/subagents.py @@ -23,6 +23,53 @@ # Keep at most this many finished runs for after-the-fact inspection. _MAX_FINISHED = 50 +# Appended to a subagent's system prompt so files it writes reach the spawning +# agent. Subagents and the main agent share one process cwd / filesystem (see +# core/executor.py — subprocesses inherit the cwd), so a file a subagent creates +# is already on disk where the parent can read it; the only gap is the parent +# knowing the path. This closes that gap by making the subagent report paths in +# its result, which is folded into the parent's history. +FILE_HANDOFF_INSTRUCTION = ( + "You share a filesystem with the agent that spawned you: it can read any " + "file you leave behind, but only if it knows the path. So if you create or " + "modify any files, end your final reply with a line 'Files:' followed by " + "their absolute paths, one per line. If you made no files, omit it." +) + +_EFFORT_LEVELS = {"off": "", "low": "low", "medium": "medium", "high": "high"} + + +def normalize_effort(value: str | None) -> str | None: + """Map a tool-supplied thinking effort to an LLM thinking level. + + ``None``/empty → ``None`` = *inherit the caller's level* (the default, so a + subagent thinks as hard as its parent unless told otherwise). ``"off"`` → + ``""`` (reasoning off); ``low``/``medium``/``high`` pass through. Anything + unrecognised → ``None``, degrading to the safe inherit default rather than + erroring on a bad value. + """ + if not value: + return None + return _EFFORT_LEVELS.get(str(value).strip().lower()) + + +def resolve_cap(value: object, ceiling: int, floor: int = 1) -> int: + """Clamp a caller-requested run cap (steps / token budget) into bounds. + + ``None`` (the caller didn't choose) → the configured ``ceiling``, preserving + prior behaviour. A chosen value is honoured up to the ceiling: the config is + a safety guardrail the agent may dial *down* but never exceed. A non-numeric + value coerces to the ceiling rather than raising. + """ + if value is None: + return ceiling + try: + return max(floor, min(int(value), ceiling)) # type: ignore[arg-type] + except TypeError, ValueError, OverflowError: + # OverflowError: json.loads accepts the literal `Infinity`, and int(inf) + # overflows — degrade it to the ceiling like any other bad value. + return ceiling + def narrow_scope(parent: list[str] | None, child: list[str] | None) -> list[str]: """Intersect a child scope with the parent's — inherit, never widen. @@ -51,6 +98,12 @@ class SubagentRun: task: str depth: int = 1 background: bool = False + # Per-run sizing the spawning agent chose, resolved/clamped via resolve_cap + # before the run starts (so always concrete on a live run; the 0 defaults are + # only placeholders for direct construction). effort None = inherit caller level. + max_steps: int = 0 + token_budget: int = 0 + effort: str | None = None status: str = "running" # running | done | cancelled | error progress: str = "" result: str = "" diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx index 446e7b6..b3b1230 100644 --- a/docs/content/docs/subagents.mdx +++ b/docs/content/docs/subagents.mdx @@ -39,7 +39,10 @@ The agent decides to delegate and calls the tool. Two modes: // spawn_subagent tool input { "task": "Research the top 3 Python HTTP clients and summarise trade-offs", - "persona": "coding-helper", // optional — omit to inherit your identity & scope + "persona": "coding-helper", // optional — omit to run as yourself (same identity & scope) + "max_steps": 6, // optional — caller-chosen, capped at the configured max + "token_budget": 40000, // optional — caller-chosen, min 1000, capped at the configured max + "thinking_effort": "high", // optional — off | low | medium | high; omit to inherit yours "background": false // true → run async, report back when done } ``` @@ -47,6 +50,19 @@ The agent decides to delegate and calls the tool. Two modes: The `task` is self-contained: a subagent has **no memory** of the conversation that spawned it, so everything it needs goes in `task`. +The agent **sizes each run to the job**: `max_steps`, `token_budget`, and +`thinking_effort` let it spend less on a quick lookup and more on hard, +multi-step work. Each is optional — omit `max_steps`/`token_budget` for the +configured ceiling, `thinking_effort` to inherit the caller's level — and the +numeric caps are clamped to the [Guardrails](#guardrails) maxima, which the +agent can dial *down* but never past. Omitting `persona` runs the subagent as +the caller itself, with the same identity and scope. + +**Files hand back automatically.** The subagent shares the agent's filesystem, +so any file it writes is already on disk where the caller can reach it — it just +reports the absolute paths in its result, which the spawning agent then reads, +sends, or attaches. + ## Scheduled Run a subagent on a cron or one-shot schedule by giving a job `type: "subagent"` @@ -87,8 +103,8 @@ limit, applied live without a restart), or under `subagents` in `config.yml`: |---------|---------|--------------| | `enabled` | `true` | Master switch; off hides the `spawn_subagent` tool everywhere | | `recursion_depth` | `3` | Max nesting (a top-level spawn is depth 1) | -| `max_steps` | `12` | Max tool-call rounds per run — a hard stop | -| `token_budget` | `100000` | Approx token ceiling per run (best-effort) | +| `max_steps` | `12` | Ceiling on tool-call rounds per run — a hard stop; the agent may request fewer per spawn, never more | +| `token_budget` | `100000` | Ceiling on tokens per run (best-effort); the agent may request fewer per spawn, never more | | `max_concurrent` | `3` | Max background runs at once | When disabled, the tool is hidden from the assistant and from the Personae tool diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 5a48a12..399af38 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -12,7 +12,15 @@ from core.job_store import VALID_TYPES, JobStore from core.llm import LLMResponse, LLMToolCall from core.personae import Persona -from core.subagents import SubagentRegistry, SubagentRun, narrow_scope, short_summary +from core.subagents import ( + FILE_HANDOFF_INSTRUCTION, + SubagentRegistry, + SubagentRun, + narrow_scope, + normalize_effort, + resolve_cap, + short_summary, +) # --------------------------------------------------------------------------- # narrow_scope — inherit, never widen ([] / None == "all") @@ -405,3 +413,96 @@ async def test_run_subagent_task_delivers_to_owner() -> None: agent.run_subagent.assert_awaited_once() channel.send.assert_awaited_once_with(7, "scheduled out") + + +# --------------------------------------------------------------------------- +# Caller-sized runs: max_steps / token_budget / thinking_effort + file handoff +# --------------------------------------------------------------------------- + + +def test_resolve_cap_defaults_clamps_and_coerces() -> None: + assert resolve_cap(None, 12) == 12 # caller didn't choose → configured ceiling + assert resolve_cap(3, 12) == 3 # honoured below the ceiling + assert resolve_cap(999, 12) == 12 # config is a ceiling, never exceeded + assert resolve_cap(0, 12, floor=1) == 1 # floored + assert resolve_cap("nope", 12) == 12 # garbage degrades to the ceiling + assert resolve_cap("4", 12) == 4 # numeric string coerces + assert resolve_cap(float("inf"), 12) == 12 # int(inf) overflows → ceiling, no crash + + +def test_normalize_effort_maps_and_inherits() -> None: + assert normalize_effort(None) is None # inherit the caller's level + assert normalize_effort("") is None + assert normalize_effort("off") == "" # reasoning off + assert normalize_effort("HIGH") == "high" # case-insensitive + assert normalize_effort("medium") == "medium" + assert normalize_effort("bogus") is None # unknown → safe inherit default + + +class _RecordingLLM(_ScriptedLLM): + """A scripted LLM that also records the system prompt of each call.""" + + def __init__(self, responses: list[LLMResponse]) -> None: + super().__init__(responses) + self.systems: list[str] = [] + + async def generate(self, *, system: str = "", **_kw) -> LLMResponse: + self.systems.append(system) + return await super().generate() + + +@pytest.mark.asyncio +async def test_subagent_system_prompt_carries_file_handoff(agent) -> None: + rec = _RecordingLLM([LLMResponse(text="done", tool_calls=[])]) + agent.llm = rec + await agent.run_subagent(task="x") + assert any(FILE_HANDOFF_INSTRUCTION in s for s in rec.systems) + + +@pytest.mark.asyncio +async def test_run_subagent_caller_max_steps_is_the_limiter(agent) -> None: + agent.config.subagents.max_steps = 100 # high config ceiling … + call = LLMToolCall(id="1", name="web_search", arguments={"query": "q"}) + agent.llm = _ScriptedLLM([LLMResponse(text="", tool_calls=[call]) for _ in range(10)]) + result = await agent.run_subagent(task="loop", max_steps=1) # … caller wants just 1 + assert result["ok"] is True + assert "budget" in result["result"].lower() + assert agent.subagents.get(result["run_id"]).max_steps == 1 + + +@pytest.mark.asyncio +async def test_run_subagent_clamps_caps_to_config_ceiling(agent) -> None: + agent.config.subagents.max_steps = 5 + agent.config.subagents.token_budget = 9000 + agent.llm = _ScriptedLLM([LLMResponse(text="done", tool_calls=[])]) + result = await agent.run_subagent(task="x", max_steps=999, token_budget=10**9) + run = agent.subagents.get(result["run_id"]) + assert run.max_steps == 5 + assert run.token_budget == 9000 + + +@pytest.mark.asyncio +async def test_run_subagent_effort_inherits_by_default(agent, monkeypatch) -> None: + agent.llm = _ScriptedLLM([LLMResponse(text="ok", tool_calls=[])]) + monkeypatch.setattr( + agent, "_background_llm", lambda *a, **k: pytest.fail("should not clone when inheriting") + ) + result = await agent.run_subagent(task="x") # no thinking_effort + assert result["ok"] is True + assert agent.subagents.get(result["run_id"]).effort is None + + +@pytest.mark.asyncio +async def test_run_subagent_effort_uses_scoped_client(agent, monkeypatch) -> None: + agent.llm = _ScriptedLLM([LLMResponse(text="ok", tool_calls=[])]) + captured: dict = {} + + def spy(provider, level=""): + captured["level"] = level + return _ScriptedLLM([LLMResponse(text="ok", tool_calls=[])]) + + monkeypatch.setattr(agent, "_background_llm", spy) + result = await agent.run_subagent(task="x", thinking_effort="high") + assert result["ok"] is True + assert captured["level"] == "high" + assert agent.subagents.get(result["run_id"]).effort == "high" From c791fcdb3a0f9e11b3bcb178916bd6d08627eb39 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Mon, 29 Jun 2026 00:17:41 +0200 Subject: [PATCH 10/13] feat(subagents): show the agent a persona roster so delegation is informed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Selection stays user-led — omitting `persona` runs the subagent as the caller itself (the chat's bound persona). For the specialist case the agent no longer guesses a name blindly: - _personae_roster_block injects a compact `name — role` roster of available personae into the main turn preamble, gated to when spawn_subagent is actually in scope (subagents enabled + the persona's tool allowlist permits it). Off for subagent sub-loops (offer_personae defaults False), so it never leaks where it can't be used. - The "Persona not found" error now lists the valid names, so a wrong guess self-corrects. Tests cover the name/role rendering (first role line only), the current- persona "(you)" tag, the disabled / out-of-scope gates, main-turn-only injection, and the name-listing error. --- core/agent.py | 57 +++++++++++++++++++++++++++++++- docs/content/docs/subagents.mdx | 9 +++++ tests/test_subagents.py | 58 +++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) diff --git a/core/agent.py b/core/agent.py index 13c394f..d82bd8f 100644 --- a/core/agent.py +++ b/core/agent.py @@ -635,6 +635,7 @@ async def process( scope=_persona_scope(persona), persona=persona, session_key=session_key, + offer_personae=True, ) # Append the status of still-running background subagents from this chat, # so the agent always knows what is pending (their results are folded into @@ -760,6 +761,7 @@ async def _turn_preamble( scope: str = "", persona: Persona | None = None, session_key: tuple[str, str, str] | None = None, + offer_personae: bool = False, ) -> str: """Build the per-turn preamble prepended to the current user message. @@ -821,6 +823,14 @@ async def _turn_preamble( except Exception: log.exception("Failed to load memories for turn preamble") + # Roster of personae the agent can delegate to via spawn_subagent, so its + # choice is informed rather than guessed (#15). Only on the main turn — + # selection stays user-led (omit persona = run as yourself / the bound one). + if offer_personae: + roster = await self._personae_roster_block(persona) + if roster: + preamble += f"\n\n{roster}" + if self.config.task_reflection.enabled: try: reflections = await self.reflections.format_for_prompt() @@ -840,6 +850,41 @@ async def _turn_preamble( ) return preamble + async def _personae_roster_block(self, persona: Persona | None) -> str: + """Compact `name — role` roster of personae the agent can delegate to (#15). + + Makes specialist delegation an informed choice instead of a guess, while + leaving selection user-led: omitting ``persona`` runs the subagent as the + caller itself. Returns "" (nothing injected) when subagents are disabled, + the active persona can't spawn, or there is no one to delegate to. + """ + if not self.config.subagents.enabled: + return "" + if persona is not None and not persona.allows_tool("spawn_subagent"): + return "" + try: + personae = await self.personae.list_personae() + except Exception: + log.exception("Failed to list personae for the subagent roster") + return "" + current = persona.name if persona else "" + lines = [] + for p in personae: + role = p.role.strip().splitlines()[0].strip() if (p.role or "").strip() else "" + tag = " (you)" if p.name == current else "" + lines.append(f"- {p.name}{tag}" + (f" — {role}" if role else "")) + if not lines: + return "" + body = "\n".join(lines) + return ( + "\n" + "Personae you may run a subagent as via spawn_subagent's 'persona'. " + "Omit 'persona' to run as yourself (the default) — name one only when " + "the subtask clearly fits that specialist.\n" + f"{body}\n" + "" + ) + async def _skills_block_in_history(self, session_key: tuple[str, str, str], block: str) -> bool: """True if the exact ```` block is already present in the replayed session history — so the model still sees it and we needn't @@ -1938,7 +1983,17 @@ async def run_subagent( if persona_name: requested = await self._load_persona(persona_name) if requested is None: - return {"error": f"Persona not found: {persona_name}"} + try: + names = [p.name for p in await self.personae.list_personae()] + except Exception: + names = [] + hint = f" Available: {', '.join(names)}." if names else "" + return { + "error": ( + f"Persona not found: {persona_name}.{hint} " + "Omit 'persona' to run as yourself." + ) + } else: requested = parent_state.get("persona_obj") child_persona = self._narrow_persona(requested, parent_state) if requested else None diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx index b3b1230..6cf2439 100644 --- a/docs/content/docs/subagents.mdx +++ b/docs/content/docs/subagents.mdx @@ -93,6 +93,15 @@ and a scoped caller delegating to an open persona keeps its own limits. [Permission rules](/docs/permissions) (`NEVER` in particular) still apply inside the subagent, so a blocked action stays blocked at any depth. +## Choosing a persona + +Selection stays **user-led**: omitting `persona` runs the subagent as the caller +itself (the chat's bound persona), so a subtask stays in the identity the user +already chose. To make the *specialist* case an informed choice rather than a +guess, each turn the agent sees a compact roster of available personae +(`name — role`) and only names one when the subtask clearly fits it. A wrong +name is rejected with the list of valid names, so the agent can self-correct. + ## Guardrails Sensible defaults keep delegation safe and bounded. Edit them in the admin diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 399af38..07a6e47 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -506,3 +506,61 @@ def spy(provider, level=""): assert result["ok"] is True assert captured["level"] == "high" assert agent.subagents.get(result["run_id"]).effort == "high" + + +# --------------------------------------------------------------------------- +# Persona roster — let the agent pick a specialist, selection stays user-led +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_personae_roster_lists_name_and_role(agent, monkeypatch) -> None: + personae = [ + Persona(name="coding-helper", role="Writes and reviews code"), + Persona(name="writing-editor", role="Edits prose\nsecond line ignored"), + ] + monkeypatch.setattr(agent.personae, "list_personae", AsyncMock(return_value=personae)) + block = await agent._personae_roster_block(None) + assert "" in block + assert "- coding-helper — Writes and reviews code" in block + assert "- writing-editor — Edits prose" in block # only the first role line + assert "second line ignored" not in block + + +@pytest.mark.asyncio +async def test_personae_roster_marks_current_and_gates(agent, monkeypatch) -> None: + personae = [Persona(name="me", role="r1"), Persona(name="other", role="r2")] + monkeypatch.setattr(agent.personae, "list_personae", AsyncMock(return_value=personae)) + block = await agent._personae_roster_block(Persona(name="me", role="r1")) + assert "- me (you) — r1" in block + # a persona whose tool scope excludes spawn_subagent gets no roster + scoped = Persona(name="me", tools=["web_search"]) + assert await agent._personae_roster_block(scoped) == "" + # nor when subagents are disabled + agent.config.subagents.enabled = False + assert await agent._personae_roster_block(None) == "" + + +@pytest.mark.asyncio +async def test_personae_roster_only_offered_on_main_turn(agent, monkeypatch) -> None: + monkeypatch.setattr( + agent.personae, + "list_personae", + AsyncMock(return_value=[Persona(name="coding-helper", role="code")]), + ) + # subagent preamble (offer_personae defaults False) → no roster leaks in + assert "" not in await agent._turn_preamble(None, query="x") + # main turn opts in + assert "" in await agent._turn_preamble(None, query="x", offer_personae=True) + + +@pytest.mark.asyncio +async def test_run_subagent_unknown_persona_lists_available(agent, monkeypatch) -> None: + monkeypatch.setattr( + agent.personae, + "list_personae", + AsyncMock(return_value=[Persona(name="coding-helper"), Persona(name="analyst")]), + ) + result = await agent.run_subagent(task="x", persona_name="nope") + assert "not found" in result["error"].lower() + assert "coding-helper" in result["error"] and "analyst" in result["error"] From 0b7b01f695ccaf0b734e323e1104cba09140a420 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Mon, 29 Jun 2026 08:53:04 +0200 Subject: [PATCH 11/13] feat(subagents): agent synthesises background results; never dump raw to the user MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real-world feedback exposed three problems with background subagents: they picked a random persona, dumped long raw output straight to the user, and the agent never processed the results into a real answer. Fixes: - Synthesis flow: a background subagent now works for the AGENT, not the user. Its raw result is no longer sent to the chat. When the whole batch of a chat's background runs has finished, the agent runs ONE synthesis turn (process(decompose=False)) that ingests the findings and replies in its own voice — the user sees only that. A barrier collapses parallel spawns into a single reply. Replaces the old _deliver_subagent_result / direct-to-user path. - Lost-reply fix: cancellation is the one terminal path that never re-checked the barrier, so a done sibling that deferred to a still-running run got orphaned when the user cancelled that run. The cancel path now releases the deferred batch. Regression test added. - Persona default: spawn_subagent + the roster now state firmly that omitting 'persona' (run as yourself) is the default; a persona is named only on an explicit request. Stops the agent assigning an unrelated specialist. - Conciseness: subagents are told their result is read by the agent, not a human — return dense facts, no prose/tables. Adds SubagentRun.synthesized; process() gains decompose. Docs updated. --- core/agent.py | 176 ++++++++++++++++++++------------ core/subagents.py | 14 +++ docs/content/docs/subagents.mdx | 11 +- tests/test_subagents.py | 131 ++++++++++++++++-------- 4 files changed, 221 insertions(+), 111 deletions(-) diff --git a/core/agent.py b/core/agent.py index d82bd8f..79df2ba 100644 --- a/core/agent.py +++ b/core/agent.py @@ -34,6 +34,7 @@ from core.skills import SkillsEngine from core.subagents import ( FILE_HANDOFF_INSTRUCTION, + RESULT_FOR_AGENT_INSTRUCTION, SubagentRegistry, SubagentRun, narrow_scope, @@ -288,9 +289,11 @@ def _shell_quote(s: str) -> str: "that is never wider than yours, and returns a structured result. It " "has NO memory of this conversation — put everything it needs in " "'task'.\n" - "Persona: omit 'persona' to run the subagent as YOURSELF — same " - "identity, tools, and scope. That is the default; only name a persona " - "when the task clearly calls for a different specialist.\n" + "Persona: by DEFAULT omit 'persona' — the subagent runs as YOU (your " + "identity, tools, scope). This is almost always what you want. Set " + "'persona' ONLY when the user explicitly asked for a named specialist, " + "or the subtask plainly belongs to a different one. Never pick a " + "persona just because the roster lists some.\n" "Sizing: by default the subagent runs at the configured ceilings. Size " "it to the job with 'max_steps', 'token_budget', and 'thinking_effort' " "— smaller for a quick lookup, larger / 'high' effort for hard " @@ -318,9 +321,10 @@ def _shell_quote(s: str) -> str: "persona": { "type": "string", "description": ( - "Persona name to run as (e.g. 'coding-helper'). Omit to run " - "as yourself — same identity, tools, and scope. This is the " - "default; use it unless the task needs a different specialist." + "Persona name to run as. OMIT THIS by default — the subagent " + "then runs as you (same identity, tools, scope), which is " + "almost always correct. Only set it when the user explicitly " + "named a specialist or the subtask clearly belongs to one." ), }, "max_steps": { @@ -582,6 +586,7 @@ async def process( attachments: list[Attachment] | None = None, chat_id: str = "", persona_name: str | None = None, + decompose: bool = True, ) -> AgentResponse: """Process an incoming message through the LLM with tool-use loop. @@ -614,7 +619,7 @@ async def process( # The resulting plan is request-specific, so it is injected per turn # (in the user-message preamble), not baked into the static prompt. decomposed_goal: DecomposedGoal | None = None - if self.config.goal_decomposition.enabled and channel != "system": + if decompose and self.config.goal_decomposition.enabled and channel != "system": decomposed_goal = await self._maybe_decompose(message) # Resolve the active persona (its identity, skills + tool scope) — a @@ -878,9 +883,10 @@ async def _personae_roster_block(self, persona: Persona | None) -> str: body = "\n".join(lines) return ( "\n" - "Personae you may run a subagent as via spawn_subagent's 'persona'. " - "Omit 'persona' to run as yourself (the default) — name one only when " - "the subtask clearly fits that specialist.\n" + "These personae exist ONLY so you can honour an explicit request for a " + "specialist. By default, spawn_subagent with NO 'persona' so the " + "subagent runs as you — do not assign one of these unless the user " + "asked for it or the subtask plainly belongs to it.\n" f"{body}\n" "" ) @@ -913,9 +919,9 @@ async def _skills_block_in_history(self, session_key: tuple[str, str, str], bloc def _subagent_status_note(self, channel: str, chat_id: str) -> str: """List background subagents from this chat that are *still running*, for - the turn preamble — so the agent knows what is pending. A run's result is - folded into the chat history when it finishes, so completed runs are part - of the conversation itself and need no mention here. (#15) + the turn preamble — so the agent knows what is pending. When the whole + batch finishes you'll be prompted to answer the user with their results, + so finished runs need no mention here. (#15) """ runs = self.subagents.running_for(channel, chat_id) if not runs: @@ -927,11 +933,11 @@ def _subagent_status_note(self, channel: str, chat_id: str) -> str: body = "\n".join(lines) return ( "\n" - "Background subagents you spawned from this chat are still running. " - "Their results are posted to this chat (and added to this conversation) " - "automatically when each finishes, so you needn't relay them.\n" + "Background helpers you spawned from this chat are still running. When " + "they finish you'll be prompted to fold their results into a reply, so " + "don't pre-empt or invent their results now, and don't claim one is " + "finished while it shows here.\n" f"{body}\n" - "Don't claim one of these is finished until its result appears above.\n" "" ) @@ -2111,7 +2117,7 @@ async def _run_subagent_loop( tools = [t for t in tools if t["name"] != "spawn_subagent"] system = await self._build_system_prompt(persona=child_persona) - system = f"{system}\n\n{FILE_HANDOFF_INSTRUCTION}" + system = f"{system}\n\n{RESULT_FOR_AGENT_INSTRUCTION}\n\n{FILE_HANDOFF_INSTRUCTION}" # Memory/reflections inject per-turn via the preamble (#41), scoped to the # child persona (#42); query=task keeps the injection relevant. preamble = await self._turn_preamble(None, query=task, scope=_persona_scope(child_persona)) @@ -2166,69 +2172,107 @@ async def _run_subagent_loop( async def _run_subagent_background( self, run: SubagentRun, child_persona: Persona | None, child_state: dict ) -> None: - """Run a subagent off-turn and post the result back to its origin chat.""" + """Run a subagent off-turn. When the chat's whole batch of background runs + has finished, the spawning agent ingests their results and writes one reply + — the user never sees raw subagent output; it works for the agent (#15).""" try: text = await self._run_subagent_loop(run.task, child_persona, child_state, run) except asyncio.CancelledError: + # User-initiated stop (registry.cancel already flipped the status, so + # finish() is a no-op here). Mark it synthesised so a sibling's batch + # doesn't report on a run the user deliberately cancelled. self.subagents.finish(run.run_id, "cancelled") - await self._deliver_subagent_result(run, "Subagent was cancelled.") + run.synthesized = True + # This may have been the last running sibling: a done/error run that + # deferred earlier would otherwise be orphaned (its reply lost), since + # cancellation is the one terminal path that never re-checks the + # barrier. Release it before unwinding. (Safe to await here: the + # cancellation was already delivered once and won't re-fire.) + await self._maybe_synthesize_subagent_batch(run) raise except Exception as exc: log.exception("Background subagent %s failed", run.run_id) - self.subagents.finish(run.run_id, "error", error=str(exc)) - await self._deliver_subagent_result(run, f"Subagent failed: {exc}") + if self.subagents.finish(run.run_id, "error", error=str(exc)): + await self._maybe_synthesize_subagent_batch(run) return - # Only deliver the result if this completion is what finalised the run — - # a run cancelled in the final moment must not also post its result. + # finish() returns False if a late cancellation already finalised the run, + # in which case this completion must not also trigger a reply. if self.subagents.finish(run.run_id, "done", result=text): - await self._deliver_subagent_result(run, text) - - async def _deliver_subagent_result(self, run: SubagentRun, text: str) -> None: - """Post a background subagent's result to the chat that started it AND fold - it into that chat's history, so the spawning agent both shows it to the - user now and remembers it natively on later turns (#15).""" - label = run.persona or "default" - framed = f"🤖 Subagent ({label}) finished:\n\n{text}" - ch = self.channels.get(run.origin_channel) - if ch and run.origin_chat_id: - try: - await ch.send(run.origin_chat_id, framed) - except Exception: - log.exception("Failed to deliver subagent %s result", run.run_id) - else: - log.warning( - "Subagent %s result not delivered (channel=%r, chat=%r)", - run.run_id, - run.origin_channel, - run.origin_chat_id, - ) - await self._record_subagent_in_history(run, framed) + await self._maybe_synthesize_subagent_batch(run) - async def _record_subagent_in_history(self, run: SubagentRun, framed: str) -> None: - """Record a background subagent's result as an assistant turn in the - originating chat — merged into the trailing assistant turn so replayed - history stays strictly alternating for providers that require it.""" + async def _maybe_synthesize_subagent_batch(self, run: SubagentRun) -> None: + """Once every background run for this chat is done, run ONE agent turn that + ingests the batch and answers the user. The barrier collapses a fan-out of + parallel spawns into a single synthesised reply (#15).""" channel, user_id, chat_id = run.origin_channel, run.origin_user_id, run.origin_chat_id - # Only real user chats have a history; skip system/scheduler-origin runs. if not chat_id or channel == "system": + return # scheduler / system-origin runs have no user chat to answer + # Barrier — race-free because there is no await between this check and + # marking the batch below: while another background run for the chat is + # still running, defer; that last finisher does the synthesis. (Sync runs + # are ignored: they return inline and never reach this path.) + runs = self.subagents.list_runs() + if any( + r.background + and r.status == "running" + and r.origin_channel == channel + and r.origin_chat_id == chat_id + for r in runs + ): + return + batch = [ + r + for r in runs + if r.background + and not r.synthesized + and r.origin_channel == channel + and r.origin_chat_id == chat_id + and r.status in ("done", "error") + ] + if not batch: return + for r in batch: + r.synthesized = True + await self._synthesize_and_reply(channel, user_id, chat_id, batch) + + async def _synthesize_and_reply( + self, channel: str, user_id: str, chat_id: str, batch: list[SubagentRun] + ) -> None: + """Run a synthesis turn over a finished batch and deliver the agent's reply. + + The findings go in as an INTERNAL trigger message (not from the user); the + agent's reply — in its own persona/voice — is what the user actually sees. + """ + findings = [] + for r in batch: + outcome = r.result if r.status == "done" else f"[failed: {r.error or 'unknown error'}]" + who = r.persona or "you" + findings.append(f"• helper for «{r.task[:100]}» (persona: {who}):\n{outcome}") + instruction = ( + "[INTERNAL — not from the user] The background helper(s) you spawned " + "earlier have finished. Their raw findings are below; they worked for " + "you, not the user, so do NOT paste them verbatim or mention " + "'subagents'. Using these findings together with the user's original " + "request in this conversation, write ONE concise, natural reply to the " + "user now. Do not spawn more subagents.\n\n" + "\n\n".join(findings) + ) try: - if self.history_mode == "session": - merged = await self.history.append_to_last_session_message( - channel, user_id, f"\n\n{framed}", chat_id - ) - if not merged: - await self.history.append_session_message( - channel, user_id, {"role": "assistant", "content": framed}, chat_id - ) - else: - merged = await self.history.append_to_last_turn( - channel, user_id, "assistant", f"\n\n{framed}", chat_id - ) - if not merged: - await self.history.add_turn(channel, user_id, "assistant", framed, chat_id) + response = await self.process( + message=instruction, + channel=channel, + user_id=user_id, + chat_id=chat_id, + decompose=False, + ) except Exception: - log.exception("Failed to record subagent %s result in history", run.run_id) + log.exception("Subagent synthesis turn failed (chat=%s)", chat_id) + return + ch = self.channels.get(channel) + if ch and chat_id and response.text: + try: + await ch.send(chat_id, response.text) + except Exception: + log.exception("Failed to deliver synthesised subagent reply (chat=%s)", chat_id) @staticmethod def _usage_total(usage: dict | None) -> int: diff --git a/core/subagents.py b/core/subagents.py index 1369d53..cbb25a1 100644 --- a/core/subagents.py +++ b/core/subagents.py @@ -36,6 +36,17 @@ "their absolute paths, one per line. If you made no files, omit it." ) +# Appended to a subagent's system prompt. A subagent reports to the agent that +# spawned it, never to a human — the agent synthesises the user-facing answer +# from this result, so prose, greetings, and big formatted tables here are just +# noise the agent has to wade through. Keep the result a dense fact dump. +RESULT_FOR_AGENT_INSTRUCTION = ( + "Your final reply is read by the agent that spawned you, NOT by a human. " + "It synthesises the user's answer from it. So return only the findings — " + "dense and factual, no greeting, no preamble, no offers to help further, no " + "elaborate tables. Just the facts the agent asked for, as briefly as possible." +) + _EFFORT_LEVELS = {"off": "", "low": "low", "medium": "medium", "high": "high"} @@ -108,6 +119,9 @@ class SubagentRun: progress: str = "" result: str = "" error: str = "" + # True once a background run's result has been folded into a synthesis turn, + # so it is not picked up again by a later batch (#15). + synthesized: bool = False started_at: float = field(default_factory=time.time) finished_at: float = 0.0 origin_channel: str = "" diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx index 6cf2439..fb09955 100644 --- a/docs/content/docs/subagents.mdx +++ b/docs/content/docs/subagents.mdx @@ -29,11 +29,12 @@ The agent decides to delegate and calls the tool. Two modes: and `{ summary, result }` comes back as the tool result — the agent continues in the same turn. - **Background** (`background: true`): the tool returns a run id immediately and - the subagent runs off-turn. When it finishes, the result is **posted back to - the chat it was started from** *and* folded into that conversation's history — - so the spawning agent both shows it to you now and remembers it on later turns. - While a background run is in flight, its status is shown to the agent each turn - so it never mistakes a pending run for a finished one. + the subagent runs off-turn. A subagent works **for the agent, not the user** — + so its raw output is never shown to you. When the whole batch of background + runs for the chat has finished, the agent runs one **synthesis turn**: it + ingests the findings and writes a single, concise reply in its own voice, and + that is what you see. While a run is in flight, its status is shown to the + agent each turn so it never mistakes a pending run for a finished one. ```jsonc // spawn_subagent tool input diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 07a6e47..077e89c 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -206,39 +206,110 @@ async def test_run_subagent_token_budget_stops_loop(agent) -> None: @pytest.mark.asyncio -async def test_run_subagent_background_reports_to_origin(agent) -> None: +async def test_background_subagent_synthesises_instead_of_dumping(agent, monkeypatch) -> None: + from core.agent import AgentResponse + channel = AsyncMock() agent.channels["telegram"] = channel - agent.llm = _ScriptedLLM([LLMResponse(text="bg result", tool_calls=[])]) + agent.llm = _ScriptedLLM([LLMResponse(text="raw findings: CHF 599", tool_calls=[])]) + + # Capture the synthesis turn instead of running the whole agent loop. + captured: dict = {} + + async def fake_process(message, channel, user_id, chat_id, decompose=True, **kw): + captured.update(message=message, decompose=decompose, chat_id=chat_id) + return AgentResponse(text="The cheapest is CHF 599.") - # The spawning turn already left a user+assistant pair in history. - await agent.history.add_turn("telegram", "u1", "user", "do it async", "555") - await agent.history.add_turn("telegram", "u1", "assistant", "started in the background", "555") + monkeypatch.setattr(agent, "process", fake_process) result = await agent.run_subagent( - task="async work", + task="price check", origin_channel="telegram", origin_user_id="u1", origin_chat_id="555", background=True, ) assert result["background"] is True - assert result["status"] == "running" - run = agent.subagents.get(result["run_id"]) - await run._task # let the background loop finish + await run._task # let the background loop + synthesis finish assert run.status == "done" - # Delivered to the chat... - channel.send.assert_awaited_once() - sent_chat, sent_text = channel.send.await_args.args - assert sent_chat == "555" - assert "bg result" in sent_text - # ...and folded into the trailing assistant turn (history stays alternating). - turns = await agent.history.get_messages("telegram", "u1", "555") - assert [t["role"] for t in turns] == ["user", "assistant"] - assert "started in the background" in str(turns[-1]["content"]) - assert "bg result" in str(turns[-1]["content"]) + assert run.synthesized is True + # The user sees the SYNTHESISED reply, never the raw subagent output. + channel.send.assert_awaited_once_with("555", "The cheapest is CHF 599.") + # The synthesis turn received the raw findings as a non-decomposed internal trigger. + assert "raw findings: CHF 599" in captured["message"] + assert captured["decompose"] is False + + +@pytest.mark.asyncio +async def test_background_batch_synthesises_once_when_all_done(agent, monkeypatch) -> None: + calls: list[list[str]] = [] + + async def fake_synth(channel, user_id, chat_id, batch): + calls.append([r.run_id for r in batch]) + + monkeypatch.setattr(agent, "_synthesize_and_reply", fake_synth) + + common = dict(background=True, origin_channel="telegram", origin_chat_id="c") + r1 = SubagentRun(run_id="s1", persona="", task="a", origin_user_id="u", **common) + r2 = SubagentRun(run_id="s2", persona="", task="b", origin_user_id="u", **common) + agent.subagents.register(r1) + agent.subagents.register(r2) + + # First finishes → the other is still running → barrier holds, no synthesis. + agent.subagents.finish("s1", "done", result="x") + await agent._maybe_synthesize_subagent_batch(r1) + assert calls == [] + + # Last finishes → barrier releases → ONE synthesis over the whole batch. + agent.subagents.finish("s2", "done", result="y") + await agent._maybe_synthesize_subagent_batch(r2) + assert len(calls) == 1 + assert sorted(calls[0]) == ["s1", "s2"] + assert r1.synthesized and r2.synthesized + + +@pytest.mark.asyncio +async def test_cancelling_a_sibling_releases_a_deferred_reply(agent, monkeypatch) -> None: + """Regression: a done run that deferred to a still-running sibling must not be + orphaned when the user cancels that sibling (the lost-reply blocker).""" + import asyncio + + calls: list[list[str]] = [] + + async def fake_synth(channel, user_id, chat_id, batch): + calls.append(sorted(r.run_id for r in batch)) + + monkeypatch.setattr(agent, "_synthesize_and_reply", fake_synth) + + gate = asyncio.Event() + + async def fake_loop(task, persona, state, run): + if task == "B-task": + await gate.wait() # block so this run is "still running" when A finishes + return "B done" + return "A done" + + monkeypatch.setattr(agent, "_run_subagent_loop", fake_loop) + + origin = dict(origin_channel="telegram", origin_user_id="u", origin_chat_id="c") + b_res = await agent.run_subagent(task="B-task", background=True, **origin) + a_res = await agent.run_subagent(task="A-task", background=True, **origin) + + # A completes but B is still running → A defers (not synthesised, not lost). + await agent.subagents.get(a_res["run_id"])._task + assert calls == [] + assert agent.subagents.get(a_res["run_id"]).synthesized is False + + # User cancels B → its cancel path must release A's deferred reply. + agent.subagents.cancel(b_res["run_id"]) + with pytest.raises(asyncio.CancelledError): + await agent.subagents.get(b_res["run_id"])._task + + # A's reply was synthesised (not lost), and only A — B was cancelled. + assert calls == [[a_res["run_id"]]] + assert agent.subagents.get(a_res["run_id"]).synthesized is True @pytest.mark.asyncio @@ -305,31 +376,11 @@ def test_subagent_status_note_lists_only_running_runs(agent) -> None: # Scoped to the chat: a different chat sees nothing. assert agent._subagent_status_note("repl", "other-chat") == "" - # Once finished it leaves the preamble (its result is in history instead). + # Once finished it leaves the preamble (the agent synthesises a reply instead). agent.subagents.finish("r1", "done", result="the iPhone 17e is CHF 599") assert agent._subagent_status_note("repl", "repl") == "" -@pytest.mark.asyncio -async def test_record_subagent_in_history_merges_into_assistant_turn(agent) -> None: - run = SubagentRun( - run_id="r9", - persona="coding-helper", - task="t", - origin_channel="repl", - origin_user_id="repl", - origin_chat_id="repl", - ) - # No prior assistant turn → a fresh assistant turn is added. - await agent.history.add_turn("repl", "repl", "user", "hello", "repl") - await agent.history.add_turn("repl", "repl", "assistant", "on it", "repl") - await agent._record_subagent_in_history(run, "🤖 done: CHF 599") - - turns = await agent.history.get_messages("repl", "repl", "repl") - assert [t["role"] for t in turns] == ["user", "assistant"] # merged, not appended - assert "CHF 599" in str(turns[-1]["content"]) - - # --------------------------------------------------------------------------- # Scheduled subagent jobs # --------------------------------------------------------------------------- From 8c11bc68593788a6b2df51a140c46619f6a9f0f4 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Mon, 29 Jun 2026 08:56:01 +0200 Subject: [PATCH 12/13] fix(subagents): a synthesis turn cannot fan out more background work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The synthesis turn runs the full agent loop, which still offered spawn_subagent — so a misbehaving model could chain new background spawns during synthesis, each triggering another synthesis turn (unbounded, behind only a prompt instruction). process() gains allow_subagents (default True); the synthesis turn passes False, which structurally drops spawn_subagent from its tools and omits the persona roster. Test asserts the tool is withheld. --- core/agent.py | 8 +++++--- tests/test_subagents.py | 25 +++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/core/agent.py b/core/agent.py index 79df2ba..6ca5842 100644 --- a/core/agent.py +++ b/core/agent.py @@ -587,6 +587,7 @@ async def process( chat_id: str = "", persona_name: str | None = None, decompose: bool = True, + allow_subagents: bool = True, ) -> AgentResponse: """Process an incoming message through the LLM with tool-use loop. @@ -640,7 +641,7 @@ async def process( scope=_persona_scope(persona), persona=persona, session_key=session_key, - offer_personae=True, + offer_personae=allow_subagents, ) # Append the status of still-running background subagents from this chat, # so the agent always knows what is pending (their results are folded into @@ -654,7 +655,7 @@ async def process( scoped_tools(persona), secrets_available=self.secret_store is not None, artifacts_enabled=self.config.artifacts.enabled, - subagents_enabled=self.config.subagents.enabled, + subagents_enabled=self.config.subagents.enabled and allow_subagents, ) # Static system prompt. In session mode it is snapshotted once at the @@ -2254,7 +2255,7 @@ async def _synthesize_and_reply( "you, not the user, so do NOT paste them verbatim or mention " "'subagents'. Using these findings together with the user's original " "request in this conversation, write ONE concise, natural reply to the " - "user now. Do not spawn more subagents.\n\n" + "\n\n".join(findings) + "user now.\n\n" + "\n\n".join(findings) ) try: response = await self.process( @@ -2263,6 +2264,7 @@ async def _synthesize_and_reply( user_id=user_id, chat_id=chat_id, decompose=False, + allow_subagents=False, # a synthesis turn must not fan out more work ) except Exception: log.exception("Subagent synthesis turn failed (chat=%s)", chat_id) diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 077e89c..2724a0b 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -312,6 +312,31 @@ async def fake_loop(task, persona, state, run): assert agent.subagents.get(a_res["run_id"]).synthesized is True +@pytest.mark.asyncio +async def test_allow_subagents_false_withholds_spawn_tool(agent) -> None: + """A synthesis turn (allow_subagents=False) must not be offered spawn_subagent, + so it can't fan out more background work behind a soft prompt guard.""" + captured: list[set[str]] = [] + + class RecLLM(_ScriptedLLM): + async def generate(self, *, tools=(), **_kw) -> LLMResponse: + captured.append({t["name"] for t in tools}) + return await super().generate() + + # Default (allowed) → the tool is on the table. + agent.llm = RecLLM([LLMResponse(text="ok", tool_calls=[])]) + await agent.process(message="hi", channel="telegram", user_id="u", chat_id="c1") + assert "spawn_subagent" in captured[0] + + # Synthesis turn (disallowed) → it is withheld. + captured.clear() + agent.llm = RecLLM([LLMResponse(text="ok", tool_calls=[])]) + await agent.process( + message="hi", channel="telegram", user_id="u", chat_id="c2", allow_subagents=False + ) + assert "spawn_subagent" not in captured[0] + + @pytest.mark.asyncio async def test_run_subagent_background_respects_concurrency(agent) -> None: agent.config.subagents.max_concurrent = 1 From c01de8732811c25be9c0247c8cd48d19fb720d66 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Mon, 29 Jun 2026 09:47:09 +0200 Subject: [PATCH 13/13] feat(subagents): summary inference for finished background batches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the per-result raw delivery + ad-hoc synthesis turn with a dedicated summary inference, and stop polluting the user chat and the agent context with raw subagent output. - New subagent_summary inference (enabled/provider/model/thinking_level, mirroring memory/compaction/reflection). When a chat's background batch finishes, it distils the results into a one-line chat *notification* and a concise *digest* kept in the agent's context for follow-ups. Disabled or on failure → crude first-line truncation (short_summary). Default model is the fast/cheap deepseek-v4-flash. - Drops the full process()-based synthesis turn (and the now-dead decompose / allow_subagents params); the notification is what the user sees, the raw result stays only in the ephemeral run registry. - Admin: Result-summary controls in the Tools-tab Subagents card. - Docs/README/config example updated. Also switch the text background inferences (memory extraction/consolidation, goal decomposition, task reflection, compaction) from claude-haiku-4-5 to deepseek-v4-flash by default — better and cheaper — including the admin-UI fallbacks and llm.html placeholders. Vision stays on a multimodal model. --- README.md | 2 +- api/admin.py | 31 +++++--- api/templates/partials/llm.html | 20 ++--- api/templates/partials/tools.html | 41 +++++++++- config.yml.example | 13 +++- core/agent.py | 124 ++++++++++++++++++------------ core/config.py | 36 ++++++--- core/subagents.py | 71 ++++++++++++++++- docs/content/docs/subagents.mdx | 30 ++++++-- tests/test_subagents.py | 106 +++++++++++++------------ 10 files changed, 334 insertions(+), 140 deletions(-) diff --git a/README.md b/README.md index 2041327..e446190 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ A self-hosted personal AI agent that runs in a single Docker container. MPA acts - **Personae** — Swappable agent identities (own character, skill/tool scope, voice). Bind one per chat — and, on Telegram, per forum topic — so several run concurrently, each with its own isolated context. Give a persona its own bot token and it becomes a separate Telegram contact (bot-per-persona) - **Memory** — Two-tier system: permanent long-term facts and expiring short-term context, both extracted automatically from conversations - **Scheduled tasks** — Cron-based jobs for morning briefings, email checks, contact sync, and custom tasks -- **Subagents** — Delegate a scoped subtask to a sub-loop under a chosen persona, on demand or scheduled. Runs sync (result returned in-turn) or in the background (result posted back to chat); scope is a subset of the caller's (inherit-never-widen) with recursion-depth and token/step budgets. Monitor and cancel from Telegram (`/jobs`) or the admin UI +- **Subagents** — Delegate a scoped subtask to a sub-loop under a chosen persona, on demand or scheduled. Runs sync (result returned in-turn) or in the background; a finished background batch is distilled by a summary inference into a one-line chat notification + a concise context digest (raw output never reaches the user or the agent's context). The agent sizes each run (steps / token budget / thinking effort) and defaults the persona to its own; scope is a subset of the caller's (inherit-never-widen). Monitor and cancel from Telegram (`/jobs`) or the admin UI - **Voice** — Speech-to-text (faster-whisper) and text-to-speech (edge-tts) - **Web search** — Tavily integration for real-time information - **Browser automation** — Optional headless browser (Playwright) to read JS-heavy pages and act on sites, with persistent logged-in profiles and per-domain approval (off by default) diff --git a/api/admin.py b/api/admin.py index 600e864..244e02b 100644 --- a/api/admin.py +++ b/api/admin.py @@ -1036,13 +1036,13 @@ async def partial_llm() -> HTMLResponse: deepseek_vaulted = _is_vault_ref(deepseek_api_key) model = await config_store.get("agent.model") or "claude-4-6-sonnet" thinking_level = await config_store.get("agent.thinking_level") or "" - extraction_provider = await config_store.get("memory.extraction_provider") or "anthropic" - extraction_model = await config_store.get("memory.extraction_model") or "claude-haiku-4-5" + extraction_provider = await config_store.get("memory.extraction_provider") or "deepseek" + extraction_model = await config_store.get("memory.extraction_model") or "deepseek-v4-flash" consolidation_provider = ( - await config_store.get("memory.consolidation_provider") or "anthropic" + await config_store.get("memory.consolidation_provider") or "deepseek" ) consolidation_model = ( - await config_store.get("memory.consolidation_model") or "claude-haiku-4-5" + await config_store.get("memory.consolidation_model") or "deepseek-v4-flash" ) extraction_thinking_level = await config_store.get("memory.extraction_thinking_level") or "" consolidation_thinking_level = ( @@ -1050,16 +1050,16 @@ async def partial_llm() -> HTMLResponse: ) gd_enabled = await config_store.get("goal_decomposition.enabled") gd_enabled = gd_enabled if gd_enabled is not None else "true" - gd_provider = await config_store.get("goal_decomposition.provider") or "anthropic" - gd_model = await config_store.get("goal_decomposition.model") or "claude-haiku-4-5" + gd_provider = await config_store.get("goal_decomposition.provider") or "deepseek" + gd_model = await config_store.get("goal_decomposition.model") or "deepseek-v4-flash" tr_enabled = await config_store.get("task_reflection.enabled") tr_enabled = tr_enabled if tr_enabled is not None else "true" - tr_provider = await config_store.get("task_reflection.provider") or "anthropic" - tr_model = await config_store.get("task_reflection.model") or "claude-haiku-4-5" + tr_provider = await config_store.get("task_reflection.provider") or "deepseek" + tr_model = await config_store.get("task_reflection.model") or "deepseek-v4-flash" gd_thinking_level = await config_store.get("goal_decomposition.thinking_level") or "" tr_thinking_level = await config_store.get("task_reflection.thinking_level") or "" - compaction_provider = await config_store.get("compaction.provider") or "anthropic" - compaction_model = await config_store.get("compaction.model") or "claude-haiku-4-5" + compaction_provider = await config_store.get("compaction.provider") or "deepseek" + compaction_model = await config_store.get("compaction.model") or "deepseek-v4-flash" compaction_thinking_level = await config_store.get("compaction.thinking_level") or "" vision_enabled = await config_store.get("vision.enabled") vision_enabled = vision_enabled if vision_enabled is not None else "false" @@ -1164,6 +1164,13 @@ async def partial_tools() -> HTMLResponse: sub_steps = await config_store.get("subagents.max_steps") or "12" sub_tokens = await config_store.get("subagents.token_budget") or "100000" sub_concurrent = await config_store.get("subagents.max_concurrent") or "3" + # Result-summary inference (notification + context digest) for finished + # background batches — fast/cheap model by default. + ss_enabled = await config_store.get("subagent_summary.enabled") + ss_enabled = ss_enabled if ss_enabled is not None else "true" + ss_provider = await config_store.get("subagent_summary.provider") or "deepseek" + ss_model = await config_store.get("subagent_summary.model") or "deepseek-v4-flash" + ss_thinking = await config_store.get("subagent_summary.thinking_level") or "" return _render_partial( "partials/tools.html", @@ -1185,6 +1192,10 @@ async def partial_tools() -> HTMLResponse: subagents_max_steps=sub_steps, subagents_token_budget=sub_tokens, subagents_max_concurrent=sub_concurrent, + summary_enabled=ss_enabled, + summary_provider=ss_provider, + summary_model=ss_model, + summary_thinking_level=ss_thinking, ) @app.post("/tools/gh/test", dependencies=[Depends(auth)]) diff --git a/api/templates/partials/llm.html b/api/templates/partials/llm.html index ad521d0..f64e61b 100644 --- a/api/templates/partials/llm.html +++ b/api/templates/partials/llm.html @@ -38,23 +38,23 @@ {{ grok_base_url|default('', true)|tojson|forceescape }}, {{ deepseek_api_key|default('', true)|tojson|forceescape }}, {{ deepseek_base_url|default('', true)|tojson|forceescape }}, - {{ extraction_provider|default('anthropic', true)|tojson|forceescape }}, - {{ extraction_model|default('claude-haiku-4-5', true)|tojson|forceescape }}, - {{ consolidation_provider|default('anthropic', true)|tojson|forceescape }}, - {{ consolidation_model|default('claude-haiku-4-5', true)|tojson|forceescape }}, + {{ extraction_provider|default('deepseek', true)|tojson|forceescape }}, + {{ extraction_model|default('deepseek-v4-flash', true)|tojson|forceescape }}, + {{ consolidation_provider|default('deepseek', true)|tojson|forceescape }}, + {{ consolidation_model|default('deepseek-v4-flash', true)|tojson|forceescape }}, {{ gd_enabled|default('true', true)|tojson|forceescape }}, - {{ gd_provider|default('anthropic', true)|tojson|forceescape }}, - {{ gd_model|default('claude-haiku-4-5', true)|tojson|forceescape }}, + {{ gd_provider|default('deepseek', true)|tojson|forceescape }}, + {{ gd_model|default('deepseek-v4-flash', true)|tojson|forceescape }}, {{ tr_enabled|default('true', true)|tojson|forceescape }}, - {{ tr_provider|default('anthropic', true)|tojson|forceescape }}, - {{ tr_model|default('claude-haiku-4-5', true)|tojson|forceescape }}, + {{ tr_provider|default('deepseek', true)|tojson|forceescape }}, + {{ tr_model|default('deepseek-v4-flash', true)|tojson|forceescape }}, {{ prompt_tool_usage_override|default('', true)|tojson|forceescape }}, {{ prompt_history_override|default('', true)|tojson|forceescape }}, {{ default_tool_usage|default('', true)|tojson|forceescape }}, {{ default_history_handling|default('', true)|tojson|forceescape }}, {{ prompt_capture_enabled|default(false, true)|tojson|forceescape }}, - {{ compaction_provider|default('anthropic', true)|tojson|forceescape }}, - {{ compaction_model|default('claude-haiku-4-5', true)|tojson|forceescape }}, + {{ compaction_provider|default('deepseek', true)|tojson|forceescape }}, + {{ compaction_model|default('deepseek-v4-flash', true)|tojson|forceescape }}, {{ thinking_level|default('', true)|tojson|forceescape }}, {{ extraction_thinking_level|default('', true)|tojson|forceescape }}, {{ consolidation_thinking_level|default('', true)|tojson|forceescape }}, diff --git a/api/templates/partials/tools.html b/api/templates/partials/tools.html index 171e1dc..885f953 100644 --- a/api/templates/partials/tools.html +++ b/api/templates/partials/tools.html @@ -83,6 +83,10 @@

Web artifacts

steps: {{ subagents_max_steps|default('12', true)|tojson|forceescape }}, tokens: {{ subagents_token_budget|default('100000', true)|tojson|forceescape }}, concurrent: {{ subagents_max_concurrent|default('3', true)|tojson|forceescape }}, + summaryEnabled: {{ summary_enabled|default('true', true)|tojson|forceescape }}, + summaryProvider: {{ summary_provider|default('deepseek', true)|tojson|forceescape }}, + summaryModel: {{ summary_model|default('deepseek-v4-flash', true)|tojson|forceescape }}, + summaryThinking: {{ summary_thinking_level|default('', true)|tojson|forceescape }}, result: '', resultOk: false, save() { const vals = { @@ -90,7 +94,11 @@

Web artifacts

'subagents.recursion_depth': String(parseInt(this.recursion) || 1), 'subagents.max_steps': String(parseInt(this.steps) || 1), 'subagents.token_budget': String(parseInt(this.tokens) || 1), - 'subagents.max_concurrent': String(parseInt(this.concurrent) || 1) + 'subagents.max_concurrent': String(parseInt(this.concurrent) || 1), + 'subagent_summary.enabled': String(this.summaryEnabled === true || this.summaryEnabled === 'true'), + 'subagent_summary.provider': String(this.summaryProvider || 'deepseek').trim(), + 'subagent_summary.model': String(this.summaryModel || 'deepseek-v4-flash').trim(), + 'subagent_summary.thinking_level': String(this.summaryThinking || '').trim() }; fetch('/config', { method: 'PATCH', @@ -143,6 +151,37 @@

Subagents

Background runs allowed at once.

+
+
+

Result summary

+
+ + +
+
+

+ When a batch of background runs finishes, a small inference distils it into a + one-line chat notification plus a concise context digest (instead of dumping + raw output). Off falls back to a crude first-line truncation. +

+
+
+ + +
+
+ + +
+
+ + +

Blank = off. low / medium / high for reasoning models.

+
+
+
diff --git a/config.yml.example b/config.yml.example index 331f322..434a429 100644 --- a/config.yml.example +++ b/config.yml.example @@ -95,8 +95,8 @@ history: memory: db_path: "data/memory.db" long_term_limit: 50 - extraction_model: "claude-haiku-4-5" # cheap model for post-turn memory extraction - consolidation_model: "claude-haiku-4-5" # model for scheduled consolidation + hygiene + extraction_model: "deepseek-v4-flash" # fast + cheap model for post-turn memory extraction + consolidation_model: "deepseek-v4-flash" # model for scheduled consolidation + hygiene # Tier 2 — semantic similarity + relevance-ranked injection. # Default: a local on-device model (fastembed, private, no API key, free). @@ -141,3 +141,12 @@ subagents: max_steps: 12 # max tool-call rounds per run (hard stop) token_budget: 100000 # approx token ceiling per run (best-effort) max_concurrent: 3 # max background runs at once + +# When a background subagent batch finishes, distil it into a one-line chat +# notification + a concise context digest (instead of dumping the raw output). +# Off → a crude first-line truncation with no LLM call. +subagent_summary: + enabled: true + provider: "deepseek" + model: "deepseek-v4-flash" # fast + cheap is ideal for this distillation + thinking_level: "" # "" (off) | "low" | "medium" | "high" diff --git a/core/agent.py b/core/agent.py index 6ca5842..5fb1582 100644 --- a/core/agent.py +++ b/core/agent.py @@ -37,10 +37,12 @@ RESULT_FOR_AGENT_INSTRUCTION, SubagentRegistry, SubagentRun, + fallback_summary, narrow_scope, normalize_effort, resolve_cap, short_summary, + summarize_batch, ) from core.task_reflection import ReflectionStore from core.tools import tool_env @@ -586,8 +588,6 @@ async def process( attachments: list[Attachment] | None = None, chat_id: str = "", persona_name: str | None = None, - decompose: bool = True, - allow_subagents: bool = True, ) -> AgentResponse: """Process an incoming message through the LLM with tool-use loop. @@ -620,7 +620,7 @@ async def process( # The resulting plan is request-specific, so it is injected per turn # (in the user-message preamble), not baked into the static prompt. decomposed_goal: DecomposedGoal | None = None - if decompose and self.config.goal_decomposition.enabled and channel != "system": + if self.config.goal_decomposition.enabled and channel != "system": decomposed_goal = await self._maybe_decompose(message) # Resolve the active persona (its identity, skills + tool scope) — a @@ -641,7 +641,7 @@ async def process( scope=_persona_scope(persona), persona=persona, session_key=session_key, - offer_personae=allow_subagents, + offer_personae=True, ) # Append the status of still-running background subagents from this chat, # so the agent always knows what is pending (their results are folded into @@ -655,7 +655,7 @@ async def process( scoped_tools(persona), secrets_available=self.secret_store is not None, artifacts_enabled=self.config.artifacts.enabled, - subagents_enabled=self.config.subagents.enabled and allow_subagents, + subagents_enabled=self.config.subagents.enabled, ) # Static system prompt. In session mode it is snapshotted once at the @@ -2189,29 +2189,29 @@ async def _run_subagent_background( # cancellation is the one terminal path that never re-checks the # barrier. Release it before unwinding. (Safe to await here: the # cancellation was already delivered once and won't re-fire.) - await self._maybe_synthesize_subagent_batch(run) + await self._maybe_deliver_subagent_batch(run) raise except Exception as exc: log.exception("Background subagent %s failed", run.run_id) if self.subagents.finish(run.run_id, "error", error=str(exc)): - await self._maybe_synthesize_subagent_batch(run) + await self._maybe_deliver_subagent_batch(run) return # finish() returns False if a late cancellation already finalised the run, # in which case this completion must not also trigger a reply. if self.subagents.finish(run.run_id, "done", result=text): - await self._maybe_synthesize_subagent_batch(run) + await self._maybe_deliver_subagent_batch(run) - async def _maybe_synthesize_subagent_batch(self, run: SubagentRun) -> None: - """Once every background run for this chat is done, run ONE agent turn that - ingests the batch and answers the user. The barrier collapses a fan-out of - parallel spawns into a single synthesised reply (#15).""" + async def _maybe_deliver_subagent_batch(self, run: SubagentRun) -> None: + """Once every background run for this chat is done, distil the batch into a + chat notification + a context digest and deliver them. The barrier collapses + a fan-out of parallel spawns into a single delivery (#15).""" channel, user_id, chat_id = run.origin_channel, run.origin_user_id, run.origin_chat_id if not chat_id or channel == "system": return # scheduler / system-origin runs have no user chat to answer # Barrier — race-free because there is no await between this check and # marking the batch below: while another background run for the chat is - # still running, defer; that last finisher does the synthesis. (Sync runs - # are ignored: they return inline and never reach this path.) + # still running, defer; the last finisher delivers. (Sync runs are ignored: + # they return inline and never reach this path.) runs = self.subagents.list_runs() if any( r.background @@ -2234,47 +2234,75 @@ async def _maybe_synthesize_subagent_batch(self, run: SubagentRun) -> None: return for r in batch: r.synthesized = True - await self._synthesize_and_reply(channel, user_id, chat_id, batch) + await self._summarize_and_deliver(channel, user_id, chat_id, batch) - async def _synthesize_and_reply( + async def _summarize_and_deliver( self, channel: str, user_id: str, chat_id: str, batch: list[SubagentRun] ) -> None: - """Run a synthesis turn over a finished batch and deliver the agent's reply. - - The findings go in as an INTERNAL trigger message (not from the user); the - agent's reply — in its own persona/voice — is what the user actually sees. + """Distil a finished batch into a one-line chat notification + a concise + context digest, then deliver: notification → the user, digest → the agent's + context. The raw subagent output reaches neither the user nor the context. """ - findings = [] - for r in batch: - outcome = r.result if r.status == "done" else f"[failed: {r.error or 'unknown error'}]" - who = r.persona or "you" - findings.append(f"• helper for «{r.task[:100]}» (persona: {who}):\n{outcome}") - instruction = ( - "[INTERNAL — not from the user] The background helper(s) you spawned " - "earlier have finished. Their raw findings are below; they worked for " - "you, not the user, so do NOT paste them verbatim or mention " - "'subagents'. Using these findings together with the user's original " - "request in this conversation, write ONE concise, natural reply to the " - "user now.\n\n" + "\n\n".join(findings) - ) - try: - response = await self.process( - message=instruction, - channel=channel, - user_id=user_id, - chat_id=chat_id, - decompose=False, - allow_subagents=False, # a synthesis turn must not fan out more work - ) - except Exception: - log.exception("Subagent synthesis turn failed (chat=%s)", chat_id) - return + notification, digest = await self._summarize_subagent_batch(batch) + # The user only ever saw the notification; the agent's context keeps the + # concise digest (so it can answer follow-ups) — never the raw output. + framed = notification + if digest and digest.strip() and digest.strip() != notification.strip(): + framed = f"{notification}\n\n\n{digest}\n" + await self._record_subagent_context(channel, user_id, chat_id, framed) ch = self.channels.get(channel) - if ch and chat_id and response.text: + if ch and chat_id and notification: try: - await ch.send(chat_id, response.text) + await ch.send(chat_id, notification) except Exception: - log.exception("Failed to deliver synthesised subagent reply (chat=%s)", chat_id) + log.exception("Failed to deliver subagent notification (chat=%s)", chat_id) + + async def _summarize_subagent_batch(self, batch: list[SubagentRun]) -> tuple[str, str]: + """(notification, digest) for a finished batch via the summary inference, + falling back to truncation when it is disabled or the inference fails.""" + items = [ + ( + r.task, + r.result if r.status == "done" else f"[failed: {r.error or 'unknown error'}]", + r.persona or "", + r.status, + ) + for r in batch + ] + cfg = self.config.subagent_summary + if cfg.enabled: + try: + llm = self._background_llm(cfg.provider, cfg.thinking_level) + return await summarize_batch(llm, cfg.model, items) + except Exception: + log.exception("Subagent summary inference failed; using truncation fallback") + return fallback_summary(items) + + async def _record_subagent_context( + self, channel: str, user_id: str, chat_id: str, framed: str + ) -> None: + """Record a background batch's notification + digest as an assistant turn — + merged into the trailing assistant turn so replayed history stays strictly + alternating for providers that require it (#15).""" + if not chat_id or channel == "system": + return + try: + if self.history_mode == "session": + merged = await self.history.append_to_last_session_message( + channel, user_id, f"\n\n{framed}", chat_id + ) + if not merged: + await self.history.append_session_message( + channel, user_id, {"role": "assistant", "content": framed}, chat_id + ) + else: + merged = await self.history.append_to_last_turn( + channel, user_id, "assistant", f"\n\n{framed}", chat_id + ) + if not merged: + await self.history.add_turn(channel, user_id, "assistant", framed, chat_id) + except Exception: + log.exception("Failed to record subagent context (chat=%s)", chat_id) @staticmethod def _usage_total(usage: dict | None) -> int: diff --git a/core/config.py b/core/config.py index 15b2ec6..34d724e 100644 --- a/core/config.py +++ b/core/config.py @@ -188,10 +188,10 @@ class EmbeddingConfig(BaseModel): class MemoryConfig(BaseModel): db_path: str = "data/memory.db" long_term_limit: int = 50 - extraction_provider: str = "anthropic" - extraction_model: str = "claude-haiku-4-5" - consolidation_provider: str = "anthropic" - consolidation_model: str = "claude-haiku-4-5" + extraction_provider: str = "deepseek" + extraction_model: str = "deepseek-v4-flash" + consolidation_provider: str = "deepseek" + consolidation_model: str = "deepseek-v4-flash" extraction_thinking_level: str = "" # "" (off) | "low" | "medium" | "high" consolidation_thinking_level: str = "" # "" (off) | "low" | "medium" | "high" extraction_cooldown_seconds: int = 120 # minimum seconds between extractions @@ -211,15 +211,15 @@ class MemoryConfig(BaseModel): class GoalDecompositionConfig(BaseModel): enabled: bool = True - provider: str = "anthropic" - model: str = "claude-haiku-4-5" + provider: str = "deepseek" + model: str = "deepseek-v4-flash" thinking_level: str = "" # "" (off) | "low" | "medium" | "high" class TaskReflectionConfig(BaseModel): enabled: bool = True - provider: str = "anthropic" - model: str = "claude-haiku-4-5" + provider: str = "deepseek" + model: str = "deepseek-v4-flash" thinking_level: str = "" # "" (off) | "low" | "medium" | "high" db_path: str = "data/reflections.db" max_reflections: int = 50 # max reflections to keep for prompt injection @@ -233,8 +233,8 @@ class CompactionConfig(BaseModel): """ enabled: bool = True - provider: str = "anthropic" - model: str = "claude-haiku-4-5" + provider: str = "deepseek" + model: str = "deepseek-v4-flash" thinking_level: str = "" # "" (off) | "low" | "medium" | "high" threshold_type: str = "percent" # "percent" (of context window) or "tokens" (absolute) threshold_percent: int = 80 # trigger at this % of the model's context window @@ -318,6 +318,21 @@ class SubagentsConfig(BaseModel): max_concurrent: int = 3 # max background runs at once +class SubagentSummaryConfig(BaseModel): + """Summarise a finished background subagent batch (issue #15). + + Instead of dumping a subagent's raw output to the chat and the agent's + context, a small inference distils each batch into a one-sentence chat + *notification* and a concise *digest* for the agent's context. Mirrors the + other background inferences (memory / compaction / reflection). + """ + + enabled: bool = True + provider: str = "deepseek" # fast + cheap is ideal for this distillation + model: str = "deepseek-v4-flash" + thinking_level: str = "" # "" (off) | "low" | "medium" | "high" + + class Config(BaseModel): agent: AgentConfig = AgentConfig() channels: ChannelsConfig = ChannelsConfig() @@ -337,6 +352,7 @@ class Config(BaseModel): tools: ToolsConfig = ToolsConfig() artifacts: ArtifactsConfig = ArtifactsConfig() subagents: SubagentsConfig = SubagentsConfig() + subagent_summary: SubagentSummaryConfig = SubagentSummaryConfig() def load_config(path: str | Path = "config.yml") -> Config: diff --git a/core/subagents.py b/core/subagents.py index cbb25a1..8c48cc2 100644 --- a/core/subagents.py +++ b/core/subagents.py @@ -220,11 +220,78 @@ def _trim(self) -> None: def short_summary(text: str, limit: int = 280) -> str: """A one-glance summary of a subagent's result — first non-empty line, capped. - ponytail: a truncation, not an LLM call. Add a summariser model only if the - plain preview proves too lossy in practice. + The crude fallback for :func:`summarize_batch` when the summary inference is + disabled or its output can't be parsed; also the sync spawn's preview field. """ for line in (text or "").splitlines(): line = line.strip() if line: return line[:limit] + ("…" if len(line) > limit else "") return (text or "").strip()[:limit] + + +# (task, outcome, persona, status) for one finished background run. +SummaryItem = tuple[str, str, str, str] + +_SUMMARY_PROMPT = ( + "Background helper task(s) ran for a user and finished. Summarise their " + "results into TWO things, and nothing else:\n" + "1. NOTIFICATION — ONE short sentence (max ~140 chars) giving the single most " + "important result/answer, phrased for the user. No greeting, no 'the helper " + "found', no markdown.\n" + "2. DIGEST — a few concise sentences with the key facts the assistant may need " + "to answer follow-ups. Real content only; do not restate the task or pad.\n\n" + "Respond in EXACTLY this format (NOTIFICATION on one line):\n" + "NOTIFICATION: \n" + "DIGEST: " +) + + +def _format_items(items: list[SummaryItem]) -> str: + blocks = [] + for task, outcome, _persona, status in items: + blocks.append(f"--- task: {task}\nstatus: {status}\nresult:\n{outcome}") + return "\n\n".join(blocks) + + +def fallback_summary(items: list[SummaryItem]) -> tuple[str, str]: + """Crude (no-LLM) fallback for :func:`summarize_batch`: first-line previews + for both the notification and the digest, used when the summary inference is + disabled or its output is unusable.""" + notif = "; ".join(s for s in (short_summary(o, 120) for _, o, _, _ in items) if s)[:200] + digest = "\n".join(f"- {t[:80]}: {short_summary(o, 280)}" for t, o, _, _ in items) + return notif, digest + + +def _parse_summary(raw: str) -> tuple[str, str]: + """Pull (notification, digest) out of the model's reply; ('', '') if unusable.""" + if not raw or not raw.strip(): + return "", "" + low = raw.lower() + n_idx, d_idx = low.find("notification:"), low.find("digest:") + if n_idx == -1 and d_idx == -1: + # No markers — take the first non-empty line as the notification. + lines = [ln.strip() for ln in raw.strip().splitlines() if ln.strip()] + return (lines[0], "\n".join(lines[1:]) or lines[0]) if lines else ("", "") + notif = "" + if n_idx != -1: + end = d_idx if (d_idx > n_idx) else len(raw) + body = raw[n_idx + len("notification:") : end].strip() + notif = body.splitlines()[0].strip() if body else "" + digest = raw[d_idx + len("digest:") :].strip() if d_idx != -1 else "" + return notif, digest + + +async def summarize_batch(llm, model: str, items: list[SummaryItem]) -> tuple[str, str]: + """LLM-distil a finished background batch into (notification, digest). + + ``notification`` is one sentence for the chat; ``digest`` is concise context + for the spawning agent. Falls back to truncation if the model's output can't + be parsed (the caller falls back too if the inference itself raises). + """ + prompt = f"{_SUMMARY_PROMPT}\n\n{_format_items(items)}" + raw = await llm.generate_text(model=model, prompt=prompt, max_tokens=600) + notif, digest = _parse_summary(raw) + if not notif: + return fallback_summary(items) + return notif, digest or notif diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx index fb09955..f295fe1 100644 --- a/docs/content/docs/subagents.mdx +++ b/docs/content/docs/subagents.mdx @@ -30,11 +30,13 @@ The agent decides to delegate and calls the tool. Two modes: in the same turn. - **Background** (`background: true`): the tool returns a run id immediately and the subagent runs off-turn. A subagent works **for the agent, not the user** — - so its raw output is never shown to you. When the whole batch of background - runs for the chat has finished, the agent runs one **synthesis turn**: it - ingests the findings and writes a single, concise reply in its own voice, and - that is what you see. While a run is in flight, its status is shown to the - agent each turn so it never mistakes a pending run for a finished one. + its raw output is never shown to you. When the whole batch of background runs + for the chat has finished, a small [summary inference](#result-summary) + distils it into **two** things: a one-line *notification* sent to the chat, and + a concise *digest* kept in the agent's context so it can answer follow-ups + without the raw output polluting the conversation. While a run is in flight, + its status is shown to the agent each turn so it never mistakes a pending run + for a finished one. ```jsonc // spawn_subagent tool input @@ -122,6 +124,24 @@ scope. Assign it per persona in the **Personae** editor. A run that hits its ste or token budget stops and returns what it has, with a note that the budget was reached. +## Result summary + +A finished **background** batch isn't dumped on you raw. A small, fast inference +(its own provider/model, like memory or compaction) distils the batch into a +one-line chat **notification** and a concise **digest** that stays in the agent's +context for follow-ups. Configure it in the **Tools** tab (the *Result summary* +section of the Subagents card) or under `subagent_summary` in `config.yml`: + +| Setting | Default | What it does | +|---------|---------|--------------| +| `enabled` | `true` | Off falls back to a crude first-line truncation (no LLM call) | +| `provider` | `deepseek` | Inference provider for the summary | +| `model` | `deepseek-v4-flash` | Fast + cheap is ideal for this distillation | +| `thinking_level` | `""` | `""` (off) / `low` / `medium` / `high` for reasoning models | + +Synchronous spawns are unaffected — the agent gets the full result inline as the +tool result and weaves it into its own reply. + ## Monitoring & cancelling Background subagents are first-class runs you can watch and stop: diff --git a/tests/test_subagents.py b/tests/test_subagents.py index 2724a0b..554abc0 100644 --- a/tests/test_subagents.py +++ b/tests/test_subagents.py @@ -206,21 +206,20 @@ async def test_run_subagent_token_budget_stops_loop(agent) -> None: @pytest.mark.asyncio -async def test_background_subagent_synthesises_instead_of_dumping(agent, monkeypatch) -> None: - from core.agent import AgentResponse - +async def test_background_subagent_notifies_user_digests_context(agent, monkeypatch) -> None: channel = AsyncMock() agent.channels["telegram"] = channel - agent.llm = _ScriptedLLM([LLMResponse(text="raw findings: CHF 599", tool_calls=[])]) + agent.llm = _ScriptedLLM([LLMResponse(text="raw verbose findings: CHF 599 ...", tool_calls=[])]) - # Capture the synthesis turn instead of running the whole agent loop. - captured: dict = {} + # Stub the summary inference: (chat notification, context digest). + async def fake_summary(batch): + return "Cheapest is CHF 599.", "iPhone 17e 256GB at CHF 599; entry model." - async def fake_process(message, channel, user_id, chat_id, decompose=True, **kw): - captured.update(message=message, decompose=decompose, chat_id=chat_id) - return AgentResponse(text="The cheapest is CHF 599.") + monkeypatch.setattr(agent, "_summarize_subagent_batch", fake_summary) - monkeypatch.setattr(agent, "process", fake_process) + # A trailing assistant turn for the digest to merge into (keeps alternation). + await agent.history.add_turn("telegram", "u1", "user", "price?", "555") + await agent.history.add_turn("telegram", "u1", "assistant", "On it.", "555") result = await agent.run_subagent( task="price check", @@ -229,27 +228,28 @@ async def fake_process(message, channel, user_id, chat_id, decompose=True, **kw) origin_chat_id="555", background=True, ) - assert result["background"] is True run = agent.subagents.get(result["run_id"]) - await run._task # let the background loop + synthesis finish + await run._task assert run.status == "done" - assert run.synthesized is True - # The user sees the SYNTHESISED reply, never the raw subagent output. - channel.send.assert_awaited_once_with("555", "The cheapest is CHF 599.") - # The synthesis turn received the raw findings as a non-decomposed internal trigger. - assert "raw findings: CHF 599" in captured["message"] - assert captured["decompose"] is False + # Chat: the one-line NOTIFICATION only — never the raw output. + channel.send.assert_awaited_once_with("555", "Cheapest is CHF 599.") + # Context: the concise digest is kept (merged), the raw output never is. + turns = await agent.history.get_messages("telegram", "u1", "555") + blob = str(turns[-1]["content"]) + assert [t["role"] for t in turns] == ["user", "assistant"] # still alternating + assert "iPhone 17e 256GB at CHF 599" in blob + assert "raw verbose findings" not in blob @pytest.mark.asyncio -async def test_background_batch_synthesises_once_when_all_done(agent, monkeypatch) -> None: +async def test_background_batch_delivers_once_when_all_done(agent, monkeypatch) -> None: calls: list[list[str]] = [] - async def fake_synth(channel, user_id, chat_id, batch): + async def fake_deliver(channel, user_id, chat_id, batch): calls.append([r.run_id for r in batch]) - monkeypatch.setattr(agent, "_synthesize_and_reply", fake_synth) + monkeypatch.setattr(agent, "_summarize_and_deliver", fake_deliver) common = dict(background=True, origin_channel="telegram", origin_chat_id="c") r1 = SubagentRun(run_id="s1", persona="", task="a", origin_user_id="u", **common) @@ -257,14 +257,14 @@ async def fake_synth(channel, user_id, chat_id, batch): agent.subagents.register(r1) agent.subagents.register(r2) - # First finishes → the other is still running → barrier holds, no synthesis. + # First finishes → the other is still running → barrier holds, no delivery. agent.subagents.finish("s1", "done", result="x") - await agent._maybe_synthesize_subagent_batch(r1) + await agent._maybe_deliver_subagent_batch(r1) assert calls == [] - # Last finishes → barrier releases → ONE synthesis over the whole batch. + # Last finishes → barrier releases → ONE delivery over the whole batch. agent.subagents.finish("s2", "done", result="y") - await agent._maybe_synthesize_subagent_batch(r2) + await agent._maybe_deliver_subagent_batch(r2) assert len(calls) == 1 assert sorted(calls[0]) == ["s1", "s2"] assert r1.synthesized and r2.synthesized @@ -278,10 +278,10 @@ async def test_cancelling_a_sibling_releases_a_deferred_reply(agent, monkeypatch calls: list[list[str]] = [] - async def fake_synth(channel, user_id, chat_id, batch): + async def fake_deliver(channel, user_id, chat_id, batch): calls.append(sorted(r.run_id for r in batch)) - monkeypatch.setattr(agent, "_synthesize_and_reply", fake_synth) + monkeypatch.setattr(agent, "_summarize_and_deliver", fake_deliver) gate = asyncio.Event() @@ -297,7 +297,7 @@ async def fake_loop(task, persona, state, run): b_res = await agent.run_subagent(task="B-task", background=True, **origin) a_res = await agent.run_subagent(task="A-task", background=True, **origin) - # A completes but B is still running → A defers (not synthesised, not lost). + # A completes but B is still running → A defers (not delivered, not lost). await agent.subagents.get(a_res["run_id"])._task assert calls == [] assert agent.subagents.get(a_res["run_id"]).synthesized is False @@ -307,34 +307,38 @@ async def fake_loop(task, persona, state, run): with pytest.raises(asyncio.CancelledError): await agent.subagents.get(b_res["run_id"])._task - # A's reply was synthesised (not lost), and only A — B was cancelled. + # A's reply was delivered (not lost), and only A — B was cancelled. assert calls == [[a_res["run_id"]]] assert agent.subagents.get(a_res["run_id"]).synthesized is True +def test_summary_parsing_and_fallback() -> None: + from core.subagents import _parse_summary, fallback_summary + + n, d = _parse_summary("NOTIFICATION: Cheapest is CHF 599.\nDIGEST: iPhone 17e 256GB CHF 599.") + assert n == "Cheapest is CHF 599." + assert "iPhone 17e" in d + # No markers → first non-empty line becomes the notification. + assert _parse_summary("Just one line")[0] == "Just one line" + assert _parse_summary("") == ("", "") + # Truncation fallback from raw items (no LLM). + items = [("task a", "line1\nline2", "", "done"), ("task b", "r b", "", "done")] + notif, digest = fallback_summary(items) + assert "line1" in notif and "r b" in notif + assert "- task a:" in digest + + @pytest.mark.asyncio -async def test_allow_subagents_false_withholds_spawn_tool(agent) -> None: - """A synthesis turn (allow_subagents=False) must not be offered spawn_subagent, - so it can't fan out more background work behind a soft prompt guard.""" - captured: list[set[str]] = [] - - class RecLLM(_ScriptedLLM): - async def generate(self, *, tools=(), **_kw) -> LLMResponse: - captured.append({t["name"] for t in tools}) - return await super().generate() - - # Default (allowed) → the tool is on the table. - agent.llm = RecLLM([LLMResponse(text="ok", tool_calls=[])]) - await agent.process(message="hi", channel="telegram", user_id="u", chat_id="c1") - assert "spawn_subagent" in captured[0] - - # Synthesis turn (disallowed) → it is withheld. - captured.clear() - agent.llm = RecLLM([LLMResponse(text="ok", tool_calls=[])]) - await agent.process( - message="hi", channel="telegram", user_id="u", chat_id="c2", allow_subagents=False - ) - assert "spawn_subagent" not in captured[0] +async def test_summarize_batch_calls_llm_and_parses() -> None: + from core.subagents import summarize_batch + + class FakeLLM: + async def generate_text(self, *, model, prompt, max_tokens=600): + return "NOTIFICATION: Done — 3 results.\nDIGEST: A, B and C found." + + notif, digest = await summarize_batch(FakeLLM(), "m", [("t", "r", "", "done")]) + assert notif == "Done — 3 results." + assert digest == "A, B and C found." @pytest.mark.asyncio