Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ docs/ API.md, MCP_HTTP_SETUP.md, CONTEXT_REPORT.md, desig

Other engine modules (`codec_overlays`, `codec_metrics`, `codec_logging`, `codec_gdocs`, `codec_google_auth`, `codec_cdp`, `codec_llm_proxy`, `codec_retry`, `codec_alerts`, `codec_search`, `codec_textassist`, `codec_watcher`, `codec_watchdog`) are internal helpers — read them when you need them, but they're not part of the navigation surface for an agent making structural changes. (Keyboard handling — wake word, F13 toggle, F18 voice, double-tap — lives **inline in `codec.py`** in the `codec` PM2 process; the old standalone `codec_keyboard.py` was deleted as a dead duplicate per A-8.)

**Canonical LLM + vision helpers (PR-3E, A-11/A-12).** `codec_vision.py` is the SINGLE source for screen-vision (`describe_sync` / `describe_async`, Gemini-flash → local-Qwen-VL fallback, config read live from `codec_config`) — used by `codec.py`, `codec_voice`, `codec_session`. `codec_llm.py` is the canonical chat/completions caller (`call()` + `strip_think`/`extract_content` — headers, Bearer auth, `enable_thinking`, `<think>` strip, `choices/reasoning` parse, retry+backoff, never-raises). NOTE: `codec_llm_proxy.py` is a priority *queue* (semaphore), NOT an HTTP caller — don't confuse the two. A-12 is migrating the ~45 inline `chat/completions` sites onto `codec_llm` in phased tranches; codec.py voice-reply + `codec_session.qwen_call` are done, streaming (`codec_llm.stream()`) + the rest are pending.
**Canonical LLM + vision helpers (PR-3E, A-11/A-12).** `codec_vision.py` is the SINGLE source for screen-vision (`describe_sync` / `describe_async`, Gemini-flash → local-Qwen-VL fallback, config read live from `codec_config`) — used by `codec.py`, `codec_voice`, `codec_session`. `codec_llm.py` is the canonical chat/completions caller (`call()` + `strip_think`/`extract_content` — headers, Bearer auth, `enable_thinking`, `<think>` strip, `choices/reasoning` parse, retry+backoff, never-raises). NOTE: `codec_llm_proxy.py` is a priority *queue* (semaphore), NOT an HTTP caller — don't confuse the two. A-12 is migrating the ~45 inline `chat/completions` sites onto `codec_llm` in phased tranches. Done: `codec_llm.call()` (non-stream) + `stream()` (sync SSE generator, yields raw deltas, never-raises); migrated sites = codec.py voice-reply, `codec_session.qwen_call` + `qwen_stream`, `codec_compaction`, `codec_dictate`. Pending tranches: 2c raise-mode (`codec_llm.call(raise_on_error=True)` for agent_plan/runner + textassist + the regen script — they MUST fail loud, never-raise would silently paste empty / write empty), an async `astream()` for voice `_stream_qwen` + agents (queue stays at the call site — `codec_llm` never owns the semaphore), dashboard (4 non-stream + the `[SKILL:…]` stream tag-machine, which keeps its own parser and consumes only `stream()`'s raw tokens), bridges, and a skills tranche.

## 3. Agent + Crew runtime

Expand Down
36 changes: 11 additions & 25 deletions codec_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,13 @@ def compact_context(recent_messages: list, max_recent: int = 5, max_summary_toke

summary = None
try:
import httpx
base_url = _LLM_BASE_URL
model = _LLM_MODEL
api_key = _LLM_API_KEY
llm_kwargs = _LLM_KWARGS

headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"

payload = {
"model": model,
"messages": [
# A-12 (PR-3E-2): canonical codec_llm.call replaces the inline httpx
# POST + parse. Never raises -> "" on failure, which the fallback below
# already handles. (Also now sends enable_thinking=False + strips any
# <think> leak, so summaries are slightly cleaner than the old path.)
import codec_llm
summary = codec_llm.call(
[
{
"role": "system",
"content": (
Expand All @@ -72,19 +66,11 @@ def compact_context(recent_messages: list, max_recent: int = 5, max_summary_toke
},
{"role": "user", "content": old_text},
],
"max_tokens": max_summary_tokens,
"temperature": 0.1,
}
payload.update(llm_kwargs)

r = httpx.post(
f"{base_url}/chat/completions",
json=payload,
headers=headers,
timeout=15,
base_url=_LLM_BASE_URL, model=_LLM_MODEL, api_key=_LLM_API_KEY,
max_tokens=max_summary_tokens, temperature=0.1, timeout=15,
extra_kwargs=_LLM_KWARGS,
)
if r.status_code == 200:
summary = r.json()["choices"][0]["message"]["content"].strip()
if summary:
log.info(f"Context compacted: {len(old_messages)} old msgs → {len(summary)} char summary")
except Exception as e:
log.warning(f"Compaction LLM failed, using fallback: {e}")
Expand Down
37 changes: 18 additions & 19 deletions codec_dictate.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,28 +397,27 @@ def transcribe_and_type(audio_path):
body = text[_draft_match.end():].strip()
if body:
try:
import requests as _req
# A-12 (PR-3E-2): canonical codec_llm.call. Never raises ->
# "" on any failure (non-200, conn error, empty), which maps
# exactly onto the old "use raw body" fallback for every
# non-success branch.
import codec_llm
print("[DICTATE] Draft mode — refining with Qwen...")
r = _req.post("http://localhost:8083/v1/chat/completions",
json={"model": "mlx-community/Qwen3.6-35B-A3B-4bit",
"messages": [
{"role": "system", "content": "Rewrite the user message as a polished, professional message. Output ONLY the final text. No preamble, no explanation."},
{"role": "user", "content": body}
],
"max_tokens": 300, "temperature": 0.3,
"chat_template_kwargs": {"enable_thinking": False}},
timeout=15)
if r.status_code == 200:
refined = r.json()["choices"][0]["message"]["content"].strip()
if refined:
text = refined
print(f"[DICTATE] Refined: {text}")
else:
text = body
print("[DICTATE] Qwen returned empty, using raw body")
refined = codec_llm.call(
[
{"role": "system", "content": "Rewrite the user message as a polished, professional message. Output ONLY the final text. No preamble, no explanation."},
{"role": "user", "content": body},
],
base_url="http://localhost:8083/v1",
model="mlx-community/Qwen3.6-35B-A3B-4bit",
max_tokens=300, temperature=0.3, timeout=15,
)
if refined:
text = refined
print(f"[DICTATE] Refined: {text}")
else:
text = body
print(f"[DICTATE] Qwen HTTP {r.status_code}, using raw body")
print("[DICTATE] Qwen unavailable or empty, using raw body")
except Exception as qe:
text = body
print(f"[DICTATE] Qwen error: {qe}, using raw body")
Expand Down
111 changes: 98 additions & 13 deletions codec_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import logging
import re
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterator, List, Optional

log = logging.getLogger("codec.llm")

Expand Down Expand Up @@ -55,6 +55,37 @@ def extract_content(response_json: Dict[str, Any]) -> str:
return ""


def _build_request(
messages: List[Dict[str, Any]],
*,
model: str,
api_key: str,
max_tokens: int,
temperature: float,
enable_thinking: bool,
extra_kwargs: Optional[Dict[str, Any]],
stream: bool = False,
) -> tuple[Dict[str, str], Dict[str, Any]]:
"""Build the (headers, payload) for an OpenAI-style chat/completions request.
Shared by call() and stream() so headers/auth/payload shape never drift.
The `stream` flag is applied LAST so extra_kwargs can't clobber it."""
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = "Bearer " + api_key
payload: Dict[str, Any] = {
"model": model,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"chat_template_kwargs": {"enable_thinking": enable_thinking},
}
if extra_kwargs:
payload.update(extra_kwargs)
if stream:
payload["stream"] = True
return headers, payload


def call(
messages: List[Dict[str, Any]],
*,
Expand All @@ -76,18 +107,11 @@ def call(
Never raises — network/parse errors are logged and yield "".
"""
import requests
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = "Bearer " + api_key
payload: Dict[str, Any] = {
"model": model,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"chat_template_kwargs": {"enable_thinking": enable_thinking},
}
if extra_kwargs:
payload.update(extra_kwargs)
headers, payload = _build_request(
messages, model=model, api_key=api_key, max_tokens=max_tokens,
temperature=temperature, enable_thinking=enable_thinking,
extra_kwargs=extra_kwargs,
)

attempts = max(1, retries)
url = base_url.rstrip("/") + "/chat/completions"
Expand All @@ -106,3 +130,64 @@ def call(
if attempt < attempts - 1:
time.sleep(2 ** attempt)
return ""


def stream(
messages: List[Dict[str, Any]],
*,
base_url: str,
model: str,
api_key: str = "",
max_tokens: int = 500,
temperature: float = 0.7,
timeout: float = 120.0,
enable_thinking: bool = False,
extra_kwargs: Optional[Dict[str, Any]] = None,
) -> Iterator[str]:
"""POST with `stream=True` and yield the RAW assistant content deltas in
order. Centralizes the SSE plumbing: header/payload build (shared with
call()), `data: ` framing, the `[DONE]` sentinel, `choices[0].delta.content`
extraction, and per-chunk parse tolerance.

Think-stripping is intentionally NOT done here — callers that show tokens
live (e.g. codec_session.qwen_stream) strip `<think>` on the accumulated
result, and the dashboard owns its own cross-chunk tag machine. Never
raises: on connect/HTTP/parse error it logs and stops yielding, so the
caller sees a short/empty stream and applies its own fallback.
"""
import json as _json
import requests
headers, payload = _build_request(
messages, model=model, api_key=api_key, max_tokens=max_tokens,
temperature=temperature, enable_thinking=enable_thinking,
extra_kwargs=extra_kwargs, stream=True,
)
url = base_url.rstrip("/") + "/chat/completions"
try:
with requests.post(url, json=payload, headers=headers,
timeout=timeout, stream=True) as r:
if r.status_code != 200:
log.warning("LLM stream %s returned %s: %s",
url, r.status_code, getattr(r, "text", "")[:200])
return
for line in r.iter_lines():
if not line:
continue
if isinstance(line, (bytes, bytearray)):
line = line.decode("utf-8", "replace")
if not line.startswith("data: "):
continue
data = line[6:]
if data.strip() == "[DONE]":
return
try:
delta = (_json.loads(data).get("choices", [{}])[0]
.get("delta", {}).get("content", ""))
except Exception as e:
log.warning("LLM stream chunk parse failed: %s", e)
continue
if delta:
yield delta
except Exception as e:
log.warning("LLM stream call failed: %s", e)
return
58 changes: 20 additions & 38 deletions codec_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,47 +256,29 @@ def qwen_call(self, messages):
)

def qwen_stream(self, messages):
import requests
# A-12 (PR-3E-2): canonical codec_llm.stream replaces the inline SSE
# POST + iter_lines + delta parse. stream() yields RAW deltas (it never
# raises), so we keep the live stdout write here and strip_think() the
# accumulated result — exact parity. An empty stream (non-200, conn
# error, or no content) falls back to the migrated non-streaming
# qwen_call (slightly broader than the old non-200-only fallback).
import codec_llm
try:
headers = {"Content-Type": "application/json"}
if self.llm_api_key:
headers["Authorization"] = "Bearer " + self.llm_api_key
payload = {
"model": self.qwen_model,
"messages": messages,
"max_tokens": 500,
"temperature": 0.5,
"stream": True,
}
payload.update(self.llm_kwargs)
r = requests.post(
self.qwen_base_url + "/chat/completions",
json=payload,
headers=headers,
timeout=90,
stream=True,
)
if r.status_code != 200:
return self.qwen_call(messages)
full = ""
for line in r.iter_lines():
if not line:
continue
line = line.decode("utf-8")
if line.startswith("data: "):
d = line[6:]
if d.strip() == "[DONE]":
break
try:
delta = json.loads(d).get("choices", [{}])[0].get("delta", {}).get("content", "")
if delta:
sys.stdout.write(delta)
sys.stdout.flush()
full += delta
except Exception as e:
log.warning(f"Stream chunk parse failed: {e}")
got = False
for delta in codec_llm.stream(
messages, base_url=self.qwen_base_url, model=self.qwen_model,
api_key=self.llm_api_key, max_tokens=500, temperature=0.5,
timeout=90, extra_kwargs=self.llm_kwargs,
):
got = True
sys.stdout.write(delta)
sys.stdout.flush()
full += delta
print()
return strip_think(full).strip()
if got:
return strip_think(full).strip()
return self.qwen_call(messages)
except Exception as e:
log.warning(f"Streaming LLM call failed, falling back to non-streaming: {e}")
return self.qwen_call(messages)
Expand Down
Loading
Loading