diff --git a/README.md b/README.md index c43cb04..e446190 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ A self-hosted personal AI agent that runs in a single Docker container. MPA acts - **Personae** — Swappable agent identities (own character, skill/tool scope, voice). Bind one per chat — and, on Telegram, per forum topic — so several run concurrently, each with its own isolated context. Give a persona its own bot token and it becomes a separate Telegram contact (bot-per-persona) - **Memory** — Two-tier system: permanent long-term facts and expiring short-term context, both extracted automatically from conversations - **Scheduled tasks** — Cron-based jobs for morning briefings, email checks, contact sync, and custom tasks +- **Subagents** — Delegate a scoped subtask to a sub-loop under a chosen persona, on demand or scheduled. Runs sync (result returned in-turn) or in the background; a finished background batch is distilled by a summary inference into a one-line chat notification + a concise context digest (raw output never reaches the user or the agent's context). The agent sizes each run (steps / token budget / thinking effort) and defaults the persona to its own; scope is a subset of the caller's (inherit-never-widen). Monitor and cancel from Telegram (`/jobs`) or the admin UI - **Voice** — Speech-to-text (faster-whisper) and text-to-speech (edge-tts) - **Web search** — Tavily integration for real-time information - **Browser automation** — Optional headless browser (Playwright) to read JS-heavy pages and act on sites, with persistent logged-in profiles and per-domain approval (off by default) diff --git a/api/admin.py b/api/admin.py index 318be10..244e02b 100644 --- a/api/admin.py +++ b/api/admin.py @@ -835,7 +835,12 @@ async def _persona_editor_ctx() -> dict: store = await _skills_store_from_config(config_store) all_skills = [s["name"] for s in await store.list_skills()] artifacts = await store_from_config(config_store) - return {"all_skills": all_skills, "all_tools": gateable_tools_for(artifacts.enabled)} + sub_enabled = await config_store.get("subagents.enabled") + sub_on = sub_enabled is None or sub_enabled == "true" + return { + "all_skills": all_skills, + "all_tools": gateable_tools_for(artifacts.enabled, subagents_enabled=sub_on), + } @app.get("/admin/personae/new", response_model=None) async def admin_persona_new() -> Response: @@ -1031,13 +1036,13 @@ async def partial_llm() -> HTMLResponse: deepseek_vaulted = _is_vault_ref(deepseek_api_key) model = await config_store.get("agent.model") or "claude-4-6-sonnet" thinking_level = await config_store.get("agent.thinking_level") or "" - extraction_provider = await config_store.get("memory.extraction_provider") or "anthropic" - extraction_model = await config_store.get("memory.extraction_model") or "claude-haiku-4-5" + extraction_provider = await config_store.get("memory.extraction_provider") or "deepseek" + extraction_model = await config_store.get("memory.extraction_model") or "deepseek-v4-flash" consolidation_provider = ( - await config_store.get("memory.consolidation_provider") or "anthropic" + await config_store.get("memory.consolidation_provider") or "deepseek" ) consolidation_model = ( - await config_store.get("memory.consolidation_model") or "claude-haiku-4-5" + await config_store.get("memory.consolidation_model") or "deepseek-v4-flash" ) extraction_thinking_level = await config_store.get("memory.extraction_thinking_level") or "" consolidation_thinking_level = ( @@ -1045,16 +1050,16 @@ async def partial_llm() -> HTMLResponse: ) gd_enabled = await config_store.get("goal_decomposition.enabled") gd_enabled = gd_enabled if gd_enabled is not None else "true" - gd_provider = await config_store.get("goal_decomposition.provider") or "anthropic" - gd_model = await config_store.get("goal_decomposition.model") or "claude-haiku-4-5" + gd_provider = await config_store.get("goal_decomposition.provider") or "deepseek" + gd_model = await config_store.get("goal_decomposition.model") or "deepseek-v4-flash" tr_enabled = await config_store.get("task_reflection.enabled") tr_enabled = tr_enabled if tr_enabled is not None else "true" - tr_provider = await config_store.get("task_reflection.provider") or "anthropic" - tr_model = await config_store.get("task_reflection.model") or "claude-haiku-4-5" + tr_provider = await config_store.get("task_reflection.provider") or "deepseek" + tr_model = await config_store.get("task_reflection.model") or "deepseek-v4-flash" gd_thinking_level = await config_store.get("goal_decomposition.thinking_level") or "" tr_thinking_level = await config_store.get("task_reflection.thinking_level") or "" - compaction_provider = await config_store.get("compaction.provider") or "anthropic" - compaction_model = await config_store.get("compaction.model") or "claude-haiku-4-5" + compaction_provider = await config_store.get("compaction.provider") or "deepseek" + compaction_model = await config_store.get("compaction.model") or "deepseek-v4-flash" compaction_thinking_level = await config_store.get("compaction.thinking_level") or "" vision_enabled = await config_store.get("vision.enabled") vision_enabled = vision_enabled if vision_enabled is not None else "false" @@ -1151,6 +1156,22 @@ async def partial_tools() -> HTMLResponse: artifacts = await store_from_config(config_store) + # Subagents (issue #15) — keys may be absent on a store seeded before the + # feature existed, so fall back to the SubagentsConfig defaults. + sub_enabled = await config_store.get("subagents.enabled") + sub_enabled = sub_enabled if sub_enabled is not None else "true" + sub_recursion = await config_store.get("subagents.recursion_depth") or "3" + sub_steps = await config_store.get("subagents.max_steps") or "12" + sub_tokens = await config_store.get("subagents.token_budget") or "100000" + sub_concurrent = await config_store.get("subagents.max_concurrent") or "3" + # Result-summary inference (notification + context digest) for finished + # background batches — fast/cheap model by default. + ss_enabled = await config_store.get("subagent_summary.enabled") + ss_enabled = ss_enabled if ss_enabled is not None else "true" + ss_provider = await config_store.get("subagent_summary.provider") or "deepseek" + ss_model = await config_store.get("subagent_summary.model") or "deepseek-v4-flash" + ss_thinking = await config_store.get("subagent_summary.thinking_level") or "" + return _render_partial( "partials/tools.html", tools=tool_registry(), @@ -1166,6 +1187,15 @@ async def partial_tools() -> HTMLResponse: artifacts_enabled="true" if artifacts.enabled else "false", artifacts_directory=str(artifacts.dir), artifacts_ttl_hours=str(artifacts.ttl_hours), + subagents_enabled=sub_enabled, + subagents_recursion_depth=sub_recursion, + subagents_max_steps=sub_steps, + subagents_token_budget=sub_tokens, + subagents_max_concurrent=sub_concurrent, + summary_enabled=ss_enabled, + summary_provider=ss_provider, + summary_model=ss_model, + summary_thinking_level=ss_thinking, ) @app.post("/tools/gh/test", dependencies=[Depends(auth)]) @@ -1423,6 +1453,7 @@ def _get_jobs_list() -> list[dict]: "run_at": j.get("run_at", ""), "description": j.get("description", ""), "created_by": j.get("created_by", ""), + "persona": j.get("persona", ""), } ) return jobs @@ -1451,6 +1482,7 @@ async def upsert_job(request: Request) -> HTMLResponse: task = str(body.get("task", "")).strip() channel = str(body.get("channel", "telegram")).strip() description = str(body.get("description", "")).strip() + persona = str(body.get("persona", "")).strip() if not job_id: raise HTTPException(400, "Job ID is required") @@ -1462,10 +1494,13 @@ async def upsert_job(request: Request) -> HTMLResponse: else: if not run_at: raise HTTPException(400, "Run-at datetime is required for one-time jobs") - if job_type not in ("agent", "agent_silent", "system", "memory_consolidation"): + if job_type not in ("agent", "agent_silent", "system", "memory_consolidation", "subagent"): raise HTTPException(400, f"Invalid job type: {job_type}") if job_type != "memory_consolidation" and not task: - raise HTTPException(400, "Task is required for agent/system jobs") + raise HTTPException(400, "Task is required for agent/system/subagent jobs") + # Persona only applies to subagent jobs — don't persist a stray value on others. + if job_type != "subagent": + persona = "" if schedule == "cron": from core.scheduler import _parse_cron @@ -1494,6 +1529,7 @@ async def upsert_job(request: Request) -> HTMLResponse: status="active", created_by="admin", description=description, + persona=persona, ) # Sync with APScheduler if the agent is running @@ -1564,7 +1600,12 @@ async def run_job_now(request: Request) -> HTMLResponse: import asyncio - from core.scheduler import run_agent_task, run_memory_consolidation, run_system_command + from core.scheduler import ( + run_agent_task, + run_memory_consolidation, + run_subagent_task, + run_system_command, + ) job_type = job.get("type", "agent") task = job.get("task", "") @@ -1579,6 +1620,15 @@ async def run_job_now(request: Request) -> HTMLResponse: asyncio.create_task(run_system_command(command=task)) elif job_type == "memory_consolidation": asyncio.create_task(run_memory_consolidation()) + elif job_type == "subagent": + asyncio.create_task( + run_subagent_task( + persona=job.get("persona", ""), + task=task, + channel=channel_name, + job_id=job_id, + ) + ) else: return HTMLResponse('Unknown job type') @@ -1588,6 +1638,44 @@ async def run_job_now(request: Request) -> HTMLResponse: "for output" ) + # ── Subagent runs (issue #15) ────────────────────────────────────── + + def _subagent_runs() -> list: + """Live subagent runs from the running agent (newest first).""" + agent = agent_state.agent + if not agent: + return [] + return agent.subagents.list_runs() + + @app.get("/partials/subagent-runs", dependencies=[Depends(auth)]) + async def partial_subagent_runs() -> HTMLResponse: + """Subagent runs card grid — polled by the Jobs tab for live status.""" + return _render_partial( + "partials/subagent_runs.html", + runs=_subagent_runs(), + agent_running=agent_state.agent is not None, + ) + + @app.post("/subagents/cancel", dependencies=[Depends(auth)]) + async def cancel_subagent(request: Request) -> HTMLResponse: + """Cancel a running subagent. Returns the refreshed runs partial.""" + agent = agent_state.agent + if not agent: + raise HTTPException(503, "Agent not running") + content_type = request.headers.get("content-type", "") + if "application/x-www-form-urlencoded" in content_type: + body = await request.form() + else: + body = await request.json() + run_id = str(body.get("run_id", "")).strip() + if not run_id: + raise HTTPException(400, "Missing 'run_id' in request body") + agent.subagents.cancel(run_id) + log.info("Subagent %r cancelled via admin", run_id) + return _render_partial( + "partials/subagent_runs.html", runs=_subagent_runs(), agent_running=True + ) + # ── Config API ───────────────────────────────────────────────────── @app.get("/config", dependencies=[Depends(auth)]) @@ -3337,14 +3425,18 @@ def _config_requires_restart(values: dict) -> bool: "web_search", "manage_jobs", "write_artifact", + "spawn_subagent", ] -def gateable_tools_for(artifacts_enabled: bool) -> list[str]: +def gateable_tools_for(artifacts_enabled: bool, subagents_enabled: bool = True) -> list[str]: """GATEABLE_TOOLS minus tools whose feature is globally disabled.""" - if artifacts_enabled: - return list(GATEABLE_TOOLS) - return [t for t in GATEABLE_TOOLS if t != "write_artifact"] + out = list(GATEABLE_TOOLS) + if not artifacts_enabled: + out = [t for t in out if t != "write_artifact"] + if not subagents_enabled: + out = [t for t in out if t != "spawn_subagent"] + return out # --------------------------------------------------------------------------- diff --git a/api/templates/partials/jobs.html b/api/templates/partials/jobs.html index 8c01560..e630dab 100644 --- a/api/templates/partials/jobs.html +++ b/api/templates/partials/jobs.html @@ -10,6 +10,15 @@ {% endif %} + {# Subagent runs (issue #15) — live status, polled every 3s #} +
+

Subagent runs

+
+

Loading…

+
+
+ {# Jobs table #}
@@ -119,9 +128,16 @@

agent — natural language task + +
+ + + Persona the subagent adopts (blank = default identity) +
+
+ +

+ Let the assistant delegate a scoped subtask to a sub-loop (the + spawn_subagent tool) under a chosen persona, on demand or + scheduled. A child's tools/skills/secrets are a subset of the caller's — + never wider. When disabled, the tool is hidden from the assistant (and from + the Personae tool scope). Assign it per persona in the + Personae editor; monitor and cancel runs in the + Jobs tab. +

+
+
+ + +

Max nesting; a top-level spawn is depth 1.

+
+
+ + +

Tool-call rounds per run — a hard stop.

+
+
+ + +

Approx token ceiling per run (best-effort).

+
+
+ + +

Background runs allowed at once.

+
+
+
+
+

Result summary

+
+ + +
+
+

+ When a batch of background runs finishes, a small inference distils it into a + one-line chat notification plus a concise context digest (instead of dumping + raw output). Off falls back to a crude first-line truncation. +

+
+
+ + +
+
+ + +
+
+ + +

Blank = off. low / medium / high for reasoning models.

+
+
+
+
+ +
+ + + {# GitHub CLI (gh) #}
diff --git a/channels/telegram.py b/channels/telegram.py index 73e713f..5705fe0 100644 --- a/channels/telegram.py +++ b/channels/telegram.py @@ -13,7 +13,13 @@ from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Update from telegram.constants import ChatAction from telegram.error import BadRequest -from telegram.ext import Application, CallbackQueryHandler, MessageHandler, filters +from telegram.ext import ( + Application, + CallbackQueryHandler, + CommandHandler, + MessageHandler, + filters, +) from channels.markdown_tg import to_telegram_html @@ -29,6 +35,8 @@ _APPROVE_PREFIX = "perm_approve:" _DENY_PREFIX = "perm_deny:" _ALWAYS_PREFIX = "perm_always:" +# Callback data prefix for subagent-run cancel buttons (issue #15) +_SUB_CANCEL_PREFIX = "sub_cancel:" _HTML_TAG_RE = re.compile( r"]*)?>", re.IGNORECASE, @@ -54,6 +62,10 @@ def __init__( # folded ":" string when the message came from a topic. self._last_chat_for_user: dict[int, int | str] = {} self.app = Application.builder().token(config.bot_token).concurrent_updates(8).build() + # Commands are checked before the text handler so "/jobs" doesn't reach the + # agent as an ordinary message. (Plain text — incl. /new, /clear — still + # falls through to _on_text, which handles those.) + self.app.add_handler(CommandHandler("jobs", self._on_jobs_command)) self.app.add_handler(MessageHandler(filters.TEXT, self._on_text)) self.app.add_handler(MessageHandler(filters.VOICE | filters.AUDIO, self._on_voice)) self.app.add_handler(MessageHandler(filters.PHOTO | filters.Document.IMAGE, self._on_photo)) @@ -228,6 +240,29 @@ async def _on_photo(self, update: Update, context) -> None: name=f"tg-photo-{user_id}", ) + async def _on_jobs_command(self, update: Update, context) -> None: + """/jobs — list active subagent runs with inline cancel buttons (issue #15).""" + user = update.effective_user + message = update.message + if not user or not message: + return + if not self._is_allowed(user.id): + return + runs = self.agent.subagents.list_runs(active_only=True) + if not runs: + await message.reply_text("No active subagent runs.") + return + for r in runs: + text = ( + f"🤖 {r.persona or 'default'} · {r.status} · {r.elapsed_str}\n" + f"{(r.progress or '—')}\n" + f"{r.task[:160]}" + ) + keyboard = InlineKeyboardMarkup( + [[InlineKeyboardButton("Cancel", callback_data=f"{_SUB_CANCEL_PREFIX}{r.run_id}")]] + ) + await message.reply_text(text, parse_mode="HTML", reply_markup=keyboard) + async def _on_approval_callback(self, update: Update, context) -> None: """Handle inline keyboard button presses for permission approvals.""" query = update.callback_query @@ -261,6 +296,12 @@ async def _on_approval_callback(self, update: Update, context) -> None: resolved = self.agent.permissions.resolve_approval(request_id, True, always_allow=True) await self._finalize_approval_response(query, resolved, "Always allowed") + elif data.startswith(_SUB_CANCEL_PREFIX): + run_id = data[len(_SUB_CANCEL_PREFIX) :] + ok = self.agent.subagents.cancel(run_id) + label = "Cancelled" if ok else "Already finished / not found" + await self._finalize_approval_response(query, True, label) + async def _on_forum_topic(self, update: Update, context) -> None: """Auto-bind a freshly created/renamed forum topic to a matching persona. diff --git a/config.yml.example b/config.yml.example index ec7da39..434a429 100644 --- a/config.yml.example +++ b/config.yml.example @@ -59,6 +59,14 @@ scheduler: cron: "0 */8 * * *" task: "sqlite3 /app/data/memory.db \"DELETE FROM short_term WHERE expires_at < datetime('now');\"" type: "system" + # A scheduled subagent: runs `task` under a chosen persona (#15). Its scope + # is that persona's tools/skills/secrets; the result is posted to `channel`. + # - id: "weekly_review" + # cron: "0 18 * * 5" + # type: "subagent" + # persona: "writing-editor" + # task: "Draft a short weekly review from this week's notes and email it to me" + # channel: "telegram" admin: enabled: true @@ -87,8 +95,8 @@ history: memory: db_path: "data/memory.db" long_term_limit: 50 - extraction_model: "claude-haiku-4-5" # cheap model for post-turn memory extraction - consolidation_model: "claude-haiku-4-5" # model for scheduled consolidation + hygiene + extraction_model: "deepseek-v4-flash" # fast + cheap model for post-turn memory extraction + consolidation_model: "deepseek-v4-flash" # model for scheduled consolidation + hygiene # Tier 2 — semantic similarity + relevance-ranked injection. # Default: a local on-device model (fastembed, private, no API key, free). @@ -122,3 +130,23 @@ artifacts: enabled: true directory: "data/artifacts" # where artifacts live ttl_hours: 168 # default lifetime; agent may pick its own (0 = forever) + +# Subagents — the agent can delegate a scoped subtask to a sub-loop running +# under a chosen persona, on demand (spawn_subagent tool) or on a schedule +# (a "subagent" job above). A child's tools/skills/secrets are a subset of the +# caller's — never wider. Defaults work out of the box; tune the guardrails: +subagents: + enabled: true + recursion_depth: 3 # max nesting (a top-level spawn is depth 1) + max_steps: 12 # max tool-call rounds per run (hard stop) + token_budget: 100000 # approx token ceiling per run (best-effort) + max_concurrent: 3 # max background runs at once + +# When a background subagent batch finishes, distil it into a one-line chat +# notification + a concise context digest (instead of dumping the raw output). +# Off → a crude first-line truncation with no LLM call. +subagent_summary: + enabled: true + provider: "deepseek" + model: "deepseek-v4-flash" # fast + cheap is ideal for this distillation + thinking_level: "" # "" (off) | "low" | "medium" | "high" diff --git a/core/agent.py b/core/agent.py index 7b5c5c2..5fb1582 100644 --- a/core/agent.py +++ b/core/agent.py @@ -32,6 +32,18 @@ from core.scheduler import AgentScheduler from core.secret_store import SecretStore from core.skills import SkillsEngine +from core.subagents import ( + FILE_HANDOFF_INSTRUCTION, + RESULT_FOR_AGENT_INSTRUCTION, + SubagentRegistry, + SubagentRun, + fallback_summary, + narrow_scope, + normalize_effort, + resolve_cap, + short_summary, + summarize_batch, +) from core.task_reflection import ReflectionStore from core.tools import tool_env from voice.pipeline import VoicePipeline @@ -270,6 +282,88 @@ def _shell_quote(s: str) -> str: "required": ["action"], }, }, + # Subagents (issue #15) — delegate a scoped subtask to a sub-loop. + { + "name": "spawn_subagent", + "description": ( + "Delegate a self-contained subtask to a subagent. The subagent runs " + "the full agent loop under a persona, with a tool/skill/secret scope " + "that is never wider than yours, and returns a structured result. It " + "has NO memory of this conversation — put everything it needs in " + "'task'.\n" + "Persona: by DEFAULT omit 'persona' — the subagent runs as YOU (your " + "identity, tools, scope). This is almost always what you want. Set " + "'persona' ONLY when the user explicitly asked for a named specialist, " + "or the subtask plainly belongs to a different one. Never pick a " + "persona just because the roster lists some.\n" + "Sizing: by default the subagent runs at the configured ceilings. Size " + "it to the job with 'max_steps', 'token_budget', and 'thinking_effort' " + "— smaller for a quick lookup, larger / 'high' effort for hard " + "multi-step work. Requested values are capped at the configured maxima.\n" + "Files: you share a filesystem with the subagent, so it reports the " + "absolute paths of any files it creates in its result — you can then " + "read or send them.\n" + "Use background=true for long-running work: you get a run id " + "immediately and the result is posted to this chat when done (monitor " + "or cancel it with /jobs or the admin Jobs page). Use background=false " + "(default) to block and get the result back in this turn.\n" + "Subagents are depth-limited, so prefer one focused delegation over " + "deep nesting." + ), + "input_schema": { + "type": "object", + "properties": { + "task": { + "type": "string", + "description": ( + "The complete instruction for the subagent. Be specific and " + "self-contained — it cannot see this conversation." + ), + }, + "persona": { + "type": "string", + "description": ( + "Persona name to run as. OMIT THIS by default — the subagent " + "then runs as you (same identity, tools, scope), which is " + "almost always correct. Only set it when the user explicitly " + "named a specialist or the subtask clearly belongs to one." + ), + }, + "max_steps": { + "type": "integer", + "description": ( + "Tool-call rounds the subagent may run before a hard stop. " + "Omit for the configured default; capped at the maximum. " + "Lower it for quick tasks, raise it for thorough ones." + ), + }, + "token_budget": { + "type": "integer", + "description": ( + "Approximate token ceiling for the whole run (minimum 1000). " + "Omit for the configured default; capped at the maximum." + ), + }, + "thinking_effort": { + "type": "string", + "enum": ["off", "low", "medium", "high"], + "description": ( + "How hard the subagent reasons each step. Omit to inherit " + "your own level. Use 'high' for tricky reasoning, 'off'/'low' " + "for simple mechanical work." + ), + }, + "background": { + "type": "boolean", + "description": ( + "Run asynchronously (default false). True returns a run id " + "now and posts the result back to this chat when done." + ), + }, + }, + "required": ["task"], + }, + }, # Secrets vault (issue #19) — discover + request secrets by NAME only. { "name": "list_secrets", @@ -406,7 +500,11 @@ def scoped_tools(persona: Persona | None) -> list[dict]: def apply_feature_gates( - tools: list[dict], *, secrets_available: bool, artifacts_enabled: bool + tools: list[dict], + *, + secrets_available: bool, + artifacts_enabled: bool, + subagents_enabled: bool = True, ) -> list[dict]: """Drop tools whose backing feature is unavailable/disabled, so the model is never offered a capability it can't use (defence in depth — the tool handlers @@ -416,6 +514,8 @@ def apply_feature_gates( out = [t for t in out if t["name"] not in ("list_secrets", "request_secret")] if not artifacts_enabled: out = [t for t in out if t["name"] != "write_artifact"] + if not subagents_enabled: + out = [t for t in out if t["name"] != "spawn_subagent"] return out @@ -462,6 +562,8 @@ def __init__(self, config: Config, secret_store: SecretStore | None = None): self.voice: VoicePipeline | None = None self.job_store = JobStore(db_path="data/jobs.db") self.scheduler = AgentScheduler(self, self.job_store) + # Live registry of subagent runs (issue #15) — list/status/cancel. + self.subagents = SubagentRegistry() config_db = "data/config.db" self.permissions = PermissionEngine(db_path=config_db) self.prompt_capture: deque[dict[str, str]] = deque(maxlen=20) @@ -539,12 +641,21 @@ async def process( scope=_persona_scope(persona), persona=persona, session_key=session_key, + offer_personae=True, ) + # Append the status of still-running background subagents from this chat, + # so the agent always knows what is pending (their results are folded into + # the conversation history when they finish). (#15) + if channel != "system": + note = self._subagent_status_note(channel, chat_id) + if note: + preamble = f"{preamble}\n\n{note}" tools = apply_feature_gates( scoped_tools(persona), secrets_available=self.secret_store is not None, artifacts_enabled=self.config.artifacts.enabled, + subagents_enabled=self.config.subagents.enabled, ) # Static system prompt. In session mode it is snapshotted once at the @@ -656,6 +767,7 @@ async def _turn_preamble( scope: str = "", persona: Persona | None = None, session_key: tuple[str, str, str] | None = None, + offer_personae: bool = False, ) -> str: """Build the per-turn preamble prepended to the current user message. @@ -717,6 +829,14 @@ async def _turn_preamble( except Exception: log.exception("Failed to load memories for turn preamble") + # Roster of personae the agent can delegate to via spawn_subagent, so its + # choice is informed rather than guessed (#15). Only on the main turn — + # selection stays user-led (omit persona = run as yourself / the bound one). + if offer_personae: + roster = await self._personae_roster_block(persona) + if roster: + preamble += f"\n\n{roster}" + if self.config.task_reflection.enabled: try: reflections = await self.reflections.format_for_prompt() @@ -736,6 +856,42 @@ async def _turn_preamble( ) return preamble + async def _personae_roster_block(self, persona: Persona | None) -> str: + """Compact `name — role` roster of personae the agent can delegate to (#15). + + Makes specialist delegation an informed choice instead of a guess, while + leaving selection user-led: omitting ``persona`` runs the subagent as the + caller itself. Returns "" (nothing injected) when subagents are disabled, + the active persona can't spawn, or there is no one to delegate to. + """ + if not self.config.subagents.enabled: + return "" + if persona is not None and not persona.allows_tool("spawn_subagent"): + return "" + try: + personae = await self.personae.list_personae() + except Exception: + log.exception("Failed to list personae for the subagent roster") + return "" + current = persona.name if persona else "" + lines = [] + for p in personae: + role = p.role.strip().splitlines()[0].strip() if (p.role or "").strip() else "" + tag = " (you)" if p.name == current else "" + lines.append(f"- {p.name}{tag}" + (f" — {role}" if role else "")) + if not lines: + return "" + body = "\n".join(lines) + return ( + "\n" + "These personae exist ONLY so you can honour an explicit request for a " + "specialist. By default, spawn_subagent with NO 'persona' so the " + "subagent runs as you — do not assign one of these unless the user " + "asked for it or the subtask plainly belongs to it.\n" + f"{body}\n" + "" + ) + async def _skills_block_in_history(self, session_key: tuple[str, str, str], block: str) -> bool: """True if the exact ```` block is already present in the replayed session history — so the model still sees it and we needn't @@ -762,6 +918,30 @@ async def _skills_block_in_history(self, session_key: tuple[str, str, str], bloc return True return False + def _subagent_status_note(self, channel: str, chat_id: str) -> str: + """List background subagents from this chat that are *still running*, for + the turn preamble — so the agent knows what is pending. When the whole + batch finishes you'll be prompted to answer the user with their results, + so finished runs need no mention here. (#15) + """ + runs = self.subagents.running_for(channel, chat_id) + if not runs: + return "" + lines = [] + for r in runs: + who = f"- [{r.run_id}] {r.persona or 'default'} — running ({r.elapsed_str})" + lines.append(f"{who}; {r.progress}" if r.progress else who) + body = "\n".join(lines) + return ( + "\n" + "Background helpers you spawned from this chat are still running. When " + "they finish you'll be prompted to fold their results into a reply, so " + "don't pre-empt or invent their results now, and don't claim one is " + "finished while it shows here.\n" + f"{body}\n" + "" + ) + async def _session_system_prompt( self, channel: str, @@ -961,7 +1141,9 @@ async def _process_injection( ) # Agentic loop — keep going while the LLM wants to call tools - request_state = self._new_request_state(persona) + request_state = self._new_request_state( + persona, origin={"channel": channel, "user_id": user_id, "chat_id": chat_id} + ) tool_log: list[dict] = [] while response.tool_calls: await self._batch_approve_writes(response.tool_calls, channel, user_id, request_state) @@ -1064,7 +1246,9 @@ async def _process_session( # Agentic loop — keep going while the LLM wants to call tools new_messages: list[dict] = [] - request_state = self._new_request_state(persona) + request_state = self._new_request_state( + persona, origin={"channel": channel, "user_id": user_id, "chat_id": chat_id} + ) tool_log: list[dict] = [] while response.tool_calls: await self._batch_approve_writes(response.tool_calls, channel, user_id, request_state) @@ -1181,8 +1365,14 @@ async def _execute_tool( # ``manage_jobs`` is exempt: job creation is idempotent and guarded on # job id + status inside the tool, so an earlier write in the same turn # must never block a (re)create — that was the "already fulfilled" bug - # against brand-new job ids (issue #11). - if is_write_action and name != "manage_jobs" and write_sig in executed_writes: + # against brand-new job ids (issue #11). ``spawn_subagent`` is likewise + # exempt: each spawn is a distinct run (its own run id), so the agent may + # legitimately fan out the same task more than once in a turn (#15). + if ( + is_write_action + and name not in ("manage_jobs", "spawn_subagent") + and write_sig in executed_writes + ): return { "error": ( "This exact action was already completed in this request; not repeating it." @@ -1326,6 +1516,12 @@ async def _execute_tool( ] } + if name == "spawn_subagent": + result = await self._tool_spawn_subagent(params, channel, user_id, request_state) + if is_write_action and self._is_tool_success(result): + executed_writes.add(write_sig) + return result + if name == "request_secret": return await self._tool_request_secret(params, channel, user_id, request_state) @@ -1338,12 +1534,20 @@ async def _execute_tool( return {"error": f"Unknown tool: {name}"} @staticmethod - def _new_request_state(persona: Persona | None = None) -> dict: + def _new_request_state( + persona: Persona | None = None, + *, + depth: int = 0, + origin: dict | None = None, + run_id: str | None = None, + ) -> dict: """Fresh per-turn state tracking write actions and approval decisions. ``allowed_skills`` carries the active persona's skill allowlist so ``load_skill`` can refuse skills outside scope (defence in depth — the - index already hides them). + index already hides them). ``depth``/``origin``/``persona_obj`` carry the + context a ``spawn_subagent`` call needs to narrow scope, cap recursion, + and post a background result back to the originating chat (issue #15). """ return { "executed_writes": set(), @@ -1353,6 +1557,11 @@ def _new_request_state(persona: Persona | None = None) -> dict: # Secret scope for {{secret:}} ACL in run_command (issue #19). "persona_secrets": list(persona.secrets) if persona else [], "persona_name": persona.name if persona else "", + # Subagent plumbing (issue #15). + "persona_obj": persona, + "depth": depth, + "origin": origin or {}, + "run_id": run_id, } @staticmethod @@ -1718,6 +1927,390 @@ async def _tool_recall_memory(self, params: dict, request_state: dict | None = N log.info("Tool call: recall_memory — %r (%d hits)", query, len(memories)) return {"query": query, "count": len(memories), "memories": memories} + # -- Subagents (issue #15) ------------------------------------------------ + + async def _tool_spawn_subagent( + self, params: dict, channel: str, user_id: str, request_state: dict + ) -> dict: + """``spawn_subagent`` tool: delegate a scoped subtask to a sub-loop.""" + task = str(params.get("task", "")).strip() + if not task: + return {"error": "Missing 'task' for spawn_subagent."} + origin = request_state.get("origin") or {} + return await self.run_subagent( + task=task, + persona_name=str(params.get("persona", "")).strip(), + origin_channel=origin.get("channel", channel), + origin_user_id=str(origin.get("user_id", user_id)), + origin_chat_id=str(origin.get("chat_id", "")), + parent_state=request_state, + background=bool(params.get("background", False)), + max_steps=params.get("max_steps"), + token_budget=params.get("token_budget"), + thinking_effort=params.get("thinking_effort"), + ) + + async def run_subagent( + self, + *, + task: str, + persona_name: str = "", + origin_channel: str = "", + origin_user_id: str = "", + origin_chat_id: str = "", + parent_state: dict | None = None, + background: bool = False, + max_steps: object = None, + token_budget: object = None, + thinking_effort: str | None = None, + ) -> dict: + """Run a subagent — the one primitive behind both the tool and scheduled + ``subagent`` jobs. Scope is narrowed from the caller (inherit-never-widen); + recursion depth and per-run budgets are enforced. + + ``max_steps`` / ``token_budget`` / ``thinking_effort`` let the caller size + the run; each defaults to the configured value and is clamped to it as a + ceiling (``thinking_effort`` defaults to inheriting the caller's level). + """ + cfg = self.config.subagents + if not cfg.enabled: + return {"error": "Subagents are disabled."} + parent_state = parent_state or {} + parent_depth = int(parent_state.get("depth", 0) or 0) + if parent_depth >= cfg.recursion_depth: + return { + "error": ( + f"Max subagent recursion depth ({cfg.recursion_depth}) reached; " + "do this work directly instead of spawning another subagent." + ) + } + + # Resolve + narrow the persona. A name must exist; with no name the child + # inherits the caller's identity and scope. + if persona_name: + requested = await self._load_persona(persona_name) + if requested is None: + try: + names = [p.name for p in await self.personae.list_personae()] + except Exception: + names = [] + hint = f" Available: {', '.join(names)}." if names else "" + return { + "error": ( + f"Persona not found: {persona_name}.{hint} " + "Omit 'persona' to run as yourself." + ) + } + else: + requested = parent_state.get("persona_obj") + child_persona = self._narrow_persona(requested, parent_state) if requested else None + + run_id = f"sub_{uuid.uuid4().hex[:8]}" + child_state = self._new_request_state( + child_persona, + depth=parent_depth + 1, + origin={ + "channel": origin_channel, + "user_id": origin_user_id, + "chat_id": origin_chat_id, + }, + run_id=run_id, + ) + run = SubagentRun( + run_id=run_id, + persona=child_persona.name if child_persona else "", + task=task, + depth=parent_depth + 1, + background=background, + max_steps=resolve_cap(max_steps, cfg.max_steps), + token_budget=resolve_cap(token_budget, cfg.token_budget, floor=1000), + effort=normalize_effort(thinking_effort), + origin_channel=origin_channel, + origin_user_id=origin_user_id, + origin_chat_id=origin_chat_id, + ) + + if background: + if self.subagents.active_count() >= cfg.max_concurrent: + return { + "error": ( + f"Too many subagents running (max {cfg.max_concurrent}). " + "Wait for one to finish or cancel it via /jobs." + ) + } + self.subagents.register(run) + bg = asyncio.create_task( + self._run_subagent_background(run, child_persona, child_state), + name=f"subagent-{run_id}", + ) + self.subagents.attach_task(run_id, bg) + log.info( + "Spawned background subagent %s (persona=%s)", run_id, run.persona or "default" + ) + return { + "ok": True, + "run_id": run_id, + "background": True, + "status": "running", + "persona": run.persona, + "note": ( + "Running in the background; its result is posted to this chat " + "automatically when done — you don't relay it. Each later turn " + "shows this run's status until it finishes." + ), + } + + # Synchronous: run to completion and return the result to the caller. + self.subagents.register(run) + log.info("Running subagent %s (persona=%s)", run_id, run.persona or "default") + try: + text = await self._run_subagent_loop(task, child_persona, child_state, run) + except Exception as exc: + log.exception("Subagent %s failed", run_id) + self.subagents.finish(run_id, "error", error=str(exc)) + return {"error": f"Subagent failed: {exc}", "run_id": run_id} + self.subagents.finish(run_id, "done", result=text) + return { + "ok": True, + "run_id": run_id, + "persona": run.persona, + "summary": short_summary(text), + "result": text, + } + + def _narrow_persona(self, requested: Persona, parent_state: dict) -> Persona: + """Build a child persona whose scopes are a subset of the caller's.""" + parent: Persona | None = parent_state.get("persona_obj") + p_skills = parent.skills if parent else [] + p_tools = parent.tools if parent else [] + p_secrets = parent.secrets if parent else [] + return Persona( + name=requested.name, + agent_name=requested.agent_name, + role=requested.role, + emoji=requested.emoji, + voice=requested.voice, + personalia=requested.personalia, + character=requested.character, + skills=narrow_scope(p_skills, requested.skills), + tools=narrow_scope(p_tools, requested.tools), + secrets=narrow_scope(p_secrets, requested.secrets), + ) + + async def _run_subagent_loop( + self, task: str, child_persona: Persona | None, child_state: dict, run: SubagentRun + ) -> str: + """The subagent's agentic loop — system semantics, budgeted and depth-capped. + + Mirrors the main injection loop but runs from a clean slate (no history), + skips approval/decomposition/memory/reflection (channel='system'), and + stops at this run's step/token budget (sized by the spawning agent). + """ + cfg = self.config.subagents + tools = apply_feature_gates( + scoped_tools(child_persona), + secrets_available=self.secret_store is not None, + artifacts_enabled=self.config.artifacts.enabled, + subagents_enabled=cfg.enabled, + ) + # At the depth ceiling a subagent may not spawn further — don't even offer it. + if child_state["depth"] >= cfg.recursion_depth: + tools = [t for t in tools if t["name"] != "spawn_subagent"] + + system = await self._build_system_prompt(persona=child_persona) + system = f"{system}\n\n{RESULT_FOR_AGENT_INSTRUCTION}\n\n{FILE_HANDOFF_INSTRUCTION}" + # Memory/reflections inject per-turn via the preamble (#41), scoped to the + # child persona (#42); query=task keeps the injection relevant. + preamble = await self._turn_preamble(None, query=task, scope=_persona_scope(child_persona)) + messages: list[dict] = [await self._build_user_message(task, None, preamble)] + + # effort None = inherit the main client's level; otherwise an effort-scoped + # clone (same provider/connection, overridden thinking level). + llm = self.llm + if run.effort is not None: + llm = self._background_llm(self.llm.provider, run.effort) + response = await llm.generate( + model=self.config.agent.model, + max_tokens=4096, + system=system, + messages=messages, + tools=cast(Any, tools), + ) + steps = 0 + tokens = self._usage_total(response.usage) + while response.tool_calls and steps < run.max_steps and tokens < run.token_budget: + steps += 1 + run.progress = f"step {steps}: {', '.join(c.name for c in response.tool_calls)}"[:120] + tool_results = [] + for call in response.tool_calls: + result = await self._execute_tool( + call, "system", run.origin_user_id or "subagent", child_state + ) + tool_results.append( + { + "type": "tool_result", + "tool_use_id": call.id, + "content": json.dumps(result), + } + ) + messages.append(llm.assistant_message(response)) + messages.extend(llm.tool_result_messages(tool_results)) + response = await llm.generate( + model=self.config.agent.model, + max_tokens=4096, + system=system, + messages=messages, + tools=cast(Any, tools), + ) + tokens += self._usage_total(response.usage) + + text = response.text or "" + if response.tool_calls: + text = (text + "\n\n[subagent stopped: reached its step/token budget]").strip() + run.progress = "done" + return text + + async def _run_subagent_background( + self, run: SubagentRun, child_persona: Persona | None, child_state: dict + ) -> None: + """Run a subagent off-turn. When the chat's whole batch of background runs + has finished, the spawning agent ingests their results and writes one reply + — the user never sees raw subagent output; it works for the agent (#15).""" + try: + text = await self._run_subagent_loop(run.task, child_persona, child_state, run) + except asyncio.CancelledError: + # User-initiated stop (registry.cancel already flipped the status, so + # finish() is a no-op here). Mark it synthesised so a sibling's batch + # doesn't report on a run the user deliberately cancelled. + self.subagents.finish(run.run_id, "cancelled") + run.synthesized = True + # This may have been the last running sibling: a done/error run that + # deferred earlier would otherwise be orphaned (its reply lost), since + # cancellation is the one terminal path that never re-checks the + # barrier. Release it before unwinding. (Safe to await here: the + # cancellation was already delivered once and won't re-fire.) + await self._maybe_deliver_subagent_batch(run) + raise + except Exception as exc: + log.exception("Background subagent %s failed", run.run_id) + if self.subagents.finish(run.run_id, "error", error=str(exc)): + await self._maybe_deliver_subagent_batch(run) + return + # finish() returns False if a late cancellation already finalised the run, + # in which case this completion must not also trigger a reply. + if self.subagents.finish(run.run_id, "done", result=text): + await self._maybe_deliver_subagent_batch(run) + + async def _maybe_deliver_subagent_batch(self, run: SubagentRun) -> None: + """Once every background run for this chat is done, distil the batch into a + chat notification + a context digest and deliver them. The barrier collapses + a fan-out of parallel spawns into a single delivery (#15).""" + channel, user_id, chat_id = run.origin_channel, run.origin_user_id, run.origin_chat_id + if not chat_id or channel == "system": + return # scheduler / system-origin runs have no user chat to answer + # Barrier — race-free because there is no await between this check and + # marking the batch below: while another background run for the chat is + # still running, defer; the last finisher delivers. (Sync runs are ignored: + # they return inline and never reach this path.) + runs = self.subagents.list_runs() + if any( + r.background + and r.status == "running" + and r.origin_channel == channel + and r.origin_chat_id == chat_id + for r in runs + ): + return + batch = [ + r + for r in runs + if r.background + and not r.synthesized + and r.origin_channel == channel + and r.origin_chat_id == chat_id + and r.status in ("done", "error") + ] + if not batch: + return + for r in batch: + r.synthesized = True + await self._summarize_and_deliver(channel, user_id, chat_id, batch) + + async def _summarize_and_deliver( + self, channel: str, user_id: str, chat_id: str, batch: list[SubagentRun] + ) -> None: + """Distil a finished batch into a one-line chat notification + a concise + context digest, then deliver: notification → the user, digest → the agent's + context. The raw subagent output reaches neither the user nor the context. + """ + notification, digest = await self._summarize_subagent_batch(batch) + # The user only ever saw the notification; the agent's context keeps the + # concise digest (so it can answer follow-ups) — never the raw output. + framed = notification + if digest and digest.strip() and digest.strip() != notification.strip(): + framed = f"{notification}\n\n\n{digest}\n" + await self._record_subagent_context(channel, user_id, chat_id, framed) + ch = self.channels.get(channel) + if ch and chat_id and notification: + try: + await ch.send(chat_id, notification) + except Exception: + log.exception("Failed to deliver subagent notification (chat=%s)", chat_id) + + async def _summarize_subagent_batch(self, batch: list[SubagentRun]) -> tuple[str, str]: + """(notification, digest) for a finished batch via the summary inference, + falling back to truncation when it is disabled or the inference fails.""" + items = [ + ( + r.task, + r.result if r.status == "done" else f"[failed: {r.error or 'unknown error'}]", + r.persona or "", + r.status, + ) + for r in batch + ] + cfg = self.config.subagent_summary + if cfg.enabled: + try: + llm = self._background_llm(cfg.provider, cfg.thinking_level) + return await summarize_batch(llm, cfg.model, items) + except Exception: + log.exception("Subagent summary inference failed; using truncation fallback") + return fallback_summary(items) + + async def _record_subagent_context( + self, channel: str, user_id: str, chat_id: str, framed: str + ) -> None: + """Record a background batch's notification + digest as an assistant turn — + merged into the trailing assistant turn so replayed history stays strictly + alternating for providers that require it (#15).""" + if not chat_id or channel == "system": + return + try: + if self.history_mode == "session": + merged = await self.history.append_to_last_session_message( + channel, user_id, f"\n\n{framed}", chat_id + ) + if not merged: + await self.history.append_session_message( + channel, user_id, {"role": "assistant", "content": framed}, chat_id + ) + else: + merged = await self.history.append_to_last_turn( + channel, user_id, "assistant", f"\n\n{framed}", chat_id + ) + if not merged: + await self.history.add_turn(channel, user_id, "assistant", framed, chat_id) + except Exception: + log.exception("Failed to record subagent context (chat=%s)", chat_id) + + @staticmethod + def _usage_total(usage: dict | None) -> int: + """Best-effort token count for budgeting (0 when the provider omits usage).""" + if not usage: + return 0 + return int(usage.get("input_tokens", 0) or 0) + int(usage.get("output_tokens", 0) or 0) + async def _tool_web_search(self, params: dict) -> dict: """Search the web via Tavily API.""" if not self.search_client: diff --git a/core/config.py b/core/config.py index 9082300..34d724e 100644 --- a/core/config.py +++ b/core/config.py @@ -145,6 +145,7 @@ class SchedulerJob(BaseModel): task: str channel: str = "telegram" type: str = "agent" + persona: str = "" # for type="subagent": the persona the run adopts class SchedulerConfig(BaseModel): @@ -187,10 +188,10 @@ class EmbeddingConfig(BaseModel): class MemoryConfig(BaseModel): db_path: str = "data/memory.db" long_term_limit: int = 50 - extraction_provider: str = "anthropic" - extraction_model: str = "claude-haiku-4-5" - consolidation_provider: str = "anthropic" - consolidation_model: str = "claude-haiku-4-5" + extraction_provider: str = "deepseek" + extraction_model: str = "deepseek-v4-flash" + consolidation_provider: str = "deepseek" + consolidation_model: str = "deepseek-v4-flash" extraction_thinking_level: str = "" # "" (off) | "low" | "medium" | "high" consolidation_thinking_level: str = "" # "" (off) | "low" | "medium" | "high" extraction_cooldown_seconds: int = 120 # minimum seconds between extractions @@ -210,15 +211,15 @@ class MemoryConfig(BaseModel): class GoalDecompositionConfig(BaseModel): enabled: bool = True - provider: str = "anthropic" - model: str = "claude-haiku-4-5" + provider: str = "deepseek" + model: str = "deepseek-v4-flash" thinking_level: str = "" # "" (off) | "low" | "medium" | "high" class TaskReflectionConfig(BaseModel): enabled: bool = True - provider: str = "anthropic" - model: str = "claude-haiku-4-5" + provider: str = "deepseek" + model: str = "deepseek-v4-flash" thinking_level: str = "" # "" (off) | "low" | "medium" | "high" db_path: str = "data/reflections.db" max_reflections: int = 50 # max reflections to keep for prompt injection @@ -232,8 +233,8 @@ class CompactionConfig(BaseModel): """ enabled: bool = True - provider: str = "anthropic" - model: str = "claude-haiku-4-5" + provider: str = "deepseek" + model: str = "deepseek-v4-flash" thinking_level: str = "" # "" (off) | "low" | "medium" | "high" threshold_type: str = "percent" # "percent" (of context window) or "tokens" (absolute) threshold_percent: int = 80 # trigger at this % of the model's context window @@ -302,6 +303,36 @@ class ArtifactsConfig(BaseModel): ttl_hours: int = 168 # 7 days; 0 = keep forever (no auto-cleanup) +class SubagentsConfig(BaseModel): + """Subagents — scoped sub-loops the agent can delegate to (see core/subagents.py). + + Defaults are deliberately conservative so spawning works out of the box + without runaway recursion or cost: a top-level spawn is depth 1, a subagent + spawning a subagent is depth 2, and so on up to ``recursion_depth``. + """ + + enabled: bool = True + recursion_depth: int = 3 # max nesting; spawns are refused beyond this + max_steps: int = 12 # max tool-call rounds per run (hard stop) + token_budget: int = 100_000 # approx token ceiling per run (best-effort) + max_concurrent: int = 3 # max background runs at once + + +class SubagentSummaryConfig(BaseModel): + """Summarise a finished background subagent batch (issue #15). + + Instead of dumping a subagent's raw output to the chat and the agent's + context, a small inference distils each batch into a one-sentence chat + *notification* and a concise *digest* for the agent's context. Mirrors the + other background inferences (memory / compaction / reflection). + """ + + enabled: bool = True + provider: str = "deepseek" # fast + cheap is ideal for this distillation + model: str = "deepseek-v4-flash" + thinking_level: str = "" # "" (off) | "low" | "medium" | "high" + + class Config(BaseModel): agent: AgentConfig = AgentConfig() channels: ChannelsConfig = ChannelsConfig() @@ -320,6 +351,8 @@ class Config(BaseModel): prompt: PromptConfig = PromptConfig() tools: ToolsConfig = ToolsConfig() artifacts: ArtifactsConfig = ArtifactsConfig() + subagents: SubagentsConfig = SubagentsConfig() + subagent_summary: SubagentSummaryConfig = SubagentSummaryConfig() def load_config(path: str | Path = "config.yml") -> Config: diff --git a/core/history.py b/core/history.py index 9648b07..cf8e95a 100644 --- a/core/history.py +++ b/core/history.py @@ -281,6 +281,72 @@ async def replace_session( ) await db.commit() + async def append_to_last_turn( + self, channel: str, user_id: str, role: str, suffix: str, chat_id: str = "" + ) -> bool: + """Append ``suffix`` to the most recent turn iff it has ``role`` and text + content. Returns False when there is no such turn (caller adds a fresh one). + + Used to fold an out-of-band assistant message (a background subagent's + result) into the trailing assistant turn so the replayed history keeps + strict user/assistant alternation. (injection mode) + """ + await self._ensure_schema() + async with aiosqlite.connect(self.db_path) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute( + "SELECT id, role, content FROM conversation_turns " + "WHERE channel = ? AND user_id = ? AND chat_id = ? ORDER BY id DESC LIMIT 1", + (channel, user_id, chat_id), + ) + row = await cursor.fetchone() + if not row or row["role"] != role: + return False + content = json.loads(row["content"]) + if not isinstance(content, str): + return False + await db.execute( + "UPDATE conversation_turns SET content = ? WHERE id = ?", + (json.dumps(content + suffix), row["id"]), + ) + await db.commit() + return True + + async def append_to_last_session_message( + self, channel: str, user_id: str, suffix: str, chat_id: str = "", role: str = "assistant" + ) -> bool: + """Session-mode counterpart of :meth:`append_to_last_turn`: fold ``suffix`` + into the last session message iff it has ``role``. Returns False otherwise.""" + await self._ensure_schema() + key = (channel, user_id, chat_id) + if key not in self._sessions: + await self.get_session(channel, user_id, chat_id) + session = self._sessions[key] + if not session or session[-1].get("role") != role: + return False + msg = session[-1] + content = msg.get("content") + if isinstance(content, str): + msg["content"] = content + suffix + elif isinstance(content, list): + content.append({"type": "text", "text": suffix}) + else: + return False + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute( + "SELECT id FROM session_messages " + "WHERE channel = ? AND user_id = ? AND chat_id = ? ORDER BY id DESC LIMIT 1", + (channel, user_id, chat_id), + ) + row = await cursor.fetchone() + if row: + await db.execute( + "UPDATE session_messages SET message = ? WHERE id = ?", + (json.dumps(msg), row[0]), + ) + await db.commit() + return True + async def clear_session(self, channel: str, user_id: str, chat_id: str = "") -> None: """Clear just the sticky session for a (channel, user_id, chat_id) triple.""" await self._ensure_schema() diff --git a/core/job_store.py b/core/job_store.py index cdacad3..1514372 100644 --- a/core/job_store.py +++ b/core/job_store.py @@ -30,13 +30,20 @@ status TEXT NOT NULL DEFAULT 'active', created_by TEXT NOT NULL DEFAULT 'admin', description TEXT NOT NULL DEFAULT '', + persona TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); """ -# Valid values -VALID_TYPES = ("agent", "agent_silent", "system", "memory_consolidation") +# Additive migrations for DBs created before a column existed. Each is a column +# name → ALTER statement; applied only when the column is missing. +_MIGRATIONS = { + "persona": "ALTER TABLE jobs ADD COLUMN persona TEXT NOT NULL DEFAULT ''", +} + +# Valid values. "subagent" runs the spawn_subagent primitive under ``persona``. +VALID_TYPES = ("agent", "agent_silent", "system", "memory_consolidation", "subagent") VALID_SCHEDULES = ("cron", "once") VALID_STATUSES = ("active", "paused", "done", "cancelled") @@ -59,6 +66,12 @@ async def _ensure_schema(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) async with aiosqlite.connect(self.db_path) as db: await db.executescript(_SCHEMA) + cursor = await db.execute("PRAGMA table_info(jobs)") + cols = {row[1] for row in await cursor.fetchall()} + for col, stmt in _MIGRATIONS.items(): + if col not in cols: + await db.execute(stmt) + await db.commit() self._ready = True # -- Sync helpers (for use from non-async contexts like CLI) -- @@ -69,6 +82,11 @@ def _ensure_schema_sync(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(self.db_path) as db: db.executescript(_SCHEMA) + cols = {row[1] for row in db.execute("PRAGMA table_info(jobs)").fetchall()} + for col, stmt in _MIGRATIONS.items(): + if col not in cols: + db.execute(stmt) + db.commit() self._ready = True def list_jobs_sync(self, status: str | None = None, include_done: bool = False) -> list[dict]: @@ -111,6 +129,7 @@ def upsert_job_sync( status: str = "active", created_by: str = "admin", description: str = "", + persona: str = "", ) -> dict: """Synchronous upsert for CLI/admin use.""" self._ensure_schema_sync() @@ -118,8 +137,8 @@ def upsert_job_sync( db.row_factory = sqlite3.Row db.execute( """INSERT INTO jobs (id, type, schedule, cron, run_at, task, channel, - status, created_by, description, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + status, created_by, description, persona, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ON CONFLICT(id) DO UPDATE SET type = excluded.type, schedule = excluded.schedule, @@ -129,6 +148,7 @@ def upsert_job_sync( channel = excluded.channel, status = excluded.status, description = excluded.description, + persona = excluded.persona, updated_at = datetime('now') """, ( @@ -142,6 +162,7 @@ def upsert_job_sync( status, created_by, description, + persona, ), ) db.commit() @@ -200,6 +221,7 @@ async def upsert_job( status: str = "active", created_by: str = "admin", description: str = "", + persona: str = "", ) -> dict: """Insert or update a job. Returns the job dict.""" await self._ensure_schema() @@ -207,8 +229,8 @@ async def upsert_job( db.row_factory = aiosqlite.Row await db.execute( """INSERT INTO jobs (id, type, schedule, cron, run_at, task, channel, - status, created_by, description, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + status, created_by, description, persona, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ON CONFLICT(id) DO UPDATE SET type = excluded.type, schedule = excluded.schedule, @@ -218,6 +240,7 @@ async def upsert_job( channel = excluded.channel, status = excluded.status, description = excluded.description, + persona = excluded.persona, updated_at = datetime('now') """, ( @@ -231,6 +254,7 @@ async def upsert_job( status, created_by, description, + persona, ), ) await db.commit() @@ -271,8 +295,8 @@ async def seed_from_config(self, jobs: list[dict]) -> int: continue await db.execute( """INSERT INTO jobs (id, type, schedule, cron, task, channel, - status, created_by, description) - VALUES (?, ?, 'cron', ?, ?, ?, 'active', 'config', '') + status, created_by, description, persona) + VALUES (?, ?, 'cron', ?, ?, ?, 'active', 'config', '', ?) """, ( job["id"], @@ -280,6 +304,7 @@ async def seed_from_config(self, jobs: list[dict]) -> int: job["cron"], job.get("task", ""), job.get("channel", "telegram"), + job.get("persona", ""), ), ) inserted += 1 diff --git a/core/permissions.py b/core/permissions.py index 00d3863..06a99b3 100644 --- a/core/permissions.py +++ b/core/permissions.py @@ -128,6 +128,9 @@ class PermissionLevel: "run_command:himalaya*move*": "ASK", "schedule_task": "ASK", "manage_jobs": "ASK", + # Delegating to a subagent is approved once per spawn; the subagent then runs + # autonomously within its narrowed scope (system semantics), like a job. + "spawn_subagent": "ASK", # Publishing inline content the agent authored is low-risk and reversible # (TTL cleanup) — no prompt, like web_search / load_skill. But copying an # on-disk file to a public URL can expose data the agent didn't author, so @@ -233,6 +236,7 @@ def is_write_action(self, tool_name: str, params: dict | None = None) -> bool: "create_calendar_event", "schedule_task", "manage_jobs", + "spawn_subagent", }: return True diff --git a/core/scheduler.py b/core/scheduler.py index b1fd0e2..67e753f 100644 --- a/core/scheduler.py +++ b/core/scheduler.py @@ -4,11 +4,12 @@ APScheduler runs in-memory only (no SQLAlchemy jobstore) and is re-synced from the JobStore whenever jobs change. -Three job types: +Job types: - "agent" / "agent_silent": natural-language task -> agent.process() -> send result to channel - "system": raw CLI command -> executor.run_command_trusted() - "memory_consolidation": review short-term memories, promote worthy ones to long-term, delete expired entries (uses a lightweight LLM call) + - "subagent": run agent.run_subagent() under a persona -> send result to channel (issue #15) """ from __future__ import annotations @@ -103,6 +104,48 @@ async def run_agent_task( log.info("One-shot job %r marked as done", job_id) +async def run_subagent_task( + persona: str, + task: str, + channel: str = "telegram", + job_id: str | None = None, +) -> None: + """Fire a scheduled subagent run and deliver its result to the channel.""" + agent = _get_agent_context() + if agent is None: + log.error("Scheduler subagent task dropped; agent not initialized") + return + + owner = _get_owner_chat_id(agent, channel) + log.info("Scheduler running subagent (persona=%s): %s", persona or "default", task[:100]) + try: + result = await agent.run_subagent( + task=task, + persona_name=persona or "", + origin_channel=channel, + origin_user_id=str(owner or "scheduler"), + origin_chat_id=str(owner or ""), + background=False, + ) + text = result.get("result") or result.get("error") or "" + ch = agent.channels.get(channel) + if ch and text and owner: + await ch.send(owner, text) + elif not ch: + log.warning("Scheduler: channel %r not registered, subagent result dropped", channel) + elif not owner: + log.warning("Scheduler: no owner chat for channel %r, subagent result dropped", channel) + except Exception: + log.exception("Scheduler subagent task failed: %s", task[:100]) + + # Mark one-shot jobs as done + if job_id and agent.job_store: + job = await agent.job_store.get_job(job_id) + if job and job.get("schedule") == "once" and job.get("status") == "active": + await agent.job_store.update_status(job_id, "done") + log.info("One-shot job %r marked as done", job_id) + + async def run_system_command(command: str) -> None: """Execute a raw CLI command (e.g. memory cleanup).""" agent = _get_agent_context() @@ -212,6 +255,7 @@ def _register_job(self, job: dict) -> None: schedule = job.get("schedule", "cron") task = job.get("task", "") channel = job.get("channel", "telegram") + persona = job.get("persona", "") silent = job_type == "agent_silent" try: @@ -253,6 +297,20 @@ def _register_job(self, job: dict) -> None: run_date=run_at, replace_existing=True, ) + elif job_type == "subagent": + self.scheduler.add_job( + run_subagent_task, + "date", + id=job_id, + run_date=run_at, + kwargs={ + "persona": persona, + "task": task, + "channel": channel, + "job_id": job_id, + }, + replace_existing=True, + ) else: self.scheduler.add_job( run_agent_task, @@ -292,6 +350,20 @@ def _register_job(self, job: dict) -> None: replace_existing=True, **cron_kwargs, ) + elif job_type == "subagent": + self.scheduler.add_job( + run_subagent_task, + "cron", + id=job_id, + kwargs={ + "persona": persona, + "task": task, + "channel": channel, + "job_id": job_id, + }, + replace_existing=True, + **cron_kwargs, + ) else: self.scheduler.add_job( run_agent_task, diff --git a/core/subagents.py b/core/subagents.py new file mode 100644 index 0000000..8c48cc2 --- /dev/null +++ b/core/subagents.py @@ -0,0 +1,297 @@ +"""Subagents — scoped sub-loops the main agent can delegate to (issue #15). + +A *subagent* is one execution primitive (``AgentCore.run_subagent``) reached by +two trigger paths: on demand via the ``spawn_subagent`` tool, or on a schedule +via a ``subagent`` job. Either way it runs the existing agent loop with **system +semantics** (no goal decomposition / memory / reflection / approval prompts) and +a persona whose tool/skill/secret scope is a *subset* of the caller's — never +wider (``narrow_scope``). + +This module holds only the lifecycle bookkeeping: a small in-memory registry of +runs so list / status / cancel work from Telegram and the admin UI. Runs are +ephemeral (an ``asyncio`` task each) — a restart kills them, so an in-memory +registry is the right scope; nothing here needs to survive a reboot. +""" + +from __future__ import annotations + +import asyncio +import time +from collections import OrderedDict +from dataclasses import dataclass, field + +# Keep at most this many finished runs for after-the-fact inspection. +_MAX_FINISHED = 50 + +# Appended to a subagent's system prompt so files it writes reach the spawning +# agent. Subagents and the main agent share one process cwd / filesystem (see +# core/executor.py — subprocesses inherit the cwd), so a file a subagent creates +# is already on disk where the parent can read it; the only gap is the parent +# knowing the path. This closes that gap by making the subagent report paths in +# its result, which is folded into the parent's history. +FILE_HANDOFF_INSTRUCTION = ( + "You share a filesystem with the agent that spawned you: it can read any " + "file you leave behind, but only if it knows the path. So if you create or " + "modify any files, end your final reply with a line 'Files:' followed by " + "their absolute paths, one per line. If you made no files, omit it." +) + +# Appended to a subagent's system prompt. A subagent reports to the agent that +# spawned it, never to a human — the agent synthesises the user-facing answer +# from this result, so prose, greetings, and big formatted tables here are just +# noise the agent has to wade through. Keep the result a dense fact dump. +RESULT_FOR_AGENT_INSTRUCTION = ( + "Your final reply is read by the agent that spawned you, NOT by a human. " + "It synthesises the user's answer from it. So return only the findings — " + "dense and factual, no greeting, no preamble, no offers to help further, no " + "elaborate tables. Just the facts the agent asked for, as briefly as possible." +) + +_EFFORT_LEVELS = {"off": "", "low": "low", "medium": "medium", "high": "high"} + + +def normalize_effort(value: str | None) -> str | None: + """Map a tool-supplied thinking effort to an LLM thinking level. + + ``None``/empty → ``None`` = *inherit the caller's level* (the default, so a + subagent thinks as hard as its parent unless told otherwise). ``"off"`` → + ``""`` (reasoning off); ``low``/``medium``/``high`` pass through. Anything + unrecognised → ``None``, degrading to the safe inherit default rather than + erroring on a bad value. + """ + if not value: + return None + return _EFFORT_LEVELS.get(str(value).strip().lower()) + + +def resolve_cap(value: object, ceiling: int, floor: int = 1) -> int: + """Clamp a caller-requested run cap (steps / token budget) into bounds. + + ``None`` (the caller didn't choose) → the configured ``ceiling``, preserving + prior behaviour. A chosen value is honoured up to the ceiling: the config is + a safety guardrail the agent may dial *down* but never exceed. A non-numeric + value coerces to the ceiling rather than raising. + """ + if value is None: + return ceiling + try: + return max(floor, min(int(value), ceiling)) # type: ignore[arg-type] + except TypeError, ValueError, OverflowError: + # OverflowError: json.loads accepts the literal `Infinity`, and int(inf) + # overflows — degrade it to the ceiling like any other bad value. + return ceiling + + +def narrow_scope(parent: list[str] | None, child: list[str] | None) -> list[str]: + """Intersect a child scope with the parent's — inherit, never widen. + + The allowlist convention (see :class:`core.personae.Persona`) is ``[]``/``None`` + = *all*. So an empty parent means "no restriction → the child's own scope + applies", an empty child means "unspecified → inherit the parent's", and when + both list names the result is their intersection (the child can never gain a + name the parent lacked). + """ + p = parent or [] + c = child or [] + if not p: + return list(c) + if not c: + return list(p) + return [x for x in c if x in p] + + +@dataclass(slots=True) +class SubagentRun: + """One subagent execution and its live status.""" + + run_id: str + persona: str + task: str + depth: int = 1 + background: bool = False + # Per-run sizing the spawning agent chose, resolved/clamped via resolve_cap + # before the run starts (so always concrete on a live run; the 0 defaults are + # only placeholders for direct construction). effort None = inherit caller level. + max_steps: int = 0 + token_budget: int = 0 + effort: str | None = None + status: str = "running" # running | done | cancelled | error + progress: str = "" + result: str = "" + error: str = "" + # True once a background run's result has been folded into a synthesis turn, + # so it is not picked up again by a later batch (#15). + synthesized: bool = False + started_at: float = field(default_factory=time.time) + finished_at: float = 0.0 + origin_channel: str = "" + origin_user_id: str = "" + origin_chat_id: str = "" + _task: asyncio.Task | None = field(default=None, repr=False, compare=False) + + @property + def elapsed(self) -> float: + return (self.finished_at or time.time()) - self.started_at + + @property + def elapsed_str(self) -> str: + secs = int(self.elapsed) + if secs < 60: + return f"{secs}s" + return f"{secs // 60}m {secs % 60}s" + + +class SubagentRegistry: + """In-memory registry of subagent runs for list / status / cancel.""" + + def __init__(self) -> None: + self._runs: OrderedDict[str, SubagentRun] = OrderedDict() + + def register(self, run: SubagentRun) -> None: + self._runs[run.run_id] = run + self._trim() + + def attach_task(self, run_id: str, task: asyncio.Task) -> None: + run = self._runs.get(run_id) + if run: + run._task = task + + def get(self, run_id: str) -> SubagentRun | None: + return self._runs.get(run_id) + + def active_count(self) -> int: + return sum(1 for r in self._runs.values() if r.status == "running") + + def list_runs(self, active_only: bool = False) -> list[SubagentRun]: + runs = [r for r in self._runs.values() if not active_only or r.status == "running"] + return sorted(runs, key=lambda r: r.started_at, reverse=True) + + def running_for(self, channel: str, chat_id: str) -> list[SubagentRun]: + """Still-running background runs spawned from one chat, oldest first. + + Surfaced in the spawning agent's turn preamble so it always knows what is + pending. (Finished runs are folded into the chat history instead, so the + agent remembers their results natively rather than as ephemeral status.) + """ + out = [ + r + for r in self._runs.values() + if r.status == "running" and r.origin_channel == channel and r.origin_chat_id == chat_id + ] + return sorted(out, key=lambda r: r.started_at) + + def finish(self, run_id: str, status: str, *, result: str = "", error: str = "") -> bool: + """Move a *running* run to a terminal state. Returns False (a no-op) when + the run is unknown or already finished, so a terminal state is sticky — + e.g. a late normal completion cannot overwrite a cancellation.""" + run = self._runs.get(run_id) + if not run or run.status != "running": + return False + run.status = status + run.result = result + run.error = error + run.finished_at = time.time() + self._trim() + return True + + def cancel(self, run_id: str) -> bool: + """Request cancellation of a running subagent. Returns False if it is not + running (unknown id or already finished).""" + run = self._runs.get(run_id) + if not run or run.status != "running": + return False + if run._task and not run._task.done(): + run._task.cancel() + # The task's CancelledError handler flips status to "cancelled"; set it + # eagerly too so a sync caller / the next poll sees it immediately. + run.status = "cancelled" + run.finished_at = time.time() + return True + + def _trim(self) -> None: + """Drop the oldest finished runs once we exceed the cap (running runs are + always kept).""" + finished = [rid for rid, r in self._runs.items() if r.status != "running"] + excess = len(finished) - _MAX_FINISHED + for rid in finished[:excess] if excess > 0 else []: + self._runs.pop(rid, None) + + +def short_summary(text: str, limit: int = 280) -> str: + """A one-glance summary of a subagent's result — first non-empty line, capped. + + The crude fallback for :func:`summarize_batch` when the summary inference is + disabled or its output can't be parsed; also the sync spawn's preview field. + """ + for line in (text or "").splitlines(): + line = line.strip() + if line: + return line[:limit] + ("…" if len(line) > limit else "") + return (text or "").strip()[:limit] + + +# (task, outcome, persona, status) for one finished background run. +SummaryItem = tuple[str, str, str, str] + +_SUMMARY_PROMPT = ( + "Background helper task(s) ran for a user and finished. Summarise their " + "results into TWO things, and nothing else:\n" + "1. NOTIFICATION — ONE short sentence (max ~140 chars) giving the single most " + "important result/answer, phrased for the user. No greeting, no 'the helper " + "found', no markdown.\n" + "2. DIGEST — a few concise sentences with the key facts the assistant may need " + "to answer follow-ups. Real content only; do not restate the task or pad.\n\n" + "Respond in EXACTLY this format (NOTIFICATION on one line):\n" + "NOTIFICATION: \n" + "DIGEST: " +) + + +def _format_items(items: list[SummaryItem]) -> str: + blocks = [] + for task, outcome, _persona, status in items: + blocks.append(f"--- task: {task}\nstatus: {status}\nresult:\n{outcome}") + return "\n\n".join(blocks) + + +def fallback_summary(items: list[SummaryItem]) -> tuple[str, str]: + """Crude (no-LLM) fallback for :func:`summarize_batch`: first-line previews + for both the notification and the digest, used when the summary inference is + disabled or its output is unusable.""" + notif = "; ".join(s for s in (short_summary(o, 120) for _, o, _, _ in items) if s)[:200] + digest = "\n".join(f"- {t[:80]}: {short_summary(o, 280)}" for t, o, _, _ in items) + return notif, digest + + +def _parse_summary(raw: str) -> tuple[str, str]: + """Pull (notification, digest) out of the model's reply; ('', '') if unusable.""" + if not raw or not raw.strip(): + return "", "" + low = raw.lower() + n_idx, d_idx = low.find("notification:"), low.find("digest:") + if n_idx == -1 and d_idx == -1: + # No markers — take the first non-empty line as the notification. + lines = [ln.strip() for ln in raw.strip().splitlines() if ln.strip()] + return (lines[0], "\n".join(lines[1:]) or lines[0]) if lines else ("", "") + notif = "" + if n_idx != -1: + end = d_idx if (d_idx > n_idx) else len(raw) + body = raw[n_idx + len("notification:") : end].strip() + notif = body.splitlines()[0].strip() if body else "" + digest = raw[d_idx + len("digest:") :].strip() if d_idx != -1 else "" + return notif, digest + + +async def summarize_batch(llm, model: str, items: list[SummaryItem]) -> tuple[str, str]: + """LLM-distil a finished background batch into (notification, digest). + + ``notification`` is one sentence for the chat; ``digest`` is concise context + for the spawning agent. Falls back to truncation if the model's output can't + be parsed (the caller falls back too if the inference itself raises). + """ + prompt = f"{_SUMMARY_PROMPT}\n\n{_format_items(items)}" + raw = await llm.generate_text(model=model, prompt=prompt, max_tokens=600) + notif, digest = _parse_summary(raw) + if not notif: + return fallback_summary(items) + return notif, digest or notif diff --git a/docs/content/docs/meta.json b/docs/content/docs/meta.json index bda4d45..fc3b2f6 100644 --- a/docs/content/docs/meta.json +++ b/docs/content/docs/meta.json @@ -14,6 +14,7 @@ "permissions", "secrets", "scheduler", + "subagents", "voice", "artifacts", "admin-ui", diff --git a/docs/content/docs/scheduler.mdx b/docs/content/docs/scheduler.mdx index eac4b41..05bd664 100644 --- a/docs/content/docs/scheduler.mdx +++ b/docs/content/docs/scheduler.mdx @@ -14,6 +14,7 @@ MPA includes a built-in scheduler powered by [APScheduler](https://apscheduler.r | **agent** | Natural language task processed by the agent | "Give me a morning briefing" | | **system** | Raw CLI command executed directly | `sqlite3 ... DELETE FROM short_term ...` | | **memory_consolidation** | LLM-driven memory review and promotion | Review short-term memories | +| **subagent** | Task run by a [subagent](/docs/subagents) under a `persona` | "Draft my weekly review" | ### Agent jobs diff --git a/docs/content/docs/subagents.mdx b/docs/content/docs/subagents.mdx new file mode 100644 index 0000000..f295fe1 --- /dev/null +++ b/docs/content/docs/subagents.mdx @@ -0,0 +1,159 @@ +--- +title: Subagents +description: Delegate scoped subtasks to sub-loops — on demand or scheduled — with lifecycle control. +--- + +# Subagents + +A **subagent** is a scoped run of the agent loop that the main agent can delegate +work to. It runs under a chosen [persona](/docs/personae), returns a structured +result, and — like a [scheduled job](/docs/scheduler) — appears in a registry you +can list, monitor, and cancel. + +There is **one execution primitive** reached by two trigger paths: + +- **On demand** — the agent calls the `spawn_subagent` tool mid-conversation. +- **Scheduled** — a `subagent` job fired by the [scheduler](/docs/scheduler). + +A subagent runs with **system semantics**: no goal decomposition, no memory +extraction, no reflection, and no per-action approval prompts — the same path +scheduled jobs already use. You approve the delegation once (the `spawn_subagent` +call is an [ASK action](/docs/permissions)); the subagent then runs autonomously +within its scope. + +## On demand + +The agent decides to delegate and calls the tool. Two modes: + +- **Synchronous** (default): the tool blocks, the subagent runs to completion, + and `{ summary, result }` comes back as the tool result — the agent continues + in the same turn. +- **Background** (`background: true`): the tool returns a run id immediately and + the subagent runs off-turn. A subagent works **for the agent, not the user** — + its raw output is never shown to you. When the whole batch of background runs + for the chat has finished, a small [summary inference](#result-summary) + distils it into **two** things: a one-line *notification* sent to the chat, and + a concise *digest* kept in the agent's context so it can answer follow-ups + without the raw output polluting the conversation. While a run is in flight, + its status is shown to the agent each turn so it never mistakes a pending run + for a finished one. + +```jsonc +// spawn_subagent tool input +{ + "task": "Research the top 3 Python HTTP clients and summarise trade-offs", + "persona": "coding-helper", // optional — omit to run as yourself (same identity & scope) + "max_steps": 6, // optional — caller-chosen, capped at the configured max + "token_budget": 40000, // optional — caller-chosen, min 1000, capped at the configured max + "thinking_effort": "high", // optional — off | low | medium | high; omit to inherit yours + "background": false // true → run async, report back when done +} +``` + +The `task` is self-contained: a subagent has **no memory** of the conversation +that spawned it, so everything it needs goes in `task`. + +The agent **sizes each run to the job**: `max_steps`, `token_budget`, and +`thinking_effort` let it spend less on a quick lookup and more on hard, +multi-step work. Each is optional — omit `max_steps`/`token_budget` for the +configured ceiling, `thinking_effort` to inherit the caller's level — and the +numeric caps are clamped to the [Guardrails](#guardrails) maxima, which the +agent can dial *down* but never past. Omitting `persona` runs the subagent as +the caller itself, with the same identity and scope. + +**Files hand back automatically.** The subagent shares the agent's filesystem, +so any file it writes is already on disk where the caller can reach it — it just +reports the absolute paths in its result, which the spawning agent then reads, +sends, or attaches. + +## Scheduled + +Run a subagent on a cron or one-shot schedule by giving a job `type: "subagent"` +and a `persona`. The result is delivered to the job's `channel`. + +```yaml +scheduler: + jobs: + - id: "weekly_review" + cron: "0 18 * * 5" # Fridays at 18:00 + type: "subagent" + persona: "writing-editor" + task: "Draft a short weekly review from this week's notes and email it to me" + channel: "telegram" +``` + +You can also create subagent jobs from the **Jobs** tab of the [admin UI](/docs/admin-ui). + +## Scope: inherit, never widen + +A subagent's [persona](/docs/personae) scope — the tools it may call, skills it +may load, and [secrets](/docs/secrets) it may use — is the **intersection** of +the caller's scope and the requested persona's. A child can never gain a +capability the caller lacked. The allowlist convention applies: an empty list +means "all", so an unrestricted caller delegating to a scoped persona *narrows*, +and a scoped caller delegating to an open persona keeps its own limits. + +[Permission rules](/docs/permissions) (`NEVER` in particular) still apply inside +the subagent, so a blocked action stays blocked at any depth. + +## Choosing a persona + +Selection stays **user-led**: omitting `persona` runs the subagent as the caller +itself (the chat's bound persona), so a subtask stays in the identity the user +already chose. To make the *specialist* case an informed choice rather than a +guess, each turn the agent sees a compact roster of available personae +(`name — role`) and only names one when the subtask clearly fits it. A wrong +name is rejected with the list of valid names, so the agent can self-correct. + +## Guardrails + +Sensible defaults keep delegation safe and bounded. Edit them in the admin +**Tools** tab (the Subagents card — the master switch and every +limit, applied live without a restart), or under `subagents` in `config.yml`: + +| Setting | Default | What it does | +|---------|---------|--------------| +| `enabled` | `true` | Master switch; off hides the `spawn_subagent` tool everywhere | +| `recursion_depth` | `3` | Max nesting (a top-level spawn is depth 1) | +| `max_steps` | `12` | Ceiling on tool-call rounds per run — a hard stop; the agent may request fewer per spawn, never more | +| `token_budget` | `100000` | Ceiling on tokens per run (best-effort); the agent may request fewer per spawn, never more | +| `max_concurrent` | `3` | Max background runs at once | + +When disabled, the tool is hidden from the assistant and from the Personae tool +scope. Assign it per persona in the **Personae** editor. A run that hits its step +or token budget stops and returns what it has, with a note that the budget was +reached. + +## Result summary + +A finished **background** batch isn't dumped on you raw. A small, fast inference +(its own provider/model, like memory or compaction) distils the batch into a +one-line chat **notification** and a concise **digest** that stays in the agent's +context for follow-ups. Configure it in the **Tools** tab (the *Result summary* +section of the Subagents card) or under `subagent_summary` in `config.yml`: + +| Setting | Default | What it does | +|---------|---------|--------------| +| `enabled` | `true` | Off falls back to a crude first-line truncation (no LLM call) | +| `provider` | `deepseek` | Inference provider for the summary | +| `model` | `deepseek-v4-flash` | Fast + cheap is ideal for this distillation | +| `thinking_level` | `""` | `""` (off) / `low` / `medium` / `high` for reasoning models | + +Synchronous spawns are unaffected — the agent gets the full result inline as the +tool result and weaves it into its own reply. + +## Monitoring & cancelling + +Background subagents are first-class runs you can watch and stop: + +- **Telegram** — send `/jobs` to list active runs, each with an inline **Cancel** + button. Monitoring and cancelling a long task from your phone needs no web UI. +- **Admin UI** — the **Jobs** tab shows a live card per run (persona, status, + elapsed time, current step) with a cancel button; the layout collapses to a + single column at phone width. + +Cancelling stops the run; if it was a background run, a cancellation notice is +posted back to the originating chat. + +> Runs live in memory, so a restart clears the registry (and stops any in-flight +> background runs) — there is nothing to resume after a reboot. diff --git a/tests/test_subagents.py b/tests/test_subagents.py new file mode 100644 index 0000000..554abc0 --- /dev/null +++ b/tests/test_subagents.py @@ -0,0 +1,646 @@ +"""Tests for subagents — scope narrowing, registry, the run primitive, and +scheduled-job wiring (issue #15).""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from core.config import Config +from core.job_store import VALID_TYPES, JobStore +from core.llm import LLMResponse, LLMToolCall +from core.personae import Persona +from core.subagents import ( + FILE_HANDOFF_INSTRUCTION, + SubagentRegistry, + SubagentRun, + narrow_scope, + normalize_effort, + resolve_cap, + short_summary, +) + +# --------------------------------------------------------------------------- +# narrow_scope — inherit, never widen ([] / None == "all") +# --------------------------------------------------------------------------- + + +def test_narrow_scope_parent_unrestricted_takes_child() -> None: + assert narrow_scope([], ["a"]) == ["a"] + assert narrow_scope(None, ["a", "b"]) == ["a", "b"] + + +def test_narrow_scope_child_unspecified_inherits_parent() -> None: + assert narrow_scope(["a", "b"], []) == ["a", "b"] + assert narrow_scope(["a"], None) == ["a"] + + +def test_narrow_scope_both_restricted_is_intersection() -> None: + # The child can never gain a name the parent lacks. + assert narrow_scope(["a", "b"], ["b", "c"]) == ["b"] + assert narrow_scope(["a"], ["b"]) == [] + + +def test_narrow_scope_both_empty_stays_all() -> None: + assert narrow_scope([], []) == [] + + +# --------------------------------------------------------------------------- +# SubagentRegistry +# --------------------------------------------------------------------------- + + +def _run(run_id: str, status: str = "running") -> SubagentRun: + return SubagentRun(run_id=run_id, persona="", task="t", status=status) + + +def test_registry_register_list_and_active_count() -> None: + reg = SubagentRegistry() + reg.register(_run("a")) + reg.register(_run("b")) + assert reg.active_count() == 2 + assert {r.run_id for r in reg.list_runs()} == {"a", "b"} + reg.finish("a", "done", result="ok") + assert reg.active_count() == 1 + assert {r.run_id for r in reg.list_runs(active_only=True)} == {"b"} + assert reg.get("a").result == "ok" + + +def test_registry_cancel_only_running() -> None: + reg = SubagentRegistry() + reg.register(_run("a")) + assert reg.cancel("a") is True + assert reg.get("a").status == "cancelled" + # already finished / unknown → False + assert reg.cancel("a") is False + assert reg.cancel("missing") is False + + +def test_registry_trims_finished_runs() -> None: + reg = SubagentRegistry() + for i in range(60): + reg.register(_run(f"r{i}")) + reg.finish(f"r{i}", "done") + # Only the most recent finished runs are kept (cap 50). + assert len(reg.list_runs()) == 50 + + +def test_short_summary_first_nonempty_line_capped() -> None: + assert short_summary("\n\nhello world\nmore") == "hello world" + assert short_summary("x" * 400).endswith("…") + assert len(short_summary("x" * 400)) == 281 # 280 + ellipsis + + +def test_running_for_filters_by_chat_and_drops_finished() -> None: + reg = SubagentRegistry() + here = SubagentRun(run_id="a", persona="", task="t", origin_channel="tg", origin_chat_id="1") + other = SubagentRun(run_id="b", persona="", task="t", origin_channel="tg", origin_chat_id="2") + reg.register(here) + reg.register(other) + + # Running runs appear every turn; the other chat's run is never included. + assert [r.run_id for r in reg.running_for("tg", "1")] == ["a"] + assert [r.run_id for r in reg.running_for("tg", "1")] == ["a"] + + # Once finished it drops out of the running list (its result goes to history). + reg.finish("a", "done", result="answer") + assert reg.running_for("tg", "1") == [] + + +# --------------------------------------------------------------------------- +# AgentCore.run_subagent — built with a scripted fake LLM (no network) +# --------------------------------------------------------------------------- + + +class _ScriptedLLM: + """Returns a fixed sequence of LLMResponses; trivial message builders.""" + + def __init__(self, responses: list[LLMResponse]) -> None: + self._responses = list(responses) + self.provider = "deepseek" + + async def generate(self, **_kw) -> LLMResponse: + if self._responses: + return self._responses.pop(0) + return LLMResponse(text="(done)", tool_calls=[]) + + def assistant_message(self, response: LLMResponse) -> dict: + return {"role": "assistant", "content": response.text} + + def tool_result_messages(self, results: list[dict]) -> list[dict]: + return [{"role": "user", "content": results}] + + +@pytest.fixture +def agent(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + from core.agent import AgentCore + + cfg = Config() + cfg.agent.llm_provider = "deepseek" + cfg.agent.model = "deepseek-v4-flash" + cfg.memory.embedding.enabled = False # keep retrieval lexical (no model load) + return AgentCore(cfg) + + +@pytest.mark.asyncio +async def test_run_subagent_sync_returns_result(agent) -> None: + agent.llm = _ScriptedLLM([LLMResponse(text="the answer", tool_calls=[])]) + result = await agent.run_subagent(task="do a thing") + assert result["ok"] is True + assert result["result"] == "the answer" + assert result["summary"] == "the answer" + run = agent.subagents.get(result["run_id"]) + assert run.status == "done" + + +@pytest.mark.asyncio +async def test_run_subagent_disabled(agent) -> None: + agent.config.subagents.enabled = False + result = await agent.run_subagent(task="x") + assert "disabled" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_run_subagent_depth_cap(agent) -> None: + agent.config.subagents.recursion_depth = 2 + # A caller already at the ceiling cannot spawn. + result = await agent.run_subagent(task="x", parent_state={"depth": 2}) + assert "recursion depth" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_run_subagent_unknown_persona(agent) -> None: + result = await agent.run_subagent(task="x", persona_name="does-not-exist") + assert "not found" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_run_subagent_step_budget_stops_loop(agent) -> None: + agent.config.subagents.max_steps = 1 + # Always asks for a tool → would loop forever without the cap. + call = LLMToolCall(id="1", name="web_search", arguments={"query": "q"}) + agent.llm = _ScriptedLLM([LLMResponse(text="", tool_calls=[call]) for _ in range(5)]) + result = await agent.run_subagent(task="loop") + assert result["ok"] is True + assert "budget" in result["result"].lower() + + +@pytest.mark.asyncio +async def test_run_subagent_token_budget_stops_loop(agent) -> None: + agent.config.subagents.token_budget = 100 + agent.config.subagents.max_steps = 100 # ensure the token budget is the limiter + call = LLMToolCall(id="1", name="web_search", arguments={"query": "q"}) + # Each round reports 80 tokens; cumulative exceeds 100 after the second call. + agent.llm = _ScriptedLLM( + [ + LLMResponse(text="", tool_calls=[call], usage={"input_tokens": 80, "output_tokens": 0}) + for _ in range(10) + ] + ) + result = await agent.run_subagent(task="loop") + assert result["ok"] is True + assert "budget" in result["result"].lower() + + +@pytest.mark.asyncio +async def test_background_subagent_notifies_user_digests_context(agent, monkeypatch) -> None: + channel = AsyncMock() + agent.channels["telegram"] = channel + agent.llm = _ScriptedLLM([LLMResponse(text="raw verbose findings: CHF 599 ...", tool_calls=[])]) + + # Stub the summary inference: (chat notification, context digest). + async def fake_summary(batch): + return "Cheapest is CHF 599.", "iPhone 17e 256GB at CHF 599; entry model." + + monkeypatch.setattr(agent, "_summarize_subagent_batch", fake_summary) + + # A trailing assistant turn for the digest to merge into (keeps alternation). + await agent.history.add_turn("telegram", "u1", "user", "price?", "555") + await agent.history.add_turn("telegram", "u1", "assistant", "On it.", "555") + + result = await agent.run_subagent( + task="price check", + origin_channel="telegram", + origin_user_id="u1", + origin_chat_id="555", + background=True, + ) + run = agent.subagents.get(result["run_id"]) + await run._task + + assert run.status == "done" + # Chat: the one-line NOTIFICATION only — never the raw output. + channel.send.assert_awaited_once_with("555", "Cheapest is CHF 599.") + # Context: the concise digest is kept (merged), the raw output never is. + turns = await agent.history.get_messages("telegram", "u1", "555") + blob = str(turns[-1]["content"]) + assert [t["role"] for t in turns] == ["user", "assistant"] # still alternating + assert "iPhone 17e 256GB at CHF 599" in blob + assert "raw verbose findings" not in blob + + +@pytest.mark.asyncio +async def test_background_batch_delivers_once_when_all_done(agent, monkeypatch) -> None: + calls: list[list[str]] = [] + + async def fake_deliver(channel, user_id, chat_id, batch): + calls.append([r.run_id for r in batch]) + + monkeypatch.setattr(agent, "_summarize_and_deliver", fake_deliver) + + common = dict(background=True, origin_channel="telegram", origin_chat_id="c") + r1 = SubagentRun(run_id="s1", persona="", task="a", origin_user_id="u", **common) + r2 = SubagentRun(run_id="s2", persona="", task="b", origin_user_id="u", **common) + agent.subagents.register(r1) + agent.subagents.register(r2) + + # First finishes → the other is still running → barrier holds, no delivery. + agent.subagents.finish("s1", "done", result="x") + await agent._maybe_deliver_subagent_batch(r1) + assert calls == [] + + # Last finishes → barrier releases → ONE delivery over the whole batch. + agent.subagents.finish("s2", "done", result="y") + await agent._maybe_deliver_subagent_batch(r2) + assert len(calls) == 1 + assert sorted(calls[0]) == ["s1", "s2"] + assert r1.synthesized and r2.synthesized + + +@pytest.mark.asyncio +async def test_cancelling_a_sibling_releases_a_deferred_reply(agent, monkeypatch) -> None: + """Regression: a done run that deferred to a still-running sibling must not be + orphaned when the user cancels that sibling (the lost-reply blocker).""" + import asyncio + + calls: list[list[str]] = [] + + async def fake_deliver(channel, user_id, chat_id, batch): + calls.append(sorted(r.run_id for r in batch)) + + monkeypatch.setattr(agent, "_summarize_and_deliver", fake_deliver) + + gate = asyncio.Event() + + async def fake_loop(task, persona, state, run): + if task == "B-task": + await gate.wait() # block so this run is "still running" when A finishes + return "B done" + return "A done" + + monkeypatch.setattr(agent, "_run_subagent_loop", fake_loop) + + origin = dict(origin_channel="telegram", origin_user_id="u", origin_chat_id="c") + b_res = await agent.run_subagent(task="B-task", background=True, **origin) + a_res = await agent.run_subagent(task="A-task", background=True, **origin) + + # A completes but B is still running → A defers (not delivered, not lost). + await agent.subagents.get(a_res["run_id"])._task + assert calls == [] + assert agent.subagents.get(a_res["run_id"]).synthesized is False + + # User cancels B → its cancel path must release A's deferred reply. + agent.subagents.cancel(b_res["run_id"]) + with pytest.raises(asyncio.CancelledError): + await agent.subagents.get(b_res["run_id"])._task + + # A's reply was delivered (not lost), and only A — B was cancelled. + assert calls == [[a_res["run_id"]]] + assert agent.subagents.get(a_res["run_id"]).synthesized is True + + +def test_summary_parsing_and_fallback() -> None: + from core.subagents import _parse_summary, fallback_summary + + n, d = _parse_summary("NOTIFICATION: Cheapest is CHF 599.\nDIGEST: iPhone 17e 256GB CHF 599.") + assert n == "Cheapest is CHF 599." + assert "iPhone 17e" in d + # No markers → first non-empty line becomes the notification. + assert _parse_summary("Just one line")[0] == "Just one line" + assert _parse_summary("") == ("", "") + # Truncation fallback from raw items (no LLM). + items = [("task a", "line1\nline2", "", "done"), ("task b", "r b", "", "done")] + notif, digest = fallback_summary(items) + assert "line1" in notif and "r b" in notif + assert "- task a:" in digest + + +@pytest.mark.asyncio +async def test_summarize_batch_calls_llm_and_parses() -> None: + from core.subagents import summarize_batch + + class FakeLLM: + async def generate_text(self, *, model, prompt, max_tokens=600): + return "NOTIFICATION: Done — 3 results.\nDIGEST: A, B and C found." + + notif, digest = await summarize_batch(FakeLLM(), "m", [("t", "r", "", "done")]) + assert notif == "Done — 3 results." + assert digest == "A, B and C found." + + +@pytest.mark.asyncio +async def test_run_subagent_background_respects_concurrency(agent) -> None: + agent.config.subagents.max_concurrent = 1 + # Pre-fill one running slot. + agent.subagents.register(SubagentRun(run_id="busy", persona="", task="t")) + result = await agent.run_subagent(task="x", background=True) + assert "concurrent" in result["error"].lower() or "max" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_spawn_subagent_not_deduplicated_in_turn(agent) -> None: + """Two identical spawns in one turn must both run (each is a distinct run).""" + from core.llm import LLMToolCall + + agent.llm = _ScriptedLLM( + [LLMResponse(text="a", tool_calls=[]), LLMResponse(text="b", tool_calls=[])] + ) + state = agent._new_request_state( + None, origin={"channel": "system", "user_id": "u", "chat_id": ""} + ) + call = LLMToolCall(id="x", name="spawn_subagent", arguments={"task": "same task"}) + r1 = await agent._execute_tool(call, "system", "u", state) + r2 = await agent._execute_tool(call, "system", "u", state) + assert r1.get("ok") is True + assert r2.get("ok") is True + assert r1["run_id"] != r2["run_id"] + + +def test_finish_does_not_overwrite_terminal_state() -> None: + """A late normal completion cannot un-cancel a run.""" + reg = SubagentRegistry() + reg.register(_run("a")) + assert reg.cancel("a") is True + assert reg.finish("a", "done", result="late") is False # no-op + assert reg.get("a").status == "cancelled" + assert reg.get("a").result == "" + + +def test_narrow_persona_intersects_scopes(agent) -> None: + parent = Persona(name="p", skills=["s1", "s2"], tools=["a", "b"], secrets=["x"]) + requested = Persona(name="child", skills=[], tools=["b", "c"], secrets=["y"]) + child = agent._narrow_persona(requested, {"persona_obj": parent}) + assert child.name == "child" + assert child.skills == ["s1", "s2"] # child unspecified → inherits parent + assert child.tools == ["b"] # intersection, never 'c' + assert child.secrets == [] # 'y' not in parent's ['x'] + + +def test_subagent_status_note_lists_only_running_runs(agent) -> None: + agent.subagents.register( + SubagentRun( + run_id="r1", + persona="coding-helper", + task="t", + origin_channel="repl", + origin_chat_id="repl", + progress="step 2", + ) + ) + note = agent._subagent_status_note("repl", "repl") + assert "r1" in note and "running" in note and "step 2" in note + # Scoped to the chat: a different chat sees nothing. + assert agent._subagent_status_note("repl", "other-chat") == "" + + # Once finished it leaves the preamble (the agent synthesises a reply instead). + agent.subagents.finish("r1", "done", result="the iPhone 17e is CHF 599") + assert agent._subagent_status_note("repl", "repl") == "" + + +# --------------------------------------------------------------------------- +# Scheduled subagent jobs +# --------------------------------------------------------------------------- + + +def test_disabled_drops_spawn_subagent_from_llm_tools() -> None: + from core.agent import TOOLS, apply_feature_gates + + def names(ts): + return {t["name"] for t in ts} + + base = dict(secrets_available=True, artifacts_enabled=True) + assert "spawn_subagent" in names(apply_feature_gates(TOOLS, **base, subagents_enabled=True)) + assert "spawn_subagent" not in names( + apply_feature_gates(TOOLS, **base, subagents_enabled=False) + ) + + +def test_disabled_hides_spawn_subagent_from_persona_scope() -> None: + from api.admin import GATEABLE_TOOLS, gateable_tools_for + + assert "spawn_subagent" in gateable_tools_for(True) + assert set(gateable_tools_for(True)) == set(GATEABLE_TOOLS) + assert "spawn_subagent" not in gateable_tools_for(True, subagents_enabled=False) + + +def test_subagent_is_valid_job_type() -> None: + assert "subagent" in VALID_TYPES + + +@pytest.mark.asyncio +async def test_job_store_persists_persona(tmp_path) -> None: + store = JobStore(db_path=str(tmp_path / "jobs.db")) + job = await store.upsert_job( + "j1", type="subagent", schedule="cron", cron="0 9 * * *", task="brief", persona="analyst" + ) + assert job["type"] == "subagent" + assert job["persona"] == "analyst" + fetched = await store.get_job("j1") + assert fetched["persona"] == "analyst" + + +@pytest.mark.asyncio +async def test_job_store_migrates_persona_column(tmp_path) -> None: + """A DB created before the persona column gains it on next open.""" + import sqlite3 + + db_path = str(tmp_path / "old.db") + with sqlite3.connect(db_path) as db: + db.execute( + "CREATE TABLE jobs (id TEXT PRIMARY KEY, type TEXT, schedule TEXT, cron TEXT, " + "run_at TEXT, task TEXT, channel TEXT, status TEXT, created_by TEXT, " + "description TEXT, created_at TEXT, updated_at TEXT)" + ) + db.execute("INSERT INTO jobs (id, type) VALUES ('legacy', 'agent')") + db.commit() + + store = JobStore(db_path=db_path) + job = await store.upsert_job("new", type="subagent", persona="coach") + assert job["persona"] == "coach" + legacy = await store.get_job("legacy") + assert legacy["persona"] == "" # backfilled default + + +@pytest.mark.asyncio +async def test_run_subagent_task_delivers_to_owner() -> None: + from core.scheduler import run_subagent_task, set_agent_context + + channel = AsyncMock() + agent = SimpleNamespace( + channels={"telegram": channel}, + run_subagent=AsyncMock(return_value={"ok": True, "result": "scheduled out"}), + config=SimpleNamespace( + channels=SimpleNamespace(telegram=SimpleNamespace(allowed_user_ids=[7])) + ), + job_store=None, + ) + set_agent_context(agent) + + await run_subagent_task(persona="analyst", task="weekly review", channel="telegram") + + agent.run_subagent.assert_awaited_once() + channel.send.assert_awaited_once_with(7, "scheduled out") + + +# --------------------------------------------------------------------------- +# Caller-sized runs: max_steps / token_budget / thinking_effort + file handoff +# --------------------------------------------------------------------------- + + +def test_resolve_cap_defaults_clamps_and_coerces() -> None: + assert resolve_cap(None, 12) == 12 # caller didn't choose → configured ceiling + assert resolve_cap(3, 12) == 3 # honoured below the ceiling + assert resolve_cap(999, 12) == 12 # config is a ceiling, never exceeded + assert resolve_cap(0, 12, floor=1) == 1 # floored + assert resolve_cap("nope", 12) == 12 # garbage degrades to the ceiling + assert resolve_cap("4", 12) == 4 # numeric string coerces + assert resolve_cap(float("inf"), 12) == 12 # int(inf) overflows → ceiling, no crash + + +def test_normalize_effort_maps_and_inherits() -> None: + assert normalize_effort(None) is None # inherit the caller's level + assert normalize_effort("") is None + assert normalize_effort("off") == "" # reasoning off + assert normalize_effort("HIGH") == "high" # case-insensitive + assert normalize_effort("medium") == "medium" + assert normalize_effort("bogus") is None # unknown → safe inherit default + + +class _RecordingLLM(_ScriptedLLM): + """A scripted LLM that also records the system prompt of each call.""" + + def __init__(self, responses: list[LLMResponse]) -> None: + super().__init__(responses) + self.systems: list[str] = [] + + async def generate(self, *, system: str = "", **_kw) -> LLMResponse: + self.systems.append(system) + return await super().generate() + + +@pytest.mark.asyncio +async def test_subagent_system_prompt_carries_file_handoff(agent) -> None: + rec = _RecordingLLM([LLMResponse(text="done", tool_calls=[])]) + agent.llm = rec + await agent.run_subagent(task="x") + assert any(FILE_HANDOFF_INSTRUCTION in s for s in rec.systems) + + +@pytest.mark.asyncio +async def test_run_subagent_caller_max_steps_is_the_limiter(agent) -> None: + agent.config.subagents.max_steps = 100 # high config ceiling … + call = LLMToolCall(id="1", name="web_search", arguments={"query": "q"}) + agent.llm = _ScriptedLLM([LLMResponse(text="", tool_calls=[call]) for _ in range(10)]) + result = await agent.run_subagent(task="loop", max_steps=1) # … caller wants just 1 + assert result["ok"] is True + assert "budget" in result["result"].lower() + assert agent.subagents.get(result["run_id"]).max_steps == 1 + + +@pytest.mark.asyncio +async def test_run_subagent_clamps_caps_to_config_ceiling(agent) -> None: + agent.config.subagents.max_steps = 5 + agent.config.subagents.token_budget = 9000 + agent.llm = _ScriptedLLM([LLMResponse(text="done", tool_calls=[])]) + result = await agent.run_subagent(task="x", max_steps=999, token_budget=10**9) + run = agent.subagents.get(result["run_id"]) + assert run.max_steps == 5 + assert run.token_budget == 9000 + + +@pytest.mark.asyncio +async def test_run_subagent_effort_inherits_by_default(agent, monkeypatch) -> None: + agent.llm = _ScriptedLLM([LLMResponse(text="ok", tool_calls=[])]) + monkeypatch.setattr( + agent, "_background_llm", lambda *a, **k: pytest.fail("should not clone when inheriting") + ) + result = await agent.run_subagent(task="x") # no thinking_effort + assert result["ok"] is True + assert agent.subagents.get(result["run_id"]).effort is None + + +@pytest.mark.asyncio +async def test_run_subagent_effort_uses_scoped_client(agent, monkeypatch) -> None: + agent.llm = _ScriptedLLM([LLMResponse(text="ok", tool_calls=[])]) + captured: dict = {} + + def spy(provider, level=""): + captured["level"] = level + return _ScriptedLLM([LLMResponse(text="ok", tool_calls=[])]) + + monkeypatch.setattr(agent, "_background_llm", spy) + result = await agent.run_subagent(task="x", thinking_effort="high") + assert result["ok"] is True + assert captured["level"] == "high" + assert agent.subagents.get(result["run_id"]).effort == "high" + + +# --------------------------------------------------------------------------- +# Persona roster — let the agent pick a specialist, selection stays user-led +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_personae_roster_lists_name_and_role(agent, monkeypatch) -> None: + personae = [ + Persona(name="coding-helper", role="Writes and reviews code"), + Persona(name="writing-editor", role="Edits prose\nsecond line ignored"), + ] + monkeypatch.setattr(agent.personae, "list_personae", AsyncMock(return_value=personae)) + block = await agent._personae_roster_block(None) + assert "" in block + assert "- coding-helper — Writes and reviews code" in block + assert "- writing-editor — Edits prose" in block # only the first role line + assert "second line ignored" not in block + + +@pytest.mark.asyncio +async def test_personae_roster_marks_current_and_gates(agent, monkeypatch) -> None: + personae = [Persona(name="me", role="r1"), Persona(name="other", role="r2")] + monkeypatch.setattr(agent.personae, "list_personae", AsyncMock(return_value=personae)) + block = await agent._personae_roster_block(Persona(name="me", role="r1")) + assert "- me (you) — r1" in block + # a persona whose tool scope excludes spawn_subagent gets no roster + scoped = Persona(name="me", tools=["web_search"]) + assert await agent._personae_roster_block(scoped) == "" + # nor when subagents are disabled + agent.config.subagents.enabled = False + assert await agent._personae_roster_block(None) == "" + + +@pytest.mark.asyncio +async def test_personae_roster_only_offered_on_main_turn(agent, monkeypatch) -> None: + monkeypatch.setattr( + agent.personae, + "list_personae", + AsyncMock(return_value=[Persona(name="coding-helper", role="code")]), + ) + # subagent preamble (offer_personae defaults False) → no roster leaks in + assert "" not in await agent._turn_preamble(None, query="x") + # main turn opts in + assert "" in await agent._turn_preamble(None, query="x", offer_personae=True) + + +@pytest.mark.asyncio +async def test_run_subagent_unknown_persona_lists_available(agent, monkeypatch) -> None: + monkeypatch.setattr( + agent.personae, + "list_personae", + AsyncMock(return_value=[Persona(name="coding-helper"), Persona(name="analyst")]), + ) + result = await agent.run_subagent(task="x", persona_name="nope") + assert "not found" in result["error"].lower() + assert "coding-helper" in result["error"] and "analyst" in result["error"]