diff --git a/ROADMAP.md b/ROADMAP.md index ce5b509..945be6f 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -247,7 +247,7 @@ As usage grows, the platform needs stronger derived data pipelines, performance - [x] MCP sidecar with on-demand stats, friction reports, and recommendations - [x] [P0] Proactive coaching skill that activates at session start with contextual suggestions -- [ ] [P0] Live session signals that stream friction, satisfaction, and risk as work happens +- [x] [P0] Live session signals that stream friction, satisfaction, and risk as work happens - [ ] [P1] In-session workflow nudges based on project playbooks and prior failures - [ ] [P1] Daily and weekly personal recaps inside the sidecar - [ ] [P2] Lightweight session planning prompts before complex work begins diff --git a/src/primer/common/schemas.py b/src/primer/common/schemas.py index c9ee6aa..9deda7d 100644 --- a/src/primer/common/schemas.py +++ b/src/primer/common/schemas.py @@ -2203,6 +2203,28 @@ class MaturityAnalyticsResponse(BaseModel): model_diversity_avg: float = 0.0 +# --- Live Session Signals --- + + +class LiveSessionSignal(BaseModel): + signal_type: str + severity: Literal["info", "warning", "critical"] + title: str + detail: str + evidence: dict[str, Any] = Field(default_factory=dict) + + +class LiveSessionSignalsResponse(BaseModel): + session_id: str + agent_type: str + project_name: str | None = None + started_at: datetime | None = None + total_messages: int = 0 + risk_level: Literal["low", "medium", "high"] + satisfaction_signal: Literal["positive", "neutral", "negative", "unknown"] = "unknown" + signals: list[LiveSessionSignal] = Field(default_factory=list) + + # --- Coaching Brief --- diff --git a/src/primer/mcp/server.py b/src/primer/mcp/server.py index 28b8300..228dbe9 100644 --- a/src/primer/mcp/server.py +++ b/src/primer/mcp/server.py @@ -8,6 +8,7 @@ from primer.mcp.tools import ( primer_coaching, primer_friction_report, + primer_live_session_signals, primer_my_stats, primer_recommendations, primer_session_start_coaching, @@ -93,5 +94,18 @@ def session_start_coaching( ) +@mcp.tool() +def live_session_signals( + session_id: str | None = None, + transcript_path: str | None = None, +) -> str: + """Get live friction, satisfaction, and risk signals for the current local session. + + Primer inspects the in-progress local transcript and summarizes whether the + session looks healthy, stuck, or at risk of abandonment. + """ + return primer_live_session_signals(session_id=session_id, transcript_path=transcript_path) + + if __name__ == "__main__": mcp.run() diff --git a/src/primer/mcp/tools.py b/src/primer/mcp/tools.py index 5cce1fa..ef5c771 100644 --- a/src/primer/mcp/tools.py +++ b/src/primer/mcp/tools.py @@ -7,6 +7,7 @@ import httpx from primer.mcp.sync import sync_sessions +from primer.server.services.live_session_signal_service import get_live_session_signals logger = logging.getLogger(__name__) @@ -178,3 +179,29 @@ def primer_session_start_coaching( return f"Error: {resp.status_code} - {resp.text}" except httpx.RequestError as e: return f"Error connecting to server: {e}" + + +def primer_live_session_signals( + session_id: str | None = None, + transcript_path: str | None = None, +) -> str: + """Analyze the current in-progress local session for live risk and friction signals. + + This is computed locally from the active transcript so it can be called repeatedly + during a session without waiting for end-of-session ingest. + """ + try: + data = get_live_session_signals(session_id=session_id, transcript_path=transcript_path) + except ValueError as exc: + return f"Error: {exc}" + lines = ["## Live Session Signals\n"] + lines.append(f"**Risk**: {data.risk_level}\n") + lines.append(f"**Satisfaction**: {data.satisfaction_signal}\n") + if data.project_name: + lines.append(f"**Project**: {data.project_name}\n") + lines.append(f"**Session**: {data.session_id} ({data.agent_type})\n") + for signal in data.signals: + lines.append(f"### [{signal.severity}] {signal.title}") + lines.append(f"- {signal.detail}") + lines.append("") + return "\n".join(lines) diff --git a/src/primer/server/services/live_session_signal_service.py b/src/primer/server/services/live_session_signal_service.py new file mode 100644 index 0000000..9c94298 --- /dev/null +++ b/src/primer/server/services/live_session_signal_service.py @@ -0,0 +1,316 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from primer.common.schemas import LiveSessionSignal, LiveSessionSignalsResponse +from primer.hook.extractor_registry import get_extractor_for +from primer.mcp.reader import LocalSession, list_local_sessions +from primer.server.services.execution_evidence_service import ( + _EXECUTION_TOOL_NAMES, + _classify_status, + extract_execution_evidence, +) +from primer.server.services.recovery_path_service import extract_recovery_path +from primer.server.services.session_signal_parsing import message_payload, normalize_tool_name + +if TYPE_CHECKING: + from primer.hook.extractor import SessionMetadata + +_NEGATIVE_SATISFACTION_PHRASES = ( + "still broken", + "not working", + "doesn't work", + "does not work", + "not right", + "try again", + "stuck", + "blocked", + "frustrated", + "annoying", + "i give up", + "this is broken", +) +_POSITIVE_SATISFACTION_PHRASES = ( + "thanks", + "thank you", + "that fixed it", + "works now", + "looks good", + "great", + "nice", + "perfect", + "resolved", +) +_TOOL_ERROR_KEYWORDS = ("error", "failed", "failure", "traceback", "exception", "timeout") + + +def get_live_session_signals( + *, + session_id: str | None = None, + transcript_path: str | None = None, +) -> LiveSessionSignalsResponse: + session = _resolve_local_session(session_id=session_id, transcript_path=transcript_path) + extractor = get_extractor_for(session.agent_type) + if extractor is None: + raise ValueError(f"No extractor available for agent type '{session.agent_type}'") + + metadata = extractor.extract(session.transcript_path) + return _build_live_session_signal_summary(session, metadata) + + +def _resolve_local_session( + *, + session_id: str | None, + transcript_path: str | None, +) -> LocalSession: + sessions = list_local_sessions() + if not sessions: + raise ValueError("No local sessions found") + + if session_id: + for session in sessions: + if session.session_id == session_id: + return session + raise ValueError(f"No local session found for session_id '{session_id}'") + + if transcript_path: + for session in sessions: + if session.transcript_path == transcript_path: + return session + raise ValueError(f"No local session found for transcript_path '{transcript_path}'") + + return max(sessions, key=_session_sort_key) + + +def _session_sort_key(session: LocalSession) -> tuple[float, str]: + return (_session_mtime(session), session.session_id) + + +def _session_mtime(session: LocalSession) -> float: + path = _session_source_path(session.transcript_path) + try: + return path.stat().st_mtime + except OSError: + return 0.0 + + +def _session_source_path(transcript_path: str) -> Path: + path = Path(transcript_path) + if path.exists(): + return path + if "::" in transcript_path: + raw_path = transcript_path.rsplit("::", 1)[0] + return Path(raw_path) + return path + + +def _build_live_session_signal_summary( + session: LocalSession, + metadata: SessionMetadata, +) -> LiveSessionSignalsResponse: + messages = metadata.messages or [] + execution_evidence = extract_execution_evidence(messages) + recovery_path = extract_recovery_path(messages, execution_evidence) + + failed_execution = [record for record in execution_evidence if record.status == "failed"] + failed_verifications = [ + record + for record in failed_execution + if record.evidence_type in {"test", "lint", "build", "verification"} + ] + tool_error_count = _count_tool_errors(messages) + satisfaction_signal = _infer_satisfaction_signal(messages) + recovery_loop = bool( + recovery_path + and recovery_path.recovery_result != "recovered" + and recovery_path.recovery_step_count >= 3 + ) + + signals: list[LiveSessionSignal] = [] + + if failed_verifications: + severity = "critical" if len(failed_verifications) >= 2 else "warning" + signals.append( + LiveSessionSignal( + signal_type="verification_failure", + severity=severity, + title="Verification is failing", + detail=( + f"Recent test/build/lint commands have failed {len(failed_verifications)} " + f"time{'s' if len(failed_verifications) != 1 else ''}." + ), + evidence={ + "failure_count": len(failed_verifications), + "last_failure_type": failed_verifications[-1].evidence_type, + }, + ) + ) + + if tool_error_count >= 2: + signals.append( + LiveSessionSignal( + signal_type="tool_error_cluster", + severity="warning", + title="Tool errors are stacking up", + detail=( + f"{tool_error_count} recent tool results look error-like. " + "This session may be fighting the toolchain, not the task." + ), + evidence={"tool_error_count": tool_error_count}, + ) + ) + + if recovery_loop: + assert recovery_path is not None + signals.append( + LiveSessionSignal( + signal_type="recovery_loop", + severity="critical", + title="Recovery loop detected", + detail=( + f"The session has {recovery_path.recovery_step_count} recovery steps " + "without a clear resolution yet." + ), + evidence={ + "recovery_step_count": recovery_path.recovery_step_count, + "recovery_result": recovery_path.recovery_result, + }, + ) + ) + + if satisfaction_signal == "negative": + signals.append( + LiveSessionSignal( + signal_type="negative_sentiment", + severity="warning", + title="User sentiment looks frustrated", + detail=("Recent user messages suggest the current approach still is not landing."), + evidence={"satisfaction_signal": satisfaction_signal}, + ) + ) + elif satisfaction_signal == "positive" and not signals: + signals.append( + LiveSessionSignal( + signal_type="positive_momentum", + severity="info", + title="Session looks healthy", + detail="Recent interaction signals look positive and unblocked.", + evidence={"satisfaction_signal": satisfaction_signal}, + ) + ) + + if not signals: + signals.append( + LiveSessionSignal( + signal_type="steady_state", + severity="info", + title="No acute live risks detected", + detail="No strong friction or abandonment signals are standing out right now.", + evidence={}, + ) + ) + + risk_score = 0 + if len(failed_verifications) >= 2: + risk_score += 2 + elif failed_verifications: + risk_score += 1 + if tool_error_count >= 2: + risk_score += 1 + if recovery_loop: + risk_score += 1 + if satisfaction_signal == "negative": + risk_score += 1 + elif satisfaction_signal == "positive": + risk_score -= 1 + + if risk_score >= 3: + risk_level = "high" + elif risk_score >= 1: + risk_level = "medium" + else: + risk_level = "low" + + return LiveSessionSignalsResponse( + session_id=metadata.session_id or session.session_id, + agent_type=metadata.agent_type or session.agent_type, + project_name=metadata.project_name or session.project_path, + started_at=metadata.started_at, + total_messages=len(messages), + risk_level=risk_level, + satisfaction_signal=satisfaction_signal, + signals=signals, + ) + + +def _count_tool_errors(messages: list[object]) -> int: + count = 0 + for message in messages: + payload = message_payload(message) + if payload is None: + continue + tool_results = payload.get("tool_results") + if not isinstance(tool_results, list): + continue + for tool_result in tool_results: + if not isinstance(tool_result, dict): + continue + tool_name = normalize_tool_name(tool_result.get("name")) + if tool_name in _EXECUTION_TOOL_NAMES: + continue + output_preview = tool_result.get("output_preview") + if not isinstance(output_preview, str): + continue + status = _classify_status(output_preview) + if status == "passed": + continue + if status == "failed": + count += 1 + continue + normalized = output_preview.lower() + if any(keyword in normalized for keyword in _TOOL_ERROR_KEYWORDS): + count += 1 + continue + if tool_name and "error" in tool_name: + count += 1 + return count + + +def _infer_satisfaction_signal(messages: list[object]) -> str: + negative = 0 + positive = 0 + + recent_user_text = list(_recent_user_messages(messages, limit=6)) + for text in recent_user_text: + normalized = text.lower() + if any(phrase in normalized for phrase in _NEGATIVE_SATISFACTION_PHRASES): + negative += 1 + if any(phrase in normalized for phrase in _POSITIVE_SATISFACTION_PHRASES): + positive += 1 + + if negative > positive and negative > 0: + return "negative" + if positive > negative and positive > 0: + return "positive" + if positive or negative: + return "neutral" + return "unknown" + + +def _recent_user_messages(messages: list[object], *, limit: int) -> list[str]: + user_text: list[str] = [] + for message in reversed(messages): + payload = message_payload(message) + if payload is None: + continue + role = payload.get("role") + if role not in {"human", "user"}: + continue + content = payload.get("content_text") + if isinstance(content, str) and content.strip(): + user_text.append(content.strip()) + if len(user_text) >= limit: + break + user_text.reverse() + return user_text diff --git a/tests/test_live_session_signals.py b/tests/test_live_session_signals.py new file mode 100644 index 0000000..0c1eb90 --- /dev/null +++ b/tests/test_live_session_signals.py @@ -0,0 +1,202 @@ +import os +from datetime import UTC, datetime +from types import SimpleNamespace + +from primer.hook.extractor import SessionMetadata +from primer.mcp.reader import LocalSession +from primer.server.services.live_session_signal_service import get_live_session_signals + + +def test_live_session_signals_selects_latest_session(monkeypatch, tmp_path): + older_path = tmp_path / "older.jsonl" + newer_path = tmp_path / "newer.jsonl" + older_path.write_text("{}\n") + newer_path.write_text("{}\n") + os.utime(older_path, (1, 1)) + os.utime(newer_path, (2, 2)) + + sessions = [ + LocalSession( + session_id="old-session", + transcript_path=str(older_path), + facets_path=None, + has_facets=False, + project_path="/repo/old-project", + agent_type="claude_code", + ), + LocalSession( + session_id="new-session", + transcript_path=str(newer_path), + facets_path=None, + has_facets=False, + project_path="/repo/new-project", + agent_type="claude_code", + ), + ] + monkeypatch.setattr( + "primer.server.services.live_session_signal_service.list_local_sessions", + lambda: sessions, + ) + + now = datetime.now(tz=UTC) + + def fake_extract(path: str) -> SessionMetadata: + if path == str(newer_path): + return SessionMetadata( + session_id="new-session", + project_name="new-project", + agent_type="claude_code", + started_at=now, + messages=[ + {"ordinal": 0, "role": "human", "content_text": "Debug the auth regression"}, + { + "ordinal": 1, + "role": "assistant", + "tool_calls": [ + {"name": "Bash", "input_preview": "pytest tests/test_auth.py"} + ], + }, + { + "ordinal": 2, + "role": "tool_result", + "tool_results": [{"name": "Bash", "output_preview": "2 failed, 10 passed"}], + }, + {"ordinal": 3, "role": "human", "content_text": "Still broken, try again"}, + { + "ordinal": 4, + "role": "assistant", + "tool_calls": [{"name": "Read", "input_preview": '{"path":"auth.py"}'}], + }, + { + "ordinal": 5, + "role": "assistant", + "tool_calls": [{"name": "Edit", "input_preview": '{"path":"auth.py"}'}], + }, + { + "ordinal": 6, + "role": "assistant", + "tool_calls": [ + {"name": "Bash", "input_preview": "pytest tests/test_auth.py"} + ], + }, + { + "ordinal": 7, + "role": "tool_result", + "tool_results": [{"name": "Bash", "output_preview": "1 failed"}], + }, + { + "ordinal": 8, + "role": "assistant", + "tool_calls": [{"name": "github_search", "input_preview": "{}"}], + }, + { + "ordinal": 9, + "role": "tool_result", + "tool_results": [ + { + "name": "github_search", + "output_preview": "Tool error: timeout", + } + ], + }, + { + "ordinal": 10, + "role": "assistant", + "tool_calls": [{"name": "jira_lookup", "input_preview": "{}"}], + }, + { + "ordinal": 11, + "role": "tool_result", + "tool_results": [ + { + "name": "jira_lookup", + "output_preview": "Request failed with exception", + } + ], + }, + ], + ) + return SessionMetadata( + session_id="old-session", + project_name="old-project", + agent_type="claude_code", + started_at=now, + messages=[ + {"ordinal": 0, "role": "human", "content_text": "Great, that fixed it. Thanks!"} + ], + ) + + monkeypatch.setattr( + "primer.server.services.live_session_signal_service.get_extractor_for", + lambda _agent_type: SimpleNamespace(extract=fake_extract), + ) + + response = get_live_session_signals() + + assert response.session_id == "new-session" + assert response.project_name == "new-project" + assert response.risk_level == "high" + assert response.satisfaction_signal == "negative" + signal_titles = {signal.title for signal in response.signals} + assert "Verification is failing" in signal_titles + assert "Tool errors are stacking up" in signal_titles + assert "Recovery loop detected" in signal_titles + assert "User sentiment looks frustrated" in signal_titles + + +def test_live_session_signals_support_explicit_session_lookup(monkeypatch, tmp_path): + transcript_path = tmp_path / "healthy.jsonl" + transcript_path.write_text("{}\n") + + monkeypatch.setattr( + "primer.server.services.live_session_signal_service.list_local_sessions", + lambda: [ + LocalSession( + session_id="healthy-session", + transcript_path=str(transcript_path), + facets_path=None, + has_facets=False, + project_path="/repo/healthy-project", + agent_type="claude_code", + ) + ], + ) + monkeypatch.setattr( + "primer.server.services.live_session_signal_service.get_extractor_for", + lambda _agent_type: SimpleNamespace( + extract=lambda _path: SessionMetadata( + session_id="healthy-session", + project_name="healthy-project", + agent_type="claude_code", + started_at=datetime.now(tz=UTC), + messages=[ + {"ordinal": 0, "role": "human", "content_text": "Run the auth tests"}, + { + "ordinal": 1, + "role": "assistant", + "tool_calls": [ + {"name": "Bash", "input_preview": "pytest tests/test_auth.py"} + ], + }, + { + "ordinal": 2, + "role": "tool_result", + "tool_results": [ + {"name": "Bash", "output_preview": "5 passed, 0 failed, 0 errors"} + ], + }, + { + "ordinal": 3, + "role": "human", + "content_text": "Great, that fixed it. Thanks!", + }, + ], + ) + ), + ) + + response = get_live_session_signals(session_id="healthy-session") + + assert response.risk_level == "low" + assert response.satisfaction_signal == "positive" + assert response.signals[0].title == "Session looks healthy" diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 0ff8d8a..b5b891c 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -75,3 +75,17 @@ def test_session_start_coaching_tool(mock_session_start): task_hint="Fix auth regression", days=14, ) + + +@patch("primer.mcp.server.primer_live_session_signals") +def test_live_session_signals_tool(mock_live_signals): + mock_live_signals.return_value = "signals" + + from primer.mcp.server import live_session_signals + + result = live_session_signals(session_id="session-123") + assert result == "signals" + mock_live_signals.assert_called_once_with( + session_id="session-123", + transcript_path=None, + ) diff --git a/tests/test_mcp_tools.py b/tests/test_mcp_tools.py index 63ed143..52c3230 100644 --- a/tests/test_mcp_tools.py +++ b/tests/test_mcp_tools.py @@ -1,4 +1,5 @@ import json +from types import SimpleNamespace from unittest.mock import MagicMock, patch import httpx @@ -273,3 +274,44 @@ def test_primer_session_start_coaching_no_api_key(monkeypatch): result = primer_session_start_coaching(project_name="api-server") assert "Error" in result + + +# --- primer_live_session_signals --- + + +@patch("primer.mcp.tools.get_live_session_signals") +def test_primer_live_session_signals_success(mock_get_signals): + mock_get_signals.return_value = SimpleNamespace( + risk_level="high", + satisfaction_signal="negative", + project_name="api-server", + session_id="session-123", + agent_type="claude_code", + signals=[ + SimpleNamespace( + severity="warning", + title="Verification is failing", + detail="Recent test commands are failing.", + ) + ], + ) + + from primer.mcp.tools import primer_live_session_signals + + result = primer_live_session_signals(session_id="session-123") + assert "Live Session Signals" in result + assert "Verification is failing" in result + mock_get_signals.assert_called_once_with( + session_id="session-123", + transcript_path=None, + ) + + +@patch("primer.mcp.tools.get_live_session_signals") +def test_primer_live_session_signals_error(mock_get_signals): + mock_get_signals.side_effect = ValueError("No local sessions found") + + from primer.mcp.tools import primer_live_session_signals + + result = primer_live_session_signals() + assert "No local sessions found" in result