diff --git a/codec_agents.py b/codec_agents.py index e0f0234..2a673cb 100644 --- a/codec_agents.py +++ b/codec_agents.py @@ -28,6 +28,13 @@ _VALID_TOOL_NAME_RE = re.compile(r'^[A-Za-z0-9_.\-]+$') _MAX_TOOL_NAME_LEN = 100 _MAX_TOOL_INPUT_LEN = 50000 +# L1 / SR-61: per-tool wall-clock budget. A skill tool with no internal timeout +# (input(), a no-timeout network call, a deadlock) would otherwise hang the +# agent — and its default-thread-pool worker — forever. wait_for returns control +# to the agent as a recoverable error string. (Residual: the abandoned worker +# thread can't be killed and stays parked until the blocking call returns — a +# generous budget keeps that rare; tools doing real work finish well inside it.) +_TOOL_CALL_TIMEOUT_SECONDS = 120 # ── CONFIG ── CONFIG_PATH = os.path.expanduser("~/.codec/config.json") @@ -62,7 +69,10 @@ def _serper_api_key() -> str: except Exception: return os.environ.get("SERPER_API_KEY", "") -SERPER_API_KEY = _serper_api_key() +# L1 / SR-61: dropped the eager module global `SERPER_API_KEY = _serper_api_key()` +# — it was never read (web_search routes through codec_search.search, which +# fetches the key itself) yet did a Keychain shellout at every import. The +# getter above stays for callers that want a live read. # ── HTTP connection pools (reuse TCP connections across calls) ── _HTTP_HEADERS = { @@ -470,7 +480,19 @@ def _inner(t, _c): ) ctx = contextvars.copy_context() - result = await loop.run_in_executor(None, ctx.run, _run_tool_with_hooks) + try: + result = await asyncio.wait_for( + loop.run_in_executor(None, ctx.run, _run_tool_with_hooks), + timeout=_TOOL_CALL_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + # L1 / SR-61: surface as a tool-error string so the agent's ReAct + # loop can recover (try a different tool / FINAL) instead of hanging. + _audit("tool_result", tool=tool_name, agent=self.name, outcome="timeout", + error=f"tool exceeded {_TOOL_CALL_TIMEOUT_SECONDS}s budget") + return (f"Tool '{tool_name}' timed out after " + f"{_TOOL_CALL_TIMEOUT_SECONDS}s. Try a different approach or " + f"give your FINAL answer.") if isinstance(result, HookVeto): result = (f"Tool '{tool_name}' was vetoed by plugin " f"'{result.plugin_name}': {result.reason}") @@ -816,10 +838,12 @@ def __post_init__(self): for agent in self.agents: before = len(agent.tools) agent.tools = [t for t in agent.tools if t.name in allowed] - if agent.tools != agent.tools or before != len(agent.tools): - stripped = before - len(agent.tools) - if stripped: - print(f"[Crew] Scoped {agent.name}: removed {stripped} tool(s) outside allowlist") + # L1 / SR-61: was `if agent.tools != agent.tools or ...` — the + # first operand is always False (list compared to itself); the + # condition reduced to the `if stripped:` below. Simplified. + stripped = before - len(agent.tools) + if stripped: + print(f"[Crew] Scoped {agent.name}: removed {stripped} tool(s) outside allowlist") async def run(self, callback: Optional[Callable] = None) -> str: # One correlation_id per crew run. All nested agent_start / agent_finish / @@ -869,7 +893,20 @@ async def run(self, callback: Optional[Callable] = None) -> str: # spawned N concurrent agent.run coroutines unbounded. pairs = list(zip(self.agents, self.tasks))[:self.max_steps] coros = [a.run(t, callback=callback) for a, t in pairs] - results = await asyncio.gather(*coros) + # L1 / SR-61: return_exceptions=True so ONE agent's failure + # (e.g. a stuck-detection abort raising) doesn't discard every + # other agent's result. Sequential mode isolates per-agent + # (try/raise above); parallel now degrades gracefully too — a + # failed agent contributes an error marker, the rest still return. + raw = await asyncio.gather(*coros, return_exceptions=True) + results = [] + for (agent, _task), r in zip(pairs, raw): + if isinstance(r, Exception): + _audit("agent_finish", agent=agent.name, outcome="error", + error_type=type(r).__name__, error=str(r)[:500]) + results.append(f"[{agent.name} failed: {type(r).__name__}: {r}]") + else: + results.append(r) final = "\n\n---\n\n".join(results) else: final = f"Unknown crew mode: {self.mode}" diff --git a/tests/test_agents_reliability_l1.py b/tests/test_agents_reliability_l1.py new file mode 100644 index 0000000..ecefe9f --- /dev/null +++ b/tests/test_agents_reliability_l1.py @@ -0,0 +1,124 @@ +"""L1 — regression tests for codec_agents.py reliability fixes (review sweep). + + - parallel crews degrade gracefully (one agent's exception no longer sinks all) + - a hanging tool is bounded by a per-tool wall-clock budget (no infinite hang) + - the Crew allowlist still strips out-of-allowlist tools (tautology simplification) + - the dead eager SERPER_API_KEY module global is gone +""" +from __future__ import annotations + +import asyncio +import sys +import time +from pathlib import Path + +_REPO = Path(__file__).resolve().parents[1] +if str(_REPO) not in sys.path: + sys.path.insert(0, str(_REPO)) + +import codec_agents # noqa: E402 +from codec_agents import Agent, Crew, Tool # noqa: E402 + + +# ── parallel crew graceful degradation ───────────────────────────────────── +def test_parallel_crew_survives_one_agent_exception(): + good = Agent(name="Good", role="r", tools=[]) + bad = Agent(name="Bad", role="r", tools=[]) + + async def _good_run(task, context="", callback=None): + return "good-result" + + async def _bad_run(task, context="", callback=None): + raise RuntimeError("boom") + + good.run = _good_run + bad.run = _bad_run + + crew = Crew(agents=[good, bad], tasks=["t1", "t2"], mode="parallel") + final = asyncio.run(crew.run()) + + # the good agent's result survives; the bad one becomes an error marker + assert "good-result" in final + assert "Bad failed" in final + assert "RuntimeError" in final + + +def test_parallel_crew_all_succeed_still_joins(): + a = Agent(name="A", role="r", tools=[]) + b = Agent(name="B", role="r", tools=[]) + + async def _ra(task, context="", callback=None): + return "ra" + + async def _rb(task, context="", callback=None): + return "rb" + + a.run, b.run = _ra, _rb + crew = Crew(agents=[a, b], tasks=["t", "t"], mode="parallel") + final = asyncio.run(crew.run()) + assert "ra" in final and "rb" in final + assert "failed" not in final + + +# ── per-tool timeout ─────────────────────────────────────────────────────── +def test_hanging_tool_is_bounded(monkeypatch): + # shrink the budget so the test is fast; the tool blocks longer than that + monkeypatch.setattr(codec_agents, "_TOOL_CALL_TIMEOUT_SECONDS", 0.3) + + def _slow(_s): + time.sleep(2.0) # exceeds the 0.3s budget + return "never" + + tool = Tool(name="slow", description="blocks", fn=_slow) + agent = Agent(name="T", role="r", tools=[tool]) + + # Measure the COROUTINE's own return time, not asyncio.run() — the latter + # also waits for the abandoned (non-killable) executor thread to finish its + # 2s sleep at loop teardown. The fix's contract is that the *agent* regains + # control at the budget, which is what run_until_complete returning proves. + loop = asyncio.new_event_loop() + try: + t0 = time.time() + out = loop.run_until_complete(agent._execute_tool_with_hooks(tool, "slow", "x")) + elapsed = time.time() - t0 # captured BEFORE loop.close() joins the thread + finally: + loop.close() + + assert "timed out" in out.lower(), f"expected timeout message, got: {out!r}" + assert elapsed < 1.5, f"wait_for did not bound the hang (took {elapsed:.1f}s)" + + +def test_fast_tool_returns_normally(monkeypatch): + monkeypatch.setattr(codec_agents, "_TOOL_CALL_TIMEOUT_SECONDS", 5) + tool = Tool(name="echo", description="echo", fn=lambda s: f"got:{s}") + agent = Agent(name="T", role="r", tools=[tool]) + out = asyncio.run(agent._execute_tool_with_hooks(tool, "echo", "hi")) + assert out == "got:hi" + + +# ── Crew allowlist still strips (tautology simplification) ────────────────── +def test_crew_allowlist_strips_out_of_scope_tools(): + keep = Tool(name="keep", description="", fn=lambda s: s) + drop = Tool(name="drop", description="", fn=lambda s: s) + agent = Agent(name="A", role="r", tools=[keep, drop]) + Crew(agents=[agent], tasks=["t"], allowed_tools=["keep"]) + names = {t.name for t in agent.tools} + assert names == {"keep"}, f"allowlist scoping broken: {names}" + + +def test_crew_no_allowlist_keeps_all_tools(): + a_tool = Tool(name="a", description="", fn=lambda s: s) + b_tool = Tool(name="b", description="", fn=lambda s: s) + agent = Agent(name="A", role="r", tools=[a_tool, b_tool]) + Crew(agents=[agent], tasks=["t"], allowed_tools=None) + assert {t.name for t in agent.tools} == {"a", "b"} + + +# ── dead global removed ───────────────────────────────────────────────────── +def test_dead_serper_global_removed(): + assert not hasattr(codec_agents, "SERPER_API_KEY"), ( + "the eager SERPER_API_KEY module global should be gone (it was unused " + "and did a Keychain shellout at import)" + ) + # the live getter stays + assert hasattr(codec_agents, "_serper_api_key")