Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions codec_compare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""CODEC Compare — fan one prompt out across model tiers, collect, return
labeled or blind.

Sits directly on top of the rest of the stack — it reuses the canonical
callers rather than re-implementing HTTP:
* OpenAI-compatible endpoints (local Qwen @ 8083, every Cookbook-served
model on its 811x port) → `codec_llm.call`
* cloud tiers (Gemini/Claude/GPT via the AVA proxy) → `codec_ava_client`

Endpoint set = three canonical tiers + anything Cookbook is currently serving:
1. local — the local Qwen (config llm_base_url / llm_model)
2. cloud-balanced — a mid cloud model via AVA (default gemini-2.5-flash)
3. cloud-pro — a top cloud model via AVA (default gemini-2.5-pro)
+ cookbook-<id> — each healthy model from codec_cookbook.serve.list_served()

The two cloud tiers + their model ids are overridable in
~/.codec/config.json:compare.cloud_tiers (a list of {label, model}); the
defaults above are grounded in codec_ava_client.choose_model's fast/balanced/pro
map. The fan-out is concurrent, per-endpoint timed, and never lets one
endpoint's failure sink the others.
"""
from __future__ import annotations

import json
import logging
import os
import string
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

# Module-level logger; discovery failures below are reported at DEBUG level.
log = logging.getLogger("codec_compare")

# User-level config file parsed by _load_cfg().
_CONFIG_PATH = os.path.expanduser("~/.codec/config.json")
# Cloud tiers used when the config carries no non-empty compare.cloud_tiers list.
_DEFAULT_CLOUD_TIERS = [
{"label": "cloud-balanced", "model": "gemini-2.5-flash"},
{"label": "cloud-pro", "model": "gemini-2.5-pro"},
]
# Passed as max_tokens on every endpoint call made by _query_one().
_MAX_TOKENS = 1024
# Default value of compare()'s per-endpoint `timeout` argument (seconds).
_PER_ENDPOINT_TIMEOUT_S = 60
# Default value of compare()'s `max_workers`; compare() also caps the pool at
# the number of endpoints.
_MAX_WORKERS = 6


def _load_cfg() -> dict:
    """Load the user config from ``_CONFIG_PATH``.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing or
        unreadable, is not valid UTF-8 / JSON, or its top-level value is not
        an object. Callers can therefore always use ``cfg.get(...)``.
    """
    try:
        with open(_CONFIG_PATH, encoding="utf-8") as fh:
            loaded = json.load(fh)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
        # (a config file that is not valid UTF-8).
        return {}
    # A syntactically valid file such as `[]` or `"x"` is still not a config.
    return loaded if isinstance(loaded, dict) else {}


def _cloud_tiers(cfg: dict) -> list[dict]:
    """Return the cloud tiers to compare, as a list of ``{label, model}`` dicts.

    Reads ``cfg["compare"]["cloud_tiers"]``. When that is a non-empty list,
    its usable entries are returned: entries that are not dicts or have no
    ``model`` are skipped, and an entry without a ``label`` gets
    ``cloud-<model>`` so callers can index ``["label"]`` safely. Otherwise
    (key absent, wrong type, or empty list) a copy of ``_DEFAULT_CLOUD_TIERS``
    is returned.
    """
    section = cfg.get("compare") if isinstance(cfg, dict) else None
    tiers = section.get("cloud_tiers") if isinstance(section, dict) else None
    if isinstance(tiers, list) and tiers:
        usable = []
        for entry in tiers:
            if not isinstance(entry, dict) or not entry.get("model"):
                continue  # malformed entry: skip it, keep the rest
            label = entry.get("label") or f"cloud-{entry['model']}"
            usable.append({**entry, "label": label})
        return usable
    # Copy each dict so callers cannot mutate the module-level defaults.
    return [dict(tier) for tier in _DEFAULT_CLOUD_TIERS]


def _cookbook_endpoints() -> list[dict]:
    """Describe each Cookbook-served model as an OpenAI-compatible endpoint.

    Best-effort: any failure (Cookbook not importable, listing error, odd
    record) is logged at DEBUG and whatever was collected so far is returned,
    which is ``[]`` when nothing could be discovered.
    """
    discovered = []
    try:
        from codec_cookbook import serve

        for record in serve.list_served():
            port = record.get("port")
            if not port:
                continue
            # Keep only records that are not reported as stopped or unhealthy;
            # a missing pm2_status / healthy field counts as fine.
            stopped = record.get("pm2_status") not in (None, "online")
            if stopped or record.get("healthy") is False:
                continue
            label = f"cookbook-{record.get('id', port)}"
            model = record.get("hf_repo") or str(record.get("id"))
            discovered.append({
                "label": label,
                "kind": "openai",
                "model": model,
                "base_url": f"http://127.0.0.1:{port}/v1",
                "tier": "cookbook",
            })
    except Exception as e:
        log.debug("cookbook endpoint discovery skipped: %s", e)
    return discovered


def default_endpoints() -> list[dict]:
    """Build the canonical comparison set: local + cloud tiers + Cookbook.

    Returns:
        A list of endpoint dicts. The local endpoint always comes first; the
        cloud tiers follow only when the AVA client reports itself ready;
        Cookbook-served models come last.
    """
    cfg = _load_cfg()
    # `or` rather than a .get() default so a null/empty config value still
    # falls back to the built-in local endpoint.
    eps: list[dict] = [{
        "label": "local",
        "kind": "openai",
        "model": cfg.get("llm_model") or "local-qwen",
        "base_url": cfg.get("llm_base_url") or "http://localhost:8083/v1",
        "tier": "local",
    }]
    # cloud tiers via AVA — only when the license/proxy is actually ready
    try:
        import codec_ava_client

        ava = codec_ava_client.load_config()
        if ava and ava.is_ready():
            for tier in _cloud_tiers(cfg):
                # A tier configured without a label must not raise here: the
                # broad except below would silently drop the remaining tiers.
                label = tier.get("label") or f"cloud-{tier['model']}"
                eps.append({"label": label, "kind": "ava",
                            "model": tier["model"], "tier": "cloud"})
    except Exception as e:
        log.debug("AVA cloud tiers unavailable: %s", e)
    eps.extend(_cookbook_endpoints())
    return eps


def _query_one(ep: dict, prompt: str, system: Optional[str], timeout: int) -> dict:
    """Query a single endpoint and time it.

    Args:
        ep: Endpoint dict; ``kind`` selects the caller (``"ava"`` goes to
            ``codec_ava_client.ava_chat_simple``, anything else to
            ``codec_llm.call`` with ``base_url``).
        prompt: User prompt text.
        system: Optional system prompt.
        timeout: Timeout forwarded to the underlying caller.

    Returns:
        ``{label, model, tier, ok, elapsed_ms}`` plus ``response`` on success
        or ``error`` on failure. Any exception raised while querying is
        captured rather than propagated. ``error`` is never blank: an
        exception with an empty message is reported by its class name.
    """
    t0 = time.monotonic()
    base = {k: ep.get(k) for k in ("label", "model", "tier")}
    try:
        if ep["kind"] == "ava":
            import codec_ava_client

            text = codec_ava_client.ava_chat_simple(
                prompt, system=system, model=ep["model"],
                max_tokens=_MAX_TOKENS, timeout=timeout)
        else:  # openai-compatible (local + cookbook)
            import codec_llm

            messages = []
            if system:
                messages.append({"role": "system", "content": system})
            messages.append({"role": "user", "content": prompt})
            text = codec_llm.call(
                messages, base_url=ep["base_url"], model=ep["model"],
                max_tokens=_MAX_TOKENS, timeout=timeout, raise_on_error=True)
        return {**base, "ok": True, "response": (text or "").strip(),
                "elapsed_ms": round((time.monotonic() - t0) * 1000)}
    except Exception as e:
        # Some exceptions (bare timeouts, for instance) stringify to "";
        # fall back to the class name so the caller has something to show.
        detail = str(e).strip() or type(e).__name__
        return {**base, "ok": False, "error": detail[:300],
                "elapsed_ms": round((time.monotonic() - t0) * 1000)}


def _anon_tag(index: int) -> str:
    """Return a unique blind tag: 0 -> 'Model A', 25 -> 'Model Z', 26 -> 'Model AA'."""
    letters = ""
    remaining = index
    while True:
        remaining, pos = divmod(remaining, 26)
        letters = string.ascii_uppercase[pos] + letters
        if remaining == 0:
            return f"Model {letters}"
        remaining -= 1  # bijective base-26: there is no "zero" letter


def compare(prompt: str, *, endpoints: Optional[list[dict]] = None,
            blind: bool = False, system: Optional[str] = None,
            timeout: int = _PER_ENDPOINT_TIMEOUT_S,
            max_workers: int = _MAX_WORKERS) -> dict:
    """Fan `prompt` out across `endpoints` concurrently and collect every reply.

    Args:
        prompt: Prompt text; blank input short-circuits with an empty result.
        endpoints: Endpoint dicts to query (any iterable is accepted);
            defaults to ``default_endpoints()``.
        blind: When true, anonymize the display labels.
        system: Optional system prompt sent to every endpoint.
        timeout: Per-endpoint timeout handed to each query.
        max_workers: Upper bound on concurrent queries; the pool is also
            capped at the number of endpoints and never drops below 1.

    Returns:
        ``{prompt, blind, results, mapping?}``. ``prompt`` is truncated to 200
        characters. ``results`` follows the order of ``endpoints``; every
        entry carries ``label, model, tier, ok, response|error, elapsed_ms``
        plus ``display``. In blind mode ``display`` is ``Model A/B/…`` (then
        ``AA``, ``AB``, … past 26 so tags never collide) and ``mapping`` gives
        anon -> real label. The real ``label``/``model``/``tier`` remain on
        each result, so a blind presentation must render ``display`` only.
        A ``note`` key explains an empty ``results`` list.
    """
    if not prompt or not prompt.strip():
        return {"prompt": "", "blind": blind, "results": [], "note": "empty prompt"}
    eps = list(endpoints) if endpoints is not None else default_endpoints()
    if not eps:
        return {"prompt": prompt[:200], "blind": blind, "results": [],
                "note": "no endpoints available"}

    workers = max(1, min(max_workers, len(eps)))
    with ThreadPoolExecutor(max_workers=workers) as ex:
        # ex.map preserves input order
        results = list(ex.map(lambda e: _query_one(e, prompt, system, timeout), eps))

    out = {"prompt": prompt[:200], "blind": blind, "results": results}
    if blind:
        anon = {}
        for i, r in enumerate(results):
            tag = _anon_tag(i)
            anon[tag] = r["label"]
            r["display"] = tag
        out["mapping"] = anon
    else:
        for r in results:
            r["display"] = r["label"]
    return out
1 change: 1 addition & 0 deletions skills/.manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"chrome_tabs.py": "bf971798a8455215655d37edb6bb326d908f4d33c0e3b232eb37d267fec68d7a",
"clipboard.py": "f5ef9cc501fe38a3de95bf0b49896b928250c0e272060173668f6b195728d131",
"clipboard_url_fetch.py": "c2733a92d6e99a0346b91c67bb70698e491be9570305377a82096a0ceb153488",
"compare.py": "f0ff94cfcdc3dd5a4f0dd88d641e042303982888a9838fb4b1781067a9c46839",
"cookbook_download.py": "8fd9c8a5b82f8e910cc0721904b908f2d201908ff0ac6373994be922598c382c",
"cookbook_list.py": "23c19b92742bfd88a801e74bd79e8bbd70f88d49ac1951f1c30e4857cc693a79",
"cookbook_recommend.py": "66b09eac0510ca9ca1e19f5619ab10a167bda6e34d17be13bda77ba7ffe0b5e3",
Expand Down
64 changes: 64 additions & 0 deletions skills/compare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""CODEC Skill: Compare one prompt across model tiers (labeled or blind)."""
import re

from codec_compare import compare

# Skill metadata.
# NOTE(review): presumably read by the CODEC skill loader/registry, which is
# not visible in this file — confirm which fields it requires.
SKILL_NAME = "compare"
SKILL_DESCRIPTION = (
"Send one prompt to multiple model tiers at once — the local Qwen, the "
"cloud tiers (via AVA), and any model Cookbook is currently serving — and "
"show their answers side by side. Add 'blind' to hide which model is which."
)
SKILL_TAGS = ["compare", "models", "eval", "llm", "cookbook"]
# Phrases that route a task to this skill; the same phrases (plus a bare
# "compare") are stripped from the task by _extract_prompt().
SKILL_TRIGGERS = [
"compare models", "blind compare", "compare across models", "ask all models",
"compare llms", "model compare", "compare prompt",
]
SKILL_MCP_EXPOSE = True # query skill (no process/file mutation; same cost profile as chat)

# Command prefixes stripped from the start of a task. The order written here
# does not matter: _extract_prompt() tries them longest-first so that
# "compare across models" wins over "compare".
_PREFIXES = (
    "blind compare across models", "compare across models", "blind compare",
    "compare models", "ask all models", "compare llms", "model compare",
    "compare prompt", "compare",
)


def _extract_prompt(task: str) -> str:
    """Strip the command prefix from `task` and return the prompt to compare.

    The longest matching entry of ``_PREFIXES`` (case-insensitive) is removed
    from the start, together with any separator characters (space, ``:``,
    ``-``, ``>``, newline, tab) that follow it. A prefix only counts when it
    ends at a word boundary, so "compared to X" is left intact. A leading
    ``blind`` keyword left over after the prefix is dropped too. Separator
    characters at the *end* of the prompt are preserved, since they belong to
    the user's text (e.g. "explain the operator ->").

    Returns:
        The prompt text, or ``""`` when nothing remains.
    """
    text = (task or "").strip()
    for prefix in sorted(_PREFIXES, key=len, reverse=True):
        if text[:len(prefix)].lower() != prefix:
            continue
        rest = text[len(prefix):]
        if rest and (rest[0].isalnum() or rest[0] == "_"):
            continue  # prefix ends mid-word ("compared"): not a command
        # Separators are only meaningful between the command and the prompt.
        text = rest.lstrip(" :->\n\t").rstrip()
        break
    # drop a leading 'blind' keyword if it leaked into the prompt
    return re.sub(r"^blind\s+", "", text, flags=re.I).strip()


def run(task, app="", ctx=""):
    """Skill entry point: compare one prompt across models and format the replies.

    Blind mode is enabled whenever the word "blind" appears anywhere in
    `task`. Returns a text report: a header line, one ``###`` section per
    model (its answer, or the error if it failed), and in blind mode a
    trailing key that maps the anonymous tags back to the real labels.
    `app` and `ctx` are accepted but not used here.
    """
    blind = re.search(r"\bblind\b", (task or "").lower()) is not None
    prompt = _extract_prompt(task)
    if not prompt:
        return ("What should I compare? e.g. "
                "'compare models: explain quantum tunneling in one paragraph' "
                "(prefix with 'blind' to hide the model identities).")

    outcome = compare(prompt, blind=blind)
    results = outcome.get("results", [])
    if not results:
        return f"No model endpoints available to compare ({outcome.get('note', 'none configured')})."

    plural = "s" if len(results) != 1 else ""
    blind_note = " — blind" if blind else ""
    sections = [f"Compared {len(results)} model{plural}"
                + blind_note + f" on: {outcome['prompt']}"]
    for item in results:
        label = item.get("display") or item.get("label")
        meta = f"{item.get('elapsed_ms')}ms"
        if not blind:
            # The tier would give the model away, so it is shown only when labeled.
            meta += f", {item.get('tier')}"
        if item.get("ok"):
            sections.append(f"\n### {label} ({meta})\n{item.get('response', '')}")
        else:
            sections.append(f"\n### {label} — ✗ {item.get('error', 'failed')} ({meta})")
    if blind and outcome.get("mapping"):
        key = " ".join(f"{k} = {v}" for k, v in outcome["mapping"].items())
        sections.append(f"\n— Key (judge the answers first, then peek) —\n{key}")
    return "\n".join(sections)
Loading