Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 46 additions & 30 deletions plugins/qyl-continuation/hooks/stop-judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
JUDGE_TIMEOUT = int(os.environ.get("QYL_CONTINUATION_TIMEOUT", "30"))
JUDGE_MODEL = os.environ.get("QYL_CONTINUATION_MODEL", "haiku")

WORK_DIR = Path.home() / ".claude" / "qyl-continuation"
WORK_DIR.mkdir(parents=True, exist_ok=True)
Comment on lines +25 to +26
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WORK_DIR.mkdir(...) runs at import time and can raise (e.g., HOME not set, permission issues), which would crash the hook without emitting the required JSON decision. Consider wrapping directory creation in a try/except and falling back to a safe behavior (e.g., disable throttling / use a temporary per-run dir) while still calling approve/block.

Copilot uses AI. Check for mistakes.


def approve(reason: str) -> NoReturn:
json.dump({"decision": "approve", "reason": reason}, sys.stdout)
Expand All @@ -39,7 +42,7 @@ def block(reason: str) -> NoReturn:
event = json.loads(sys.stdin.read())
transcript_path = event.get("transcript_path", "")
session_id = event.get("session_id", "unknown")
throttle_file = Path(f"/tmp/.qyl-continue-{session_id.replace('/', '_')}")
throttle_file = WORK_DIR / f"throttle-{session_id.replace('/', '_')}"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The session_id is sanitized by replacing forward slashes with underscores, but backslashes (which are also path separators on Windows) are not replaced. This allows for potential path traversal out of the intended WORK_DIR on Windows systems. An attacker who can control the session_id could potentially point the throttle_file to a sensitive location, leading to unauthorized file deletion or modification when write_text() or unlink() is called.



def read_throttle() -> tuple[int, float]:
Expand All @@ -60,13 +63,13 @@ def clear_throttle() -> None:
throttle_file.unlink(missing_ok=True)


if event.get("stop_hook_active", False):
count, last_time = read_throttle()
if time.time() - last_time > WINDOW_SECONDS:
count = 0
if count >= MAX_CONTINUATIONS:
clear_throttle()
approve("Max continuation cycles reached")
# Throttle guard — always check, not gated on stop_hook_active
count, last_time = read_throttle()
if time.time() - last_time > WINDOW_SECONDS:
count = 0
if count >= MAX_CONTINUATIONS:
clear_throttle()
approve("Max continuation cycles reached")


# --- Load transcript ---
Expand All @@ -75,12 +78,12 @@ def clear_throttle() -> None:
approve("No transcript")

try:
tail = subprocess.run(
["tail", "-n", str(TAIL_LINES), transcript_path],
capture_output=True, text=True, timeout=5
)
lines = [l.strip() for l in tail.stdout.strip().split("\n") if l.strip()]
except (subprocess.TimeoutExpired, OSError):
p = Path(transcript_path)
raw = p.read_bytes()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The current implementation reads the entire transcript file into memory using p.read_bytes(). This poses a denial-of-service risk, as extremely large files or named pipes could cause the script to crash or hang indefinitely due to excessive memory consumption. This also represents a performance regression from the previous tail-based approach. A more memory-efficient approach would be to seek near the end of the file and read only the last chunk.

tail_bytes = raw[-(TAIL_LINES * 8192):] if len(raw) > TAIL_LINES * 8192 else raw
lines = [l.strip() for l in tail_bytes.decode("utf-8", errors="ignore").split("\n") if l.strip()]
Comment on lines +83 to +84

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Tail transcript by lines instead of fixed-size byte window

Using raw[-(TAIL_LINES * 8192):] assumes each of the last transcript lines is <=8KB, so a valid but large final JSON line gets truncated and fails json.loads, which can leave messages empty and force approve("Empty transcript") even when there is actionable context to continue. This is a behavioral regression from line-based tailing and will misclassify long tool outputs or assistant messages.

Useful? React with 👍 / 👎.

lines = lines[-TAIL_LINES:]
Comment on lines +81 to +85
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transcript tailing logic reads the entire transcript into memory (read_bytes()), and then slices a fixed byte window (TAIL_LINES * 8192). Besides being costly for large transcripts, it can also miss/partial the last JSONL entries when a single line is larger than the byte window, causing json.loads to fail and heuristics to operate on an incomplete/empty message set. Consider reading from the end of the file via seek/backward scan until you have N newlines (and avoid loading the whole file).

Copilot uses AI. Check for mistakes.
except OSError:
approve("Failed to read transcript")

messages: list[dict] = []
Expand All @@ -96,12 +99,19 @@ def clear_throttle() -> None:

# --- Transcript helpers ---

def _get_content(msg: dict):
content = msg.get("content", "")
if not content and isinstance(msg.get("message"), dict):
content = msg["message"].get("content", "")
return content


def role_of(msg: dict) -> str:
    """Return the message's role, falling back to its ``type`` field, else ``""``."""
    fallback = msg.get("type", "")
    return msg.get("role", fallback)


def text_of(msg: dict) -> str:
content = msg.get("content", "")
content = _get_content(msg)
if isinstance(content, str):
return content
if isinstance(content, list):
Expand All @@ -114,14 +124,19 @@ def text_of(msg: dict) -> str:


def has_block_type(msg: dict, block_type: str) -> bool:
    """Return True when the message's content is a list containing a block
    dict whose ``"type"`` equals *block_type* (e.g. ``"tool_use"``)."""
    blocks = _get_content(msg)
    if not isinstance(blocks, list):
        return False
    for blk in blocks:
        if isinstance(blk, dict) and blk.get("type") == block_type:
            return True
    return False


# --- Phase 1: Heuristics ---

# Matches phrasing where the assistant announces further work it intends to do
# ("next I'll…", "now let me…", "moving on to…", "still need to…",
# "remaining tasks…"). Case-insensitive via the inline (?i) flag. Used to
# veto the completion/addressed heuristics when next steps are stated.
NEXT_STEP_RX = re.compile(
    r"(?i)(?:next|now) (?:I(?:'ll| will| need to)|let me)|"
    r"moving on to|(?:still|also) need to|remaining (?:items|tasks|steps)"
)

last_assistant = ""
for msg in reversed(messages):
if role_of(msg) == "assistant":
Expand All @@ -145,15 +160,11 @@ def has_block_type(msg: dict, block_type: str) -> bool:
r"(?i)that(?:'s| should be) (?:it|all|everything)|"
r"(?i)here(?:'s| is) (?:the|your|a) (?:summary|result|output)"
)
NEXT_STEP_RX = re.compile(
r"(?i)(?:next|now) (?:I(?:'ll| will| need to)|let me)|"
r"moving on to|(?:still|also) need to|remaining (?:items|tasks|steps)"
)
if last_assistant and COMPLETION_RX.search(last_assistant) and not NEXT_STEP_RX.search(last_assistant):
clear_throttle()
approve("Heuristic: completion signal")

# H3: Tool result already addressed — assistant text after tool result
# H3: Tool result already addressed — assistant text after tool result (but not if next steps stated)
saw_tool = False
addressed = False
for msg in messages:
Expand All @@ -164,21 +175,27 @@ def has_block_type(msg: dict, block_type: str) -> bool:
addressed = True

last_role = role_of(messages[-1]) if messages else ""
if addressed and last_role == "assistant":
if addressed and last_role == "assistant" and not NEXT_STEP_RX.search(last_assistant):
clear_throttle()
approve("Heuristic: tool results already addressed")

# H4: Substantial text-only response (no pending tool calls)
# H4: Substantial text-only response (no pending tool calls, no stated next steps)
last_msg = messages[-1]
if role_of(last_msg) == "assistant" and not has_block_type(last_msg, "tool_use"):
if len(text_of(last_msg)) > 100:
text = text_of(last_msg)
if len(text) > 100 and not NEXT_STEP_RX.search(text):
clear_throttle()
approve("Heuristic: substantial text-only response")


# --- Phase 2: Haiku judge ---

transcript_json = json.dumps(messages, ensure_ascii=False)[:MAX_CONTEXT_BYTES]
raw_transcript = json.dumps(messages, ensure_ascii=False)
transcript_bytes = raw_transcript.encode("utf-8")
if len(transcript_bytes) > MAX_CONTEXT_BYTES:
transcript_json = transcript_bytes[:MAX_CONTEXT_BYTES].decode("utf-8", errors="ignore")
else:
transcript_json = raw_transcript

EVAL_PROMPT = f"""Does the assistant have more autonomous work to do RIGHT NOW?

Expand All @@ -198,8 +215,6 @@ def has_block_type(msg: dict, block_type: str) -> bool:
A tool_result or image does NOT mean unaddressed work — check for assistant text AFTER it.
Default: STOP."""

work_dir = Path.home() / ".claude" / "qyl-continuation"
work_dir.mkdir(parents=True, exist_ok=True)
env = os.environ.copy()
env["CLAUDE_HOOK_JUDGE_MODE"] = "true"

Expand All @@ -211,7 +226,7 @@ def has_block_type(msg: dict, block_type: str) -> bool:
"--system-prompt", "Conversation state classifier. Output JSON only. No code, no tools.",
"--disallowedTools", "*"],
input=EVAL_PROMPT, capture_output=True, text=True,
timeout=JUDGE_TIMEOUT, cwd=str(work_dir), env=env,
timeout=JUDGE_TIMEOUT, cwd=str(WORK_DIR), env=env,
)
except (subprocess.TimeoutExpired, OSError):
approve("Judge unavailable, allowing stop")
Expand All @@ -220,8 +235,9 @@ def has_block_type(msg: dict, block_type: str) -> bool:
approve("Judge failed, allowing stop")

try:
evaluation = json.loads(proc.stdout).get("structured_output", {})
except (json.JSONDecodeError, AttributeError, TypeError):
output = json.loads(proc.stdout)
evaluation = (output.get("structured_output") or {}) if isinstance(output, dict) else {}
except json.JSONDecodeError:
approve("Judge response unparseable, allowing stop")

if evaluation.get("should_continue", False):
Expand Down
Loading