diff --git a/AGENTS.md b/AGENTS.md index 276da4c..e95227d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -56,7 +56,7 @@ docs/ API.md, MCP_HTTP_SETUP.md, CONTEXT_REPORT.md, desig Other engine modules (`codec_overlays`, `codec_metrics`, `codec_logging`, `codec_gdocs`, `codec_google_auth`, `codec_cdp`, `codec_llm_proxy`, `codec_retry`, `codec_alerts`, `codec_search`, `codec_textassist`, `codec_watcher`, `codec_watchdog`) are internal helpers — read them when you need them, but they're not part of the navigation surface for an agent making structural changes. (Keyboard handling — wake word, F13 toggle, F18 voice, double-tap — lives **inline in `codec.py`** in the `codec` PM2 process; the old standalone `codec_keyboard.py` was deleted as a dead duplicate per A-8.) -**Canonical LLM + vision helpers (PR-3E, A-11/A-12).** `codec_vision.py` is the SINGLE source for screen-vision (`describe_sync` / `describe_async`, Gemini-flash → local-Qwen-VL fallback, config read live from `codec_config`) — used by `codec.py`, `codec_voice`, `codec_session`. `codec_llm.py` is the canonical chat/completions caller (`call()` + `strip_think`/`extract_content` — headers, Bearer auth, `enable_thinking`, `` strip, `choices/reasoning` parse, retry+backoff, never-raises). NOTE: `codec_llm_proxy.py` is a priority *queue* (semaphore), NOT an HTTP caller — don't confuse the two. A-12 is migrating the ~45 inline `chat/completions` sites onto `codec_llm` in phased tranches; codec.py voice-reply + `codec_session.qwen_call` are done, streaming (`codec_llm.stream()`) + the rest are pending. +**Canonical LLM + vision helpers (PR-3E, A-11/A-12).** `codec_vision.py` is the SINGLE source for screen-vision (`describe_sync` / `describe_async`, Gemini-flash → local-Qwen-VL fallback, config read live from `codec_config`) — used by `codec.py`, `codec_voice`, `codec_session`. `codec_llm.py` is the canonical chat/completions caller (`call()` + `strip_think`/`extract_content` — headers, Bearer auth, `enable_thinking`, `` strip, `choices/reasoning` parse, retry+backoff, never-raises). NOTE: `codec_llm_proxy.py` is a priority *queue* (semaphore), NOT an HTTP caller — don't confuse the two. A-12 is migrating the ~45 inline `chat/completions` sites onto `codec_llm` in phased tranches. Done: `codec_llm.call()` (non-stream) + `stream()` (sync SSE generator, yields raw deltas, never-raises); migrated sites = codec.py voice-reply, `codec_session.qwen_call` + `qwen_stream`, `codec_compaction`, `codec_dictate`. Pending tranches: 2c raise-mode (`codec_llm.call(raise_on_error=True)` for agent_plan/runner + textassist + the regen script — they MUST fail loud, never-raise would silently paste empty / write empty), an async `astream()` for voice `_stream_qwen` + agents (queue stays at the call site — `codec_llm` never owns the semaphore), dashboard (4 non-stream + the `[SKILL:…]` stream tag-machine, which keeps its own parser and consumes only `stream()`'s raw tokens), bridges, and a skills tranche. ## 3. Agent + Crew runtime diff --git a/codec_compaction.py b/codec_compaction.py index 34637f2..f6029a4 100644 --- a/codec_compaction.py +++ b/codec_compaction.py @@ -49,19 +49,13 @@ def compact_context(recent_messages: list, max_recent: int = 5, max_summary_toke summary = None try: - import httpx - base_url = _LLM_BASE_URL - model = _LLM_MODEL - api_key = _LLM_API_KEY - llm_kwargs = _LLM_KWARGS - - headers = {"Content-Type": "application/json"} - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - - payload = { - "model": model, - "messages": [ + # A-12 (PR-3E-2): canonical codec_llm.call replaces the inline httpx + # POST + parse. Never raises -> "" on failure, which the fallback below + # already handles. (Also now sends enable_thinking=False + strips any + # leak, so summaries are slightly cleaner than the old path.) + import codec_llm + summary = codec_llm.call( + [ { "role": "system", "content": ( @@ -72,19 +66,11 @@ def compact_context(recent_messages: list, max_recent: int = 5, max_summary_toke }, {"role": "user", "content": old_text}, ], - "max_tokens": max_summary_tokens, - "temperature": 0.1, - } - payload.update(llm_kwargs) - - r = httpx.post( - f"{base_url}/chat/completions", - json=payload, - headers=headers, - timeout=15, + base_url=_LLM_BASE_URL, model=_LLM_MODEL, api_key=_LLM_API_KEY, + max_tokens=max_summary_tokens, temperature=0.1, timeout=15, + extra_kwargs=_LLM_KWARGS, ) - if r.status_code == 200: - summary = r.json()["choices"][0]["message"]["content"].strip() + if summary: log.info(f"Context compacted: {len(old_messages)} old msgs → {len(summary)} char summary") except Exception as e: log.warning(f"Compaction LLM failed, using fallback: {e}") diff --git a/codec_dictate.py b/codec_dictate.py index 8b9479e..cf5a622 100644 --- a/codec_dictate.py +++ b/codec_dictate.py @@ -397,28 +397,27 @@ def transcribe_and_type(audio_path): body = text[_draft_match.end():].strip() if body: try: - import requests as _req + # A-12 (PR-3E-2): canonical codec_llm.call. Never raises -> + # "" on any failure (non-200, conn error, empty), which maps + # exactly onto the old "use raw body" fallback for every + # non-success branch. + import codec_llm print("[DICTATE] Draft mode — refining with Qwen...") - r = _req.post("http://localhost:8083/v1/chat/completions", - json={"model": "mlx-community/Qwen3.6-35B-A3B-4bit", - "messages": [ - {"role": "system", "content": "Rewrite the user message as a polished, professional message. Output ONLY the final text. No preamble, no explanation."}, - {"role": "user", "content": body} - ], - "max_tokens": 300, "temperature": 0.3, - "chat_template_kwargs": {"enable_thinking": False}}, - timeout=15) - if r.status_code == 200: - refined = r.json()["choices"][0]["message"]["content"].strip() - if refined: - text = refined - print(f"[DICTATE] Refined: {text}") - else: - text = body - print("[DICTATE] Qwen returned empty, using raw body") + refined = codec_llm.call( + [ + {"role": "system", "content": "Rewrite the user message as a polished, professional message. Output ONLY the final text. No preamble, no explanation."}, + {"role": "user", "content": body}, + ], + base_url="http://localhost:8083/v1", + model="mlx-community/Qwen3.6-35B-A3B-4bit", + max_tokens=300, temperature=0.3, timeout=15, + ) + if refined: + text = refined + print(f"[DICTATE] Refined: {text}") else: text = body - print(f"[DICTATE] Qwen HTTP {r.status_code}, using raw body") + print("[DICTATE] Qwen unavailable or empty, using raw body") except Exception as qe: text = body print(f"[DICTATE] Qwen error: {qe}, using raw body") diff --git a/codec_llm.py b/codec_llm.py index 40760ff..d0c3a63 100644 --- a/codec_llm.py +++ b/codec_llm.py @@ -23,7 +23,7 @@ import logging import re import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterator, List, Optional log = logging.getLogger("codec.llm") @@ -55,6 +55,37 @@ def extract_content(response_json: Dict[str, Any]) -> str: return "" +def _build_request( + messages: List[Dict[str, Any]], + *, + model: str, + api_key: str, + max_tokens: int, + temperature: float, + enable_thinking: bool, + extra_kwargs: Optional[Dict[str, Any]], + stream: bool = False, +) -> tuple[Dict[str, str], Dict[str, Any]]: + """Build the (headers, payload) for an OpenAI-style chat/completions request. + Shared by call() and stream() so headers/auth/payload shape never drift. + The `stream` flag is applied LAST so extra_kwargs can't clobber it.""" + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = "Bearer " + api_key + payload: Dict[str, Any] = { + "model": model, + "messages": messages, + "max_tokens": max_tokens, + "temperature": temperature, + "chat_template_kwargs": {"enable_thinking": enable_thinking}, + } + if extra_kwargs: + payload.update(extra_kwargs) + if stream: + payload["stream"] = True + return headers, payload + + def call( messages: List[Dict[str, Any]], *, @@ -76,18 +107,11 @@ def call( Never raises — network/parse errors are logged and yield "". """ import requests - headers = {"Content-Type": "application/json"} - if api_key: - headers["Authorization"] = "Bearer " + api_key - payload: Dict[str, Any] = { - "model": model, - "messages": messages, - "max_tokens": max_tokens, - "temperature": temperature, - "chat_template_kwargs": {"enable_thinking": enable_thinking}, - } - if extra_kwargs: - payload.update(extra_kwargs) + headers, payload = _build_request( + messages, model=model, api_key=api_key, max_tokens=max_tokens, + temperature=temperature, enable_thinking=enable_thinking, + extra_kwargs=extra_kwargs, + ) attempts = max(1, retries) url = base_url.rstrip("/") + "/chat/completions" @@ -106,3 +130,64 @@ def call( if attempt < attempts - 1: time.sleep(2 ** attempt) return "" + + +def stream( + messages: List[Dict[str, Any]], + *, + base_url: str, + model: str, + api_key: str = "", + max_tokens: int = 500, + temperature: float = 0.7, + timeout: float = 120.0, + enable_thinking: bool = False, + extra_kwargs: Optional[Dict[str, Any]] = None, +) -> Iterator[str]: + """POST with `stream=True` and yield the RAW assistant content deltas in + order. Centralizes the SSE plumbing: header/payload build (shared with + call()), `data: ` framing, the `[DONE]` sentinel, `choices[0].delta.content` + extraction, and per-chunk parse tolerance. + + Think-stripping is intentionally NOT done here — callers that show tokens + live (e.g. codec_session.qwen_stream) strip `` on the accumulated + result, and the dashboard owns its own cross-chunk tag machine. Never + raises: on connect/HTTP/parse error it logs and stops yielding, so the + caller sees a short/empty stream and applies its own fallback. + """ + import json as _json + import requests + headers, payload = _build_request( + messages, model=model, api_key=api_key, max_tokens=max_tokens, + temperature=temperature, enable_thinking=enable_thinking, + extra_kwargs=extra_kwargs, stream=True, + ) + url = base_url.rstrip("/") + "/chat/completions" + try: + with requests.post(url, json=payload, headers=headers, + timeout=timeout, stream=True) as r: + if r.status_code != 200: + log.warning("LLM stream %s returned %s: %s", + url, r.status_code, getattr(r, "text", "")[:200]) + return + for line in r.iter_lines(): + if not line: + continue + if isinstance(line, (bytes, bytearray)): + line = line.decode("utf-8", "replace") + if not line.startswith("data: "): + continue + data = line[6:] + if data.strip() == "[DONE]": + return + try: + delta = (_json.loads(data).get("choices", [{}])[0] + .get("delta", {}).get("content", "")) + except Exception as e: + log.warning("LLM stream chunk parse failed: %s", e) + continue + if delta: + yield delta + except Exception as e: + log.warning("LLM stream call failed: %s", e) + return diff --git a/codec_session.py b/codec_session.py index 342a35b..c8fac10 100644 --- a/codec_session.py +++ b/codec_session.py @@ -256,47 +256,29 @@ def qwen_call(self, messages): ) def qwen_stream(self, messages): - import requests + # A-12 (PR-3E-2): canonical codec_llm.stream replaces the inline SSE + # POST + iter_lines + delta parse. stream() yields RAW deltas (it never + # raises), so we keep the live stdout write here and strip_think() the + # accumulated result — exact parity. An empty stream (non-200, conn + # error, or no content) falls back to the migrated non-streaming + # qwen_call (slightly broader than the old non-200-only fallback). + import codec_llm try: - headers = {"Content-Type": "application/json"} - if self.llm_api_key: - headers["Authorization"] = "Bearer " + self.llm_api_key - payload = { - "model": self.qwen_model, - "messages": messages, - "max_tokens": 500, - "temperature": 0.5, - "stream": True, - } - payload.update(self.llm_kwargs) - r = requests.post( - self.qwen_base_url + "/chat/completions", - json=payload, - headers=headers, - timeout=90, - stream=True, - ) - if r.status_code != 200: - return self.qwen_call(messages) full = "" - for line in r.iter_lines(): - if not line: - continue - line = line.decode("utf-8") - if line.startswith("data: "): - d = line[6:] - if d.strip() == "[DONE]": - break - try: - delta = json.loads(d).get("choices", [{}])[0].get("delta", {}).get("content", "") - if delta: - sys.stdout.write(delta) - sys.stdout.flush() - full += delta - except Exception as e: - log.warning(f"Stream chunk parse failed: {e}") + got = False + for delta in codec_llm.stream( + messages, base_url=self.qwen_base_url, model=self.qwen_model, + api_key=self.llm_api_key, max_tokens=500, temperature=0.5, + timeout=90, extra_kwargs=self.llm_kwargs, + ): + got = True + sys.stdout.write(delta) + sys.stdout.flush() + full += delta print() - return strip_think(full).strip() + if got: + return strip_think(full).strip() + return self.qwen_call(messages) except Exception as e: log.warning(f"Streaming LLM call failed, falling back to non-streaming: {e}") return self.qwen_call(messages) diff --git a/docs/PR3E2-LLM-STREAM-TRANCHE2-DESIGN.md b/docs/PR3E2-LLM-STREAM-TRANCHE2-DESIGN.md new file mode 100644 index 0000000..79d5625 --- /dev/null +++ b/docs/PR3E2-LLM-STREAM-TRANCHE2-DESIGN.md @@ -0,0 +1,94 @@ +# PR-3E-2 — A-12 tranche 2: `codec_llm.stream()` + migrate remaining `chat/completions` sites (DESIGN) + +**Status:** IMPLEMENTED — **Option 1**. Shipped: `codec_llm.stream()` (sync generator, raw deltas, never-raises) + `_build_request` shared by `call()`/`stream()`; migrated `codec_session.qwen_stream` (streaming proof), `codec_compaction.compact_context`, `codec_dictate` draft-refine. **textassist + regen deferred to 2c** (raise-on-error contract — see refinement note in §4). 14 new tests (`tests/test_llm_stream.py`), full suite zero new failures. +**Finding:** A-12 (continuation). PR-3E shipped `codec_llm.call()` (non-streaming) + migrated 2 sites. This designs the **streaming** keystone `codec_llm.stream()` and the phased migration of the **22 remaining text-chat sites**. +**Wave:** 3. Hottest path in the repo → design-first + small reviewable tranches, never a big-bang. + +--- + +## 1. Ground-truth inventory (22 unique text-chat sites) + +A full read-the-source survey (not just grep) found **22 live inline `chat/completions` text-chat sites** remaining after PR-3E. Key facts: + +- **No separate cloud client.** `codec_ava_client.py` does **not** exist in this worktree — "cloud" is purely a runtime config repoint of `llm_base_url`/`api_key` to an OpenAI-compatible endpoint. So `codec_llm.call/stream` serves local **and** cloud as long as `base_url`/`api_key` pass through. No special cloud branch needed. +- **3 streaming, 19 non-streaming.** **2 async** (`codec_voice._stream_qwen` httpx, `codec_agents.Agent.run` httpx); the rest sync `requests` (one sync-httpx: `codec_compaction`). +- **2 queue-coupled.** `codec_voice._stream_qwen` (`llm_queue.acquire(CRITICAL)` @593/621) and `codec_agents.Agent.run` (`MEDIUM` @439/448). The semaphore must stay **at the call site** — `codec_llm` must never own queue acquisition. +- **Vision sites are out of scope.** Dashboard (1069/1208/1680/2666), telegram/imessage `process_image`, `codec_watcher.screenshot_ctx`, `skills/screenshot_text` still hand-roll vision POSTs — those are the **A-11** dedup target (codec_vision), not A-12. `skills/mouse_control` UI-TARS is a different model entirely → leave as-is. + +### Site map (codec_llm targets) +| Subsystem | Sites | Stream | Notes | +|---|---|---|---| +| dashboard | 6 | 2 of 6 | `:2854` stream is the hard one (skill-tag machine); `:973`,`:2464`,`:2975`,`:3167` non-stream | +| voice | 1 | yes | `_stream_qwen` — async + queue CRITICAL | +| session | 1 | yes | `qwen_stream` — sync, already falls back to migrated `qwen_call` | +| agents | 1 | no | `Agent.run` — async + queue MEDIUM | +| agent_plan / agent_runner | 2 | no | **RAISE** `QwenUnavailableError` (opposite of call()'s never-raise) | +| bridges telegram / imessage | 2 | no | identical OpenAI shape, Timeout branch | +| compaction / self_improve / watcher / textassist / dictate | 5 | no | each one isolated POST; some have bespoke retry | +| skills/scripts (translate, fact_extract, create_skill, skill_forge, regen script) | 5 | no | skill files → manifest regen on edit | + +## 2. The keystone — `codec_llm.stream()` (this PR) + +```python +def stream( + messages, *, base_url, model, api_key="", max_tokens=500, temperature=0.7, + timeout=120.0, enable_thinking=False, extra_kwargs=None, +) -> Iterator[str]: + """POST with stream=True and yield raw assistant content deltas. + + Handles internally: header/payload build (shared with call()), the SSE + framing (`data: ` prefix, `[DONE]` sentinel), `choices[0].delta.content` + extraction, and per-chunk error tolerance. Yields the RAW content deltas + in order. Never raises — on connect/HTTP/parse error it logs and stops + yielding (caller sees a short/empty stream and applies its own fallback).""" +``` + +**Design decisions:** +- **Sync generator now; async deferred.** Only `codec_session.qwen_stream` (sync) consumes it this PR. An async `astream()` (for voice/agents) lands with the voice tranche (2e) — building it now without a consumer is YAGNI. +- **Yields RAW content deltas — think-stripping stays with the caller.** *(Revised from the initial draft, which had `stream()` strip `` internally.)* Rationale: `qwen_stream` writes each delta to stdout **live** and only `strip_think`s the *accumulated* result — so internal stripping would silently drop the reasoning it currently shows live (a parity break), and a streaming tag-stripper needs fiddly partial-tag buffering. Yielding raw gives **exact parity** for `qwen_stream` (caller does `for tok: stdout.write(tok); full += tok` then `strip_think(full)`), is simpler/safer for the first streaming API, and matches reality — the dashboard already owns its own cross-chunk `` + `[SKILL:...]` machine (§3.1). `stream()` still centralizes all the real boilerplate (headers/payload, `requests.post(stream=True)`, SSE framing, never-raise). +- **Never raises** (parity with `call()`). The raise-on-failure sites get a separate contract (§3.3), not this PR. +- **Shared internals.** `stream()` reuses `call()`'s header/payload builder (extract a private `_build_request(...)`). + +## 3. Three hard constraints (how each future tranche handles them) + +### 3.1 dashboard `:2854` — the skill-tag stream machine (defer to its own PR) +Mid-stream char-by-char `[SKILL:...]` buffering (5000-cap, prefix validation), cross-chunk `` state, SSE keepalive comments, `[DONE]` re-framing, blank-bubble fallback. **Plan:** `stream()` yields clean tokens; the dashboard keeps its tag machine and SSE re-emit, consuming `stream()` only for the raw-line→token layer. Migrated **last** (tranche 2d/2e), never absorbed into codec_llm. + +### 3.2 async + queue coupling (voice, agents) +Both wrap the POST in `llm_queue.acquire/release` and use a shared module httpx client. **Plan:** `codec_llm.astream()`/an async `call` accepts an injected client (`http=`, like `codec_vision.describe_async`) and the **caller keeps** acquire/release around the codec_llm call. codec_llm never touches the semaphore. Deferred to tranche 2e (voice) / a small agents PR. + +### 3.3 raise-on-failure (agent_plan, agent_runner) +These RAISE `QwenUnavailableError`; `call()` never raises. **Plan (tranche 2c):** add `raise_on_error: bool = False` to `codec_llm.call` — when True, re-raise a typed `LLMUnavailableError` (or the caller maps a sentinel). A design decision, not a swap → its own PR. **Not this PR.** + +## 4. Recommended scope for THIS PR (tranche 2a) + +**Build the keystone, prove it on the simplest consumer, clear the easy non-streaming wins:** +1. **New `codec_llm.stream()`** (sync generator, §2) + refactor `call()`/`stream()` to share `_build_request`. +2. **Migrate the simplest streaming site:** `codec_session.qwen_stream` → consume `codec_llm.stream()` (keeps its stdout write + fallback to the already-migrated `qwen_call` on empty/error). Proof-of-API. +3. **Migrate the genuinely-clean non-streaming trivials:** `codec_compaction.py` (already has a fallback summary on empty), `codec_dictate.py` (every failure mode already collapses to "use raw body" — exactly what `call()`'s never-raise → `""` maps to). + +> **Refinement during implementation (read-the-source).** The approved Option 1 listed 4 trivials; reading the actual downstreams moved **2 of them to tranche 2c**: +> - **`codec_textassist.py`** RAISEs on LLM failure, and its caller's `except` shows an *Error* overlay. With `call()`'s never-raise → `""`, the success path would `pbcopy ""` + ⌘V, **pasting empty over the user's selection** and showing "Text replaced!" — a destructive regression. Needs `raise_on_error`. +> - **`scripts/regen_skill_descriptions.py`** uses `raise_for_status()` (fail-loud dev script); never-raise would silently write empty descriptions. Needs `raise_on_error`. +> +> Both have the **same raise-on-failure contract** as `agent_plan`/`agent_runner`, so they migrate together in **tranche 2c** when `codec_llm.call` gains `raise_on_error: bool`. Deferring honors "never break working code." + +**Explicitly deferred** (each its own tranche/PR): skills/*.py (translate/fact_extract/create_skill/skill_forge — bundled into a "skills tranche" so manifest regen doesn't mix with core), bridges (2b), **raise-on-error sites: textassist + regen + agent_plan/runner (2c, add `raise_on_error` mode)**, dashboard non-stream (2d), voice `_stream_qwen` + dashboard stream + `astream()` (2e). + +### Test plan +- New `tests/test_llm_stream.py`: `stream()` yields the **raw** deltas in order from a fake SSE body (incl. a `` delta — NOT stripped); stops cleanly on `data: [DONE]`; blank / non-`data:` / garbage-JSON lines skipped without raising; HTTP non-200 → empty stream (no raise); connection exception → empty stream (no raise); payload carries `stream: True` + the shared `_build_request` shape (model/messages/max_tokens/temperature/enable_thinking + extra_kwargs). +- Migrated-site tests: `qwen_stream` consumes `stream()`, writes deltas, returns `strip_think(full)`, and falls back to `qwen_call` on empty stream; `codec_compaction.compact_context` and `codec_dictate` draft-refine call `codec_llm.call` with the right base_url/model (monkeypatched), and fall back correctly on `""`. +- Full suite: expect the 23 known-baseline failures, **zero new**. No `skills/` touched → **no manifest regen**. + +## 5. Risk + rollback +- **Blast radius (this PR):** `codec_llm.py` (+stream), `codec_session.py` (qwen_stream), 3 small non-streaming modules + 1 script. Streaming behavior change confined to session (which already falls back to the migrated qwen_call). +- **Rollback:** single-commit revert restores the inline impls. No persistent state touched. +- The hot/hard sites (dashboard stream, voice async, agent raise-mode) are **all deferred** to later small PRs. + +## 6. Open question for you (Mickael) — scope of this PR + +- **Option 1 (recommended):** `stream()` keystone **+** session-stream proof **+** non-streaming core trivials (compaction, textassist, dictate, regen script). ~5 sites + 1 new API. Builds the keystone, proves it, clears easy wins. Skills + bridges + dashboard + voice + raise-mode each follow as their own small PRs. +- **Option 2 (tightest):** `stream()` keystone **+** session-stream proof **only**. Pure API + one consumer, smallest possible review. Non-streaming trivials move to a later batch. +- **Option 3 (bigger):** `stream()` **+** `astream()` **+** all 3 streaming sites (session + voice async + dashboard tag-machine). Most value, but pulls in the two hardest constraints (async+queue, the dashboard skill-tag machine) into one PR — highest risk on the hottest path. + +I recommend **Option 1**. Pick one and I'll implement (TDD) + open the PR (chat-review-then-merge). diff --git a/docs/audits/PHASE-1-CODE-QUALITY.md b/docs/audits/PHASE-1-CODE-QUALITY.md index 1c16749..c17ad1a 100644 --- a/docs/audits/PHASE-1-CODE-QUALITY.md +++ b/docs/audits/PHASE-1-CODE-QUALITY.md @@ -122,7 +122,9 @@ Both scan `SKILLS_DIR` independently, so a skill file is loaded twice in differe ### A-12 — 51 separate `chat/completions` HTTP call sites with copy-pasted payload shapes [MEDIUM] -> **First tranche closed by PR-3E** (Option 2); remainder phased. The audit's premise that `codec_llm_proxy.py` already has a `call()`/`stream()` to reuse was **inaccurate** — that module is a priority *queue*, not an HTTP caller. PR-3E instead built the genuinely-new canonical **`codec_llm.py`** (`call()` non-streaming, plus `strip_think`/`extract_content`: headers, `Bearer` auth, `chat_template_kwargs.enable_thinking`, `` strip, `choices/reasoning` parse, retry+backoff, never-raises). **Migrated this PR:** `codec.py` voice-reply chat + `codec_session.qwen_call`. **Deferred (each its own tranche/PR + design):** `codec_session.qwen_stream` (needs `codec_llm.stream()` SSE generator) + the remaining ~40 sites (dashboard, voice `generate_response`, agents/agent_plan/agent_runner, telegram/imessage bridges, compaction/self_improve/watcher/textassist/dictate). Pinned by `tests/test_llm_vision_dedup.py`. See `docs/PR3E-LLM-VISION-DEDUP-DESIGN.md` §3 (phased plan) + §8 (shipped). +> **First tranche closed by PR-3E** (Option 2); remainder phased. The audit's premise that `codec_llm_proxy.py` already has a `call()`/`stream()` to reuse was **inaccurate** — that module is a priority *queue*, not an HTTP caller. PR-3E instead built the genuinely-new canonical **`codec_llm.py`** (`call()` non-streaming, plus `strip_think`/`extract_content`: headers, `Bearer` auth, `chat_template_kwargs.enable_thinking`, `` strip, `choices/reasoning` parse, retry+backoff, never-raises). **Migrated by PR-3E (tranche 1):** `codec.py` voice-reply chat + `codec_session.qwen_call`. Pinned by `tests/test_llm_vision_dedup.py`. See `docs/PR3E-LLM-VISION-DEDUP-DESIGN.md` §3 (phased plan) + §8 (shipped). +> +> **Tranche 2 (PR-3E-2):** built the streaming keystone **`codec_llm.stream()`** (sync generator, yields raw deltas, never-raises) + extracted a shared `_build_request`; migrated `codec_session.qwen_stream`, `codec_compaction`, `codec_dictate`. Read-the-source moved `codec_textassist` + `scripts/regen_skill_descriptions` to **tranche 2c** (they have a raise-on-failure contract — never-raise would paste empty over the user's selection / write empty descriptions). Pinned by `tests/test_llm_stream.py`. See `docs/PR3E2-LLM-STREAM-TRANCHE2-DESIGN.md`. **Still pending (own PRs):** dashboard (4 non-stream + 1 stream tag-machine), voice `_stream_qwen` + agents (async + queue → need `astream()`), agent_plan/runner + textassist + regen (2c raise-mode), bridges telegram/imessage, self_improve/watcher, skills tranche (translate/fact_extract/create_skill/skill_forge). **Location:** Sample sites: `codec.py:702`, `codec_dashboard.py:980,1076,1215`, `codec_voice.py:180,196,208,213`, `codec_session.py:215,278,307`, `codec_agents.py:51`, `codec_agent_plan.py:239`, `codec_agent_runner.py:148`, `codec_compaction.py:78`, `codec_self_improve.py:238`, `codec_telegram.py:471,508`, `codec_imessage.py:341,391`, `codec_textassist.py:33`, `codec_dictate.py:492`, `codec_watcher.py:86,182`. Total: 51 occurrences via `grep -rn "chat/completions"`. **Impact:** Each site repeats: headers build, `Authorization: Bearer {api_key}` formatting, `{Content-Type: application/json}`, payload assembly with `chat_template_kwargs.enable_thinking=False`, `` stripping, `try/except` for `r.json()` shape (`choices[0].message.content` or `.reasoning` fallback). Many also re-implement streaming SSE parsing. When the Qwen-3.6 upgrade landed, this is exactly the kind of change that needs to be applied in 20+ places. **Recommended fix:** Add `codec_llm_proxy.call(messages, **kwargs)` and `codec_llm_proxy.stream(messages, **kwargs)` as the single canonical API (the module already exists at `codec_llm_proxy.py`, only 130 LOC, only used by codec_voice + codec_agents). Migrate all 51 sites over the course of the Phase 1 hardening. As a first step, just covering the 5 sites in codec.py + codec_dashboard.py + codec_session.py would remove ~80% of the most-edited duplication. diff --git a/docs/audits/PHASE-1-CONSOLIDATED-TRIAGE.md b/docs/audits/PHASE-1-CONSOLIDATED-TRIAGE.md index 99ccb88..f38d180 100644 --- a/docs/audits/PHASE-1-CONSOLIDATED-TRIAGE.md +++ b/docs/audits/PHASE-1-CONSOLIDATED-TRIAGE.md @@ -243,6 +243,7 @@ Mirror the Intake Phase 3 wave pattern. 7 waves planned; sizes are PR-counts, NO - A-4: skill-loader unification ✅ (branch `fix/pr3-a4-skill-loader-unification`, design-first per §11 → `docs/A4-SKILL-LOADER-UNIFICATION-DESIGN.md`). Deleted legacy `codec_core.{loaded_skills,load_skills,run_skill}`; codec.py + cortex_skills now use canonical `codec_dispatch` registry. Closed a real **security gap** (legacy path skipped the PR-1A AST gate) + a **hooks bypass** (voice path now fires run_with_hooks). Option A: `custom_triggers.json` now honored everywhere via SkillRegistry. 10 tests; full suite 1376 passing. - PR-3D: A-5 + A-6 + A-7 — extract helpers from the 3 monolithic functions (`_dispatch_inner`, `chat_completion`, `Agent.run`) - PR-3E: A-11 + A-12 — unify vision + `chat/completions` ✅ (branch `fix/pr3e-llm-vision-dedup`, design-first per §11 → `docs/PR3E-LLM-VISION-DEDUP-DESIGN.md`; **Option 2** chosen by Mickael). **A-11 fully closed**: new `codec_vision.py` (sync+async, Gemini→local fallback, live config); all 3 consumers (codec.py/voice/session) delegate; session gains a Gemini fallback it lacked. **A-12 first tranche**: discovered `codec_llm_proxy` is a *queue*, not an HTTP caller — built genuinely-new `codec_llm.py` (`call()` + `strip_think`/`extract_content`, retry, never-raises) and migrated codec.py voice-reply chat + `codec_session.qwen_call`. **Deferred to phased follow-ons**: `qwen_stream` SSE (needs `codec_llm.stream()`) + ~40 remaining sites (dashboard/voice/agents/bridges/misc), each its own tranche. 19 tests (`tests/test_llm_vision_dedup.py`); full suite zero new failures. +- PR-3E-2: A-12 tranche 2 ✅ (branch `fix/pr3-a12-tranche2-stream`, design-first → `docs/PR3E2-LLM-STREAM-TRANCHE2-DESIGN.md`; **Option 1** chosen). Built streaming keystone `codec_llm.stream()` (sync generator, raw deltas, never-raises) + shared `_build_request`; migrated `codec_session.qwen_stream` (proof) + non-streaming trivials `codec_compaction` + `codec_dictate`. Read-the-source moved `codec_textassist` + `regen_skill_descriptions` to **2c** (raise-on-failure contract — never-raise would paste empty over the user's selection / write empty descriptions). 14 tests (`tests/test_llm_stream.py`); zero net-new ruff; full suite 1409 passing, zero new failures. **Remaining A-12 tranches:** 2c (raise-mode: textassist/regen/agent_plan/agent_runner), bridges (telegram/imessage), dashboard (non-stream + the stream tag-machine), voice `_stream_qwen` + agents (async `astream()` + queue), skills tranche. - PR-3F (optional, large): A-19 — bridge unification (iMessage + Telegram → `BridgeRouter`) - PR-3G: small misc ✅ (branch `fix/pr3g-small-misc-cleanup`) — closed A-9 (DISABLED overlay, ~90 LOC), A-10 (run_session_module, 33 LOC + orphan `import sys`), A-14 (close_session shadow import), A-18 (9 unused Pydantic models + dead typing import). A-13 (dashboard pattern blocker) verified **already closed by PR-2C**. 6 regression tests; zero net-new ruff (net −); full suite 1344 passing. **Deferred from this batch (each needs its own focused PR):** A-8 (codec_keyboard.py 398 LOC — verify-first delete-or-migrate decision), A-15 (config_version — additive migration feature touching `load_config`), A-20 (inline sqlite in the live dispatch path — reliability fix needing a CodecMemory method). - A-15: config schema versioning ✅ (branch `fix/pr3-a15-config-versioning`; `CONFIG_SCHEMA_VERSION=1` + migration ladder + idempotent atomic write-back in `load_config`; never creates-on-missing or overwrites-corrupt; 12 tests; zero net-new ruff; full suite 1356 passing). diff --git a/tests/test_llm_stream.py b/tests/test_llm_stream.py new file mode 100644 index 0000000..1d779dc --- /dev/null +++ b/tests/test_llm_stream.py @@ -0,0 +1,250 @@ +"""Tests for PR-3E-2 — A-12 tranche 2: codec_llm.stream() + first migrations. + +- codec_llm.stream: canonical SSE streaming caller. Yields RAW content deltas + (think-stripping is the caller's job), stops on `data: [DONE]`, tolerates + blank/garbage lines, never raises (HTTP/conn error -> empty stream). +- codec_llm._build_request: shared header/payload builder for call() + stream(). +- Migrations: codec_session.Session.qwen_stream (streaming proof), and the + non-streaming trivials codec_compaction.compact_context + codec_dictate. + +Reference: docs/PR3E2-LLM-STREAM-TRANCHE2-DESIGN.md (Option 1). +""" +from __future__ import annotations + +import json +import sys +import types +from pathlib import Path + +REPO = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO)) + +import codec_llm # noqa: E402 + + +# ── helpers ────────────────────────────────────────────────────────────────── + + +class _StreamResp: + """Fake `requests` streaming response usable as a context manager.""" + + def __init__(self, status, lines=None, text=""): + self.status_code = status + self._lines = lines or [] + self.text = text + + def iter_lines(self): + for ln in self._lines: + yield ln if isinstance(ln, (bytes, bytearray)) else ln.encode("utf-8") + + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + +def _sse(content): + return "data: " + json.dumps({"choices": [{"delta": {"content": content}}]}) + + +# ── codec_llm.stream ────────────────────────────────────────────────────────── + + +def test_stream_yields_raw_deltas_and_stops_at_done(monkeypatch): + captured = {} + + def fake_post(url, json=None, headers=None, timeout=None, stream=None): + captured["url"] = url + captured["json"] = json + captured["stream_kw"] = stream + return _StreamResp(200, lines=[ + _sse("Hello"), + "", # blank keepalive — skipped + _sse(" plan"), # raw, NOT stripped by stream() + _sse(" world"), + "data: [DONE]", + _sse(" AFTER-DONE"), # never reached + ]) + + import requests + monkeypatch.setattr(requests, "post", fake_post) + toks = list(codec_llm.stream([{"role": "user", "content": "q"}], + base_url="http://x/v1", model="m")) + assert toks == ["Hello", " plan", " world"] + assert captured["url"] == "http://x/v1/chat/completions" + assert captured["stream_kw"] is True # requests stream= kwarg + assert captured["json"]["stream"] is True # payload stream flag + + +def test_stream_skips_blank_and_garbage_lines(monkeypatch): + def fake_post(url, json=None, headers=None, timeout=None, stream=None): + return _StreamResp(200, lines=[ + "", # blank + ": keepalive comment", # non-data line + "data: {not valid json", # bad JSON -> skipped, no raise + _sse(""), # empty content delta -> not yielded + _sse("ok"), + "data: [DONE]", + ]) + + import requests + monkeypatch.setattr(requests, "post", fake_post) + assert list(codec_llm.stream([{"role": "user", "content": "q"}], + base_url="http://x/v1", model="m")) == ["ok"] + + +def test_stream_non_200_returns_empty(monkeypatch): + import requests + monkeypatch.setattr(requests, "post", + lambda *a, **k: _StreamResp(500, text="boom")) + assert list(codec_llm.stream([{"role": "user", "content": "q"}], + base_url="http://x/v1", model="m")) == [] + + +def test_stream_exception_returns_empty(monkeypatch): + def boom(*a, **k): + raise ConnectionError("down") + + import requests + monkeypatch.setattr(requests, "post", boom) + assert list(codec_llm.stream([{"role": "user", "content": "q"}], + base_url="http://x/v1", model="m")) == [] + + +def test_stream_auth_header_and_extra_kwargs(monkeypatch): + captured = {} + + def fake_post(url, json=None, headers=None, timeout=None, stream=None): + captured["headers"] = headers + captured["json"] = json + return _StreamResp(200, lines=["data: [DONE]"]) + + import requests + monkeypatch.setattr(requests, "post", fake_post) + list(codec_llm.stream([{"role": "user", "content": "q"}], + base_url="http://x/v1", model="m", api_key="k", + extra_kwargs={"top_p": 0.8})) + assert captured["headers"]["Authorization"] == "Bearer k" + assert captured["json"]["top_p"] == 0.8 + assert captured["json"]["chat_template_kwargs"] == {"enable_thinking": False} + + +def test_stream_no_auth_without_key(monkeypatch): + captured = {} + + def fake_post(url, json=None, headers=None, timeout=None, stream=None): + captured["headers"] = headers + return _StreamResp(200, lines=["data: [DONE]"]) + + import requests + monkeypatch.setattr(requests, "post", fake_post) + list(codec_llm.stream([{"role": "user", "content": "q"}], + base_url="http://x/v1", model="m")) + assert "Authorization" not in captured["headers"] + + +def test_call_and_stream_share_payload_shape(monkeypatch): + """call() must NOT carry a stream flag; stream() must. Both share the rest.""" + seen = {} + + def fake_call_post(url, json=None, headers=None, timeout=None): + seen["call"] = json + from test_llm_vision_dedup import _Resp # reuse non-stream fake + return _Resp(200, {"choices": [{"message": {"content": "x"}}]}) + + def fake_stream_post(url, json=None, headers=None, timeout=None, stream=None): + seen["stream"] = json + return _StreamResp(200, lines=["data: [DONE]"]) + + import requests + monkeypatch.setattr(requests, "post", fake_call_post) + codec_llm.call([{"role": "user", "content": "q"}], base_url="http://x/v1", + model="m", max_tokens=123, temperature=0.4) + monkeypatch.setattr(requests, "post", fake_stream_post) + list(codec_llm.stream([{"role": "user", "content": "q"}], base_url="http://x/v1", + model="m", max_tokens=123, temperature=0.4)) + + for shape in (seen["call"], seen["stream"]): + assert shape["model"] == "m" + assert shape["max_tokens"] == 123 and shape["temperature"] == 0.4 + assert shape["chat_template_kwargs"] == {"enable_thinking": False} + assert "stream" not in seen["call"] # parity with PR-3E call() + assert seen["stream"]["stream"] is True + + +# ── codec_session.Session.qwen_stream migration ─────────────────────────────── + + +def _session_stub(qwen_call_ret="FALLBACK", fb_flag=None): + def _qc(messages): + if fb_flag is not None: + fb_flag["hit"] = True + return qwen_call_ret + return types.SimpleNamespace( + qwen_base_url="http://x/v1", qwen_model="m", llm_api_key="", + llm_kwargs={}, qwen_call=_qc, + ) + + +def test_qwen_stream_consumes_codec_llm_and_strips_think(monkeypatch, capsys): + import codec_session + monkeypatch.setattr(codec_llm, "stream", + lambda *a, **k: iter(["Hel", "lo", " x"])) + stub = _session_stub() + out = codec_session.Session.qwen_stream(stub, [{"role": "user", "content": "q"}]) + assert out == "Hello" # strip_think on accumulated full + assert "Hello" in capsys.readouterr().out # deltas written live to stdout + + +def test_qwen_stream_falls_back_on_empty_stream(monkeypatch): + import codec_session + monkeypatch.setattr(codec_llm, "stream", lambda *a, **k: iter([])) + fb = {} + stub = _session_stub(qwen_call_ret="NONSTREAM", fb_flag=fb) + out = codec_session.Session.qwen_stream(stub, [{"role": "user", "content": "q"}]) + assert out == "NONSTREAM" and fb["hit"] is True + + +# ── codec_compaction.compact_context migration ──────────────────────────────── + + +def _msgs(n): + return [{"role": "user" if i % 2 == 0 else "assistant", + "content": f"message number {i}"} for i in range(n)] + + +def test_compaction_uses_codec_llm(monkeypatch): + import codec_compaction + monkeypatch.setattr(codec_llm, "call", lambda *a, **k: "A crisp summary.") + out = codec_compaction.compact_context(_msgs(8), max_recent=3) + assert "A crisp summary." in out + assert "[SUMMARY OF EARLIER CONVERSATION]" in out + + +def test_compaction_fallback_on_empty(monkeypatch): + import codec_compaction + monkeypatch.setattr(codec_llm, "call", lambda *a, **k: "") + out = codec_compaction.compact_context(_msgs(8), max_recent=3) + assert "Previous context:" in out # rule-based fallback summary + + +# ── source-level migration invariants ───────────────────────────────────────── + + +def test_session_qwen_stream_uses_codec_llm_stream(): + src = (REPO / "codec_session.py").read_text() + assert "codec_llm.stream(" in src + assert ".iter_lines(" not in src # inline SSE loop gone (call, not prose) + + +def test_compaction_uses_codec_llm_call(): + src = (REPO / "codec_compaction.py").read_text() + assert "codec_llm.call(" in src + assert "httpx.post(" not in src # inline POST gone + + +def test_dictate_uses_codec_llm_call(): + src = (REPO / "codec_dictate.py").read_text() + assert "codec_llm.call(" in src + assert "localhost:8083/v1/chat/completions" not in src # inline URL gone