Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions codec_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,16 +1113,23 @@ async def _shutdown_services():
_bg_tasks.clear()
log.info("[SHUTDOWN] All background services stopped")
import routes._shared as _shared
global _qchat_conn, _vibe_conn
# M-5 (PR-4J): get_db() is now per-thread; close all of them via the registry.
_shared._close_all_db_conns()
for conn in (_qchat_conn, _vibe_conn): # dashboard-local singletons
if conn is not None:
# J1 fix: the qchat / vibe DB singletons moved to their route modules in
# D1/D2. The old code declared them `global` and read them here, but they
# were never module-level names in codec_dashboard anymore → the shutdown
# handler raised NameError before closing anything. Close them where they
# actually live now.
import routes.qchat as _qchat_mod
import routes.vibe as _vibe_mod
for _mod, _attr in ((_qchat_mod, "_qchat_conn"), (_vibe_mod, "_vibe_conn")):
_conn = getattr(_mod, _attr, None)
if _conn is not None:
try:
conn.close()
_conn.close()
except Exception:
pass
_qchat_conn = _vibe_conn = None
setattr(_mod, _attr, None)


# E2 health → moved to routes/*.py
Expand Down
88 changes: 82 additions & 6 deletions routes/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,51 @@



def _url_host_is_public(url: str) -> bool:
"""SSRF guard: True only if `url` is http(s) AND every IP its host resolves
to is a public, routable address. Rejects loopback / private / link-local
(incl. 169.254.169.254 cloud-metadata) / reserved / multicast / unspecified.

J1 (re-audit, CWE-918): `_enrich_messages` auto-fetches URLs found in chat
content — the prompt-injection vector. Without this an injected link could
drive server-side GETs against `http://127.0.0.1:8083/...` or other local
`~/.codec` services. CODEC is loopback-only by default, but this keeps the
`dashboard_host: 0.0.0.0` opt-in safe. (Residual: DNS-rebinding TOCTOU
between this check and httpx's own resolve is accepted for a local app.)
"""
import ipaddress
import socket
from urllib.parse import urlparse
try:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return False
host = parsed.hostname
if not host:
return False
infos = socket.getaddrinfo(host, parsed.port or (443 if parsed.scheme == "https" else 80),
proto=socket.IPPROTO_TCP)
if not infos:
return False
for info in infos:
ip = ipaddress.ip_address(info[4][0])
if (ip.is_private or ip.is_loopback or ip.is_link_local
or ip.is_reserved or ip.is_multicast or ip.is_unspecified):
return False
return True
except Exception as e:
log.warning(f"URL host validation failed ({url}): {e}")
return False


def _fetch_url_content(url: str, max_chars: int = 8000) -> str:
"""Fetch a URL and return stripped text content."""
"""Fetch a URL and return stripped text content.

SSRF-hardened (J1): the host is validated as public BEFORE the fetch, and
redirects are followed manually (≤5 hops) so each Location is re-validated
— `follow_redirects=True` would let a public URL 30x-redirect to an
internal one, defeating the pre-check.
"""
try:
import httpx
from html.parser import HTMLParser
Expand All @@ -70,9 +113,25 @@ def handle_data(self, data):
if stripped:
self.chunks.append(stripped)

r = httpx.get(url, timeout=15, follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"})
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"}
cur = url
r = None
with httpx.Client(timeout=15, follow_redirects=False) as client:
for _hop in range(5):
if not _url_host_is_public(cur):
log.warning(f"URL fetch blocked (non-public host): {cur}")
return ""
r = client.get(cur, headers=headers)
if r.is_redirect and "location" in r.headers:
cur = str(r.url.join(r.headers["location"]))
continue
break
else:
log.warning(f"URL fetch aborted (too many redirects): {url}")
return ""
if r is None:
return ""
if 'text/html' in r.headers.get('content-type', ''):
parser = _Stripper()
parser.feed(r.text)
Expand Down Expand Up @@ -226,7 +285,10 @@ def _enrich_messages(messages: list, config: dict, force_search: bool = False) -
try:
import sys
import os as _os
repo_dir = _os.path.dirname(_os.path.abspath(__file__))
# J1 fix: this module lives in routes/ now, so the repo root (where
# codec_search.py is) is TWO levels up, not one. The pre-extraction
# original was at repo root → single dirname. Match web_search.py.
repo_dir = _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))
if repo_dir not in sys.path:
sys.path.insert(0, repo_dir)
from codec_search import search, format_results
Expand Down Expand Up @@ -561,10 +623,16 @@ async def chat_completion(request: Request):
correlation_id=secrets.token_hex(6),
)

# Bind before the use_tools gate so the non-stream / system-prompt paths
# below never hit an UnboundLocalError when a client sends {"tools": false}
# (re-audit J1: was a silent opaque 500 — _build_chat_system_prompt is
# called with both names regardless of the tools flag).
last_user_text = ""
has_attachment = False

# ── Tool Calling: check if last user message matches a skill ──
use_tools = body.get("tools", True) # frontend can disable with tools:false
if use_tools:
last_user_text = ""
for m in reversed(messages):
if m.get("role") == "user" and isinstance(m.get("content"), str):
last_user_text = m["content"]
Expand Down Expand Up @@ -792,6 +860,14 @@ def _resolve_skill_tag(raw_tag):
)
except Exception:
pass
else:
# J1 parity: a non-allowlisted skill name is never executed
# (the invariant holds via the two branches above) AND its raw
# tag is stripped — the streaming path's _resolve_skill_tag
# already drops disallowed tags; the non-stream path used to
# leave "[SKILL:foo:...]" visible in the chat bubble.
log.info(f"[Chat] LLM tried disallowed skill {s_name!r} (non-stream) — dropping tag")
answer = answer.replace(skill_tag.group(0), "")

return {"response": answer, "model": model}
except Exception as e:
Expand Down
17 changes: 15 additions & 2 deletions routes/vibe_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,26 @@ async def run_code(request: Request):
body = await request.json()
code = body.get("code", "")
language = body.get("language", "python")
body.get("filename", "script.py")
if not code.strip():
return JSONResponse({"error": "No code"}, status_code=400)
from codec_config import is_dangerous
if is_dangerous(code):
return JSONResponse({"error": "Blocked: code contains dangerous pattern"}, status_code=403)
ext_map = {"python": ".py", "javascript": ".js", "typescript": ".ts", "bash": ".sh", "go": ".go", "rust": ".rs", "java": ".java", "cpp": ".cpp", "swift": ".swift", "ruby": ".rb", "sql": ".sql"}
# J1: reject languages we can't actually run instead of silently feeding a
# .java/.cpp/.sql file to python3.13 (ext_map had more langs than cmd_map).
cmd_template = {
"python": ["python3.13"],
"javascript": ["node"],
"typescript": ["npx", "ts-node"],
"bash": ["bash"],
"go": ["go", "run"],
"rust": ["rustc"], # special-cased below
"swift": ["swift"],
"ruby": ["ruby"],
}
if language not in cmd_template:
return JSONResponse({"error": f"Unsupported language: {language}"}, status_code=400)
ext_map = {"python": ".py", "javascript": ".js", "typescript": ".ts", "bash": ".sh", "go": ".go", "rust": ".rs", "swift": ".swift", "ruby": ".rb"}
ext = ext_map.get(language, ".txt")
tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, mode="w")
tmp.write(code)
Expand Down
112 changes: 112 additions & 0 deletions tests/test_review_fixes_j1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""J1 — regression tests for the post-refactor review findings.

Covers the real bugs surfaced by the code-review / security-review sweep after
the route-extraction series:

1. SSRF guard on _fetch_url_content (chat URL auto-fetch — the injection vector)
2. UnboundLocalError on POST /api/chat with {"tools": false}
3. _enrich_messages repo_dir is two-levels-up (codec_search lives at repo root)
4. _shutdown_services no longer NameErrors on the moved _qchat_conn/_vibe_conn
5. /api/run_code rejects unsupported languages instead of running them as python
"""
from __future__ import annotations

import asyncio
import inspect
import sys
from pathlib import Path

import pytest

_REPO = Path(__file__).resolve().parents[1]
if str(_REPO) not in sys.path:
sys.path.insert(0, str(_REPO))


# ── 1. SSRF guard ──────────────────────────────────────────────────────────
class TestSSRFGuard:
@pytest.mark.parametrize("url", [
"http://127.0.0.1:8083/v1/chat/completions", # local LLM
"http://localhost/admin",
"http://169.254.169.254/latest/meta-data/", # cloud metadata
"http://192.168.1.10/x", # private LAN
"http://10.0.0.5/x", # private
"http://[::1]/x", # ipv6 loopback
"ftp://example.com/x", # non-http scheme
"file:///etc/passwd", # file scheme
"http://0.0.0.0/x", # unspecified
])
def test_blocks_internal_and_non_http(self, url):
import routes.chat as c
assert c._url_host_is_public(url) is False, f"{url} should be blocked"

def test_allows_public_numeric(self):
import routes.chat as c
# 1.1.1.1 is a public, routable address — no DNS needed.
assert c._url_host_is_public("https://1.1.1.1/") is True

def test_fetch_returns_empty_for_blocked_host(self):
"""_fetch_url_content must short-circuit to '' for a non-public host —
no httpx call is made (we'd get a connection, not a block, otherwise)."""
import routes.chat as c
assert c._fetch_url_content("http://127.0.0.1:8083/secret") == ""


# ── 2. tools:false must not UnboundLocalError ──────────────────────────────
def test_chat_bindings_hoisted_before_use_tools_gate():
import routes.chat as c
src = inspect.getsource(c.chat_completion)
i_lut = src.index('last_user_text = ""')
i_ha = src.index("has_attachment = False")
i_gate = src.index("if use_tools:")
assert i_lut < i_gate, "last_user_text must be bound before the use_tools gate"
assert i_ha < i_gate, "has_attachment must be bound before the use_tools gate"


# ── 3. _enrich_messages repo_dir resolves to repo ROOT (two dirnames) ──────
def test_enrich_messages_repo_dir_is_two_levels_up():
src = (_REPO / "routes" / "chat.py").read_text()
# the codec_search sys.path insert must climb to the repo root, not routes/
assert "_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))" in src


# ── 4. shutdown handler runs clean (no NameError on moved singletons) ──────
def test_shutdown_services_no_nameerror():
import codec_dashboard as cd
# Must complete without raising NameError for _qchat_conn / _vibe_conn.
asyncio.run(cd._shutdown_services())


def test_dashboard_no_dead_global_singletons():
src = (_REPO / "codec_dashboard.py").read_text()
assert "global _qchat_conn, _vibe_conn" not in src, (
"dead `global _qchat_conn, _vibe_conn` should be gone — they live in "
"routes/qchat.py + routes/vibe.py now"
)


# ── 5. /api/run_code rejects unsupported languages ─────────────────────────
class _FakeReq:
def __init__(self, payload):
self._payload = payload

async def json(self):
return self._payload


def test_run_code_rejects_unsupported_language():
import routes.vibe_exec as ve
resp = asyncio.run(ve.run_code(_FakeReq({"code": "SELECT 1;", "language": "sql"})))
# JSONResponse with 400 — sql isn't runnable, must not fall through to python3.13
assert getattr(resp, "status_code", None) == 400


def test_run_code_still_accepts_python():
import routes.vibe_exec as ve
# empty-code guard returns 400 too, but a real python snippet must NOT be
# rejected as "unsupported language" — assert the body differs.
resp = asyncio.run(ve.run_code(_FakeReq({"code": "", "language": "python"})))
# empty code → 400 "No code", NOT "Unsupported language"
import json as _json
body = _json.loads(bytes(resp.body).decode())
assert "Unsupported language" not in body.get("error", "")