Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions codec_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
_VALID_TOOL_NAME_RE = re.compile(r'^[A-Za-z0-9_.\-]+$')
_MAX_TOOL_NAME_LEN = 100
_MAX_TOOL_INPUT_LEN = 50000
# L1 / SR-61: per-tool wall-clock budget. A skill tool with no internal timeout
# (input(), a no-timeout network call, a deadlock) would otherwise hang the
# agent — and its default-thread-pool worker — forever. wait_for returns control
# to the agent as a recoverable error string. (Residual: the abandoned worker
# thread can't be killed and stays parked until the blocking call returns — a
# generous budget keeps that rare; tools doing real work finish well inside it.)
_TOOL_CALL_TIMEOUT_SECONDS = 120

# ── CONFIG ──
CONFIG_PATH = os.path.expanduser("~/.codec/config.json")
Expand Down Expand Up @@ -62,7 +69,10 @@ def _serper_api_key() -> str:
except Exception:
return os.environ.get("SERPER_API_KEY", "")

SERPER_API_KEY = _serper_api_key()
# L1 / SR-61: dropped the eager module global `SERPER_API_KEY = _serper_api_key()`
# — it was never read (web_search routes through codec_search.search, which
# fetches the key itself) yet did a Keychain shellout at every import. The
# getter above stays for callers that want a live read.

# ── HTTP connection pools (reuse TCP connections across calls) ──
_HTTP_HEADERS = {
Expand Down Expand Up @@ -470,7 +480,19 @@ def _inner(t, _c):
)

ctx = contextvars.copy_context()
result = await loop.run_in_executor(None, ctx.run, _run_tool_with_hooks)
try:
result = await asyncio.wait_for(
loop.run_in_executor(None, ctx.run, _run_tool_with_hooks),
timeout=_TOOL_CALL_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
# L1 / SR-61: surface as a tool-error string so the agent's ReAct
# loop can recover (try a different tool / FINAL) instead of hanging.
_audit("tool_result", tool=tool_name, agent=self.name, outcome="timeout",
error=f"tool exceeded {_TOOL_CALL_TIMEOUT_SECONDS}s budget")
return (f"Tool '{tool_name}' timed out after "
f"{_TOOL_CALL_TIMEOUT_SECONDS}s. Try a different approach or "
f"give your FINAL answer.")
if isinstance(result, HookVeto):
result = (f"Tool '{tool_name}' was vetoed by plugin "
f"'{result.plugin_name}': {result.reason}")
Expand Down Expand Up @@ -816,10 +838,12 @@ def __post_init__(self):
for agent in self.agents:
before = len(agent.tools)
agent.tools = [t for t in agent.tools if t.name in allowed]
if agent.tools != agent.tools or before != len(agent.tools):
stripped = before - len(agent.tools)
if stripped:
print(f"[Crew] Scoped {agent.name}: removed {stripped} tool(s) outside allowlist")
# L1 / SR-61: was `if agent.tools != agent.tools or ...` — the
# first operand is always False (list compared to itself); the
# condition reduced to the `if stripped:` below. Simplified.
stripped = before - len(agent.tools)
if stripped:
print(f"[Crew] Scoped {agent.name}: removed {stripped} tool(s) outside allowlist")

async def run(self, callback: Optional[Callable] = None) -> str:
# One correlation_id per crew run. All nested agent_start / agent_finish /
Expand Down Expand Up @@ -869,7 +893,20 @@ async def run(self, callback: Optional[Callable] = None) -> str:
# spawned N concurrent agent.run coroutines unbounded.
pairs = list(zip(self.agents, self.tasks))[:self.max_steps]
coros = [a.run(t, callback=callback) for a, t in pairs]
results = await asyncio.gather(*coros)
# L1 / SR-61: return_exceptions=True so ONE agent's failure
# (e.g. a stuck-detection abort raising) doesn't discard every
# other agent's result. Sequential mode isolates per-agent
# (try/raise above); parallel now degrades gracefully too — a
# failed agent contributes an error marker, the rest still return.
raw = await asyncio.gather(*coros, return_exceptions=True)
results = []
for (agent, _task), r in zip(pairs, raw):
if isinstance(r, Exception):
_audit("agent_finish", agent=agent.name, outcome="error",
error_type=type(r).__name__, error=str(r)[:500])
results.append(f"[{agent.name} failed: {type(r).__name__}: {r}]")
else:
results.append(r)
final = "\n\n---\n\n".join(results)
else:
final = f"Unknown crew mode: {self.mode}"
Expand Down
124 changes: 124 additions & 0 deletions tests/test_agents_reliability_l1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""L1 — regression tests for codec_agents.py reliability fixes (review sweep).

- parallel crews degrade gracefully (one agent's exception no longer sinks all)
- a hanging tool is bounded by a per-tool wall-clock budget (no infinite hang)
- the Crew allowlist still strips out-of-allowlist tools (tautology simplification)
- the dead eager SERPER_API_KEY module global is gone
"""
from __future__ import annotations

import asyncio
import sys
import time
from pathlib import Path

_REPO = Path(__file__).resolve().parents[1]
if str(_REPO) not in sys.path:
sys.path.insert(0, str(_REPO))

import codec_agents # noqa: E402
from codec_agents import Agent, Crew, Tool # noqa: E402


# ── parallel crew graceful degradation ─────────────────────────────────────
def test_parallel_crew_survives_one_agent_exception():
    """One agent raising in a parallel crew must not discard the other's result.

    Both agents get stubbed ``run`` coroutines: one returns a string, the
    other raises ``RuntimeError``. The crew's joined output is expected to
    carry the successful result plus an error marker naming the failed agent
    and the exception type.
    """

    async def _succeeds(task, context="", callback=None):
        return "good-result"

    async def _explodes(task, context="", callback=None):
        raise RuntimeError("boom")

    healthy = Agent(name="Good", role="r", tools=[])
    failing = Agent(name="Bad", role="r", tools=[])
    # Replace the real run() on each instance so no model call is made.
    healthy.run = _succeeds
    failing.run = _explodes

    crew = Crew(
        agents=[healthy, failing],
        tasks=["t1", "t2"],
        mode="parallel",
    )
    combined = asyncio.run(crew.run())

    # The good agent's result survives; the bad one becomes an error marker.
    for expected_fragment in ("good-result", "Bad failed", "RuntimeError"):
        assert expected_fragment in combined


def test_parallel_crew_all_succeed_still_joins():
    """When every agent succeeds, the joined output has both results and no error marker."""

    async def _returns_ra(task, context="", callback=None):
        return "ra"

    async def _returns_rb(task, context="", callback=None):
        return "rb"

    first = Agent(name="A", role="r", tools=[])
    second = Agent(name="B", role="r", tools=[])
    # Stub run() per instance so the crew only exercises its join logic.
    first.run = _returns_ra
    second.run = _returns_rb

    crew = Crew(agents=[first, second], tasks=["t", "t"], mode="parallel")
    combined = asyncio.run(crew.run())

    assert "ra" in combined
    assert "rb" in combined
    assert "failed" not in combined


# ── per-tool timeout ───────────────────────────────────────────────────────
def test_hanging_tool_is_bounded(monkeypatch):
    """A tool that blocks past the per-tool budget yields a timeout message promptly.

    The budget constant is patched down to 0.3s and the tool sleeps for 2.0s,
    so the call must come back with a "timed out" string well before the
    sleep would have finished.
    """
    # Shrink the budget so the test is fast; the tool blocks longer than that.
    monkeypatch.setattr(codec_agents, "_TOOL_CALL_TIMEOUT_SECONDS", 0.3)

    def _slow(_s):
        time.sleep(2.0)  # exceeds the 0.3s budget
        return "never"

    tool = Tool(name="slow", description="blocks", fn=_slow)
    agent = Agent(name="T", role="r", tools=[tool])

    # Measure the COROUTINE's own return time, not asyncio.run(): the latter
    # also shuts down the default executor and would wait for the abandoned
    # (non-killable) worker thread to finish its 2s sleep. The contract under
    # test is that the *agent* regains control at the budget, which is what
    # run_until_complete returning proves.
    loop = asyncio.new_event_loop()
    try:
        # Use a monotonic clock: time.time() can jump (NTP / manual clock
        # changes) and make the elapsed bound below spuriously fail or pass.
        t0 = time.monotonic()
        out = loop.run_until_complete(agent._execute_tool_with_hooks(tool, "slow", "x"))
        # Captured before any loop teardown runs, so cleanup cost is excluded.
        elapsed = time.monotonic() - t0
    finally:
        loop.close()

    assert "timed out" in out.lower(), f"expected timeout message, got: {out!r}"
    assert elapsed < 1.5, f"wait_for did not bound the hang (took {elapsed:.1f}s)"


def test_fast_tool_returns_normally(monkeypatch):
    """A tool that finishes inside the budget has its output returned unchanged."""

    def _echo(s):
        return f"got:{s}"

    # A 5s budget is far more than an in-memory echo needs.
    monkeypatch.setattr(codec_agents, "_TOOL_CALL_TIMEOUT_SECONDS", 5)

    echo_tool = Tool(name="echo", description="echo", fn=_echo)
    runner = Agent(name="T", role="r", tools=[echo_tool])

    reply = asyncio.run(runner._execute_tool_with_hooks(echo_tool, "echo", "hi"))
    assert reply == "got:hi"


# ── Crew allowlist still strips (tautology simplification) ──────────────────
def test_crew_allowlist_strips_out_of_scope_tools():
    """Building a Crew with ``allowed_tools`` leaves the agent only the listed tools."""
    toolbox = [
        Tool(name=label, description="", fn=lambda s: s)
        for label in ("keep", "drop")
    ]
    agent = Agent(name="A", role="r", tools=toolbox)

    # Constructing the Crew is the act under test; the instance itself is unused.
    Crew(agents=[agent], tasks=["t"], allowed_tools=["keep"])

    names = set()
    for remaining in agent.tools:
        names.add(remaining.name)
    assert names == {"keep"}, f"allowlist scoping broken: {names}"


def test_crew_no_allowlist_keeps_all_tools():
    """With ``allowed_tools=None`` the agent's tool list is left intact."""
    toolbox = [
        Tool(name=label, description="", fn=lambda s: s)
        for label in ("a", "b")
    ]
    agent = Agent(name="A", role="r", tools=toolbox)

    # Constructing the Crew is the act under test; the instance itself is unused.
    Crew(agents=[agent], tasks=["t"], allowed_tools=None)

    surviving = set()
    for tool in agent.tools:
        surviving.add(tool.name)
    assert surviving == {"a", "b"}


# ── dead global removed ─────────────────────────────────────────────────────
def test_dead_serper_global_removed():
    """The eager ``SERPER_API_KEY`` module attribute is gone; the getter remains."""
    eager_global_present = hasattr(codec_agents, "SERPER_API_KEY")
    assert not eager_global_present, (
        "the eager SERPER_API_KEY module global should be gone (it was unused "
        "and did a Keychain shellout at import)"
    )

    # The live getter stays available for callers that want a fresh read.
    getter_present = hasattr(codec_agents, "_serper_api_key")
    assert getter_present