Skip to content

Commit 9ff656e

Browse files
committed
feat(queue): add remediation playbooks and pagerduty alerts
1 parent f53ed1f commit 9ff656e

File tree

8 files changed

+173
-1
lines changed

8 files changed

+173
-1
lines changed

agent_pm/clients/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .github_client import GitHubClient, github_client
55
from .jira_client import JiraClient, jira_client
66
from .openai_client import OpenAIClient, openai_client
7+
from .pagerduty_client import PagerDutyClient, pagerduty_client
78
from .slack_client import SlackClient, slack_client
89

910
__all__ = [
@@ -17,4 +18,6 @@
1718
"openai_client",
1819
"SlackClient",
1920
"slack_client",
21+
"PagerDutyClient",
22+
"pagerduty_client",
2023
]
agent_pm/clients/pagerduty_client.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""PagerDuty client for incident notifications."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any
6+
7+
import httpx
8+
9+
from agent_pm.settings import settings
10+
11+
12+
class PagerDutyClient:
    """Async client that sends ``trigger`` events to the PagerDuty Events API v2.

    The routing key is read from ``settings.pagerduty_routing_key`` when the
    instance is created. Without a routing key the client is disabled and
    :meth:`trigger_incident` only echoes what it would have sent.
    """

    # Endpoint that receives Events API v2 payloads.
    EVENTS_URL = "https://events.pagerduty.com/v2/enqueue"
    # Severities accepted by the Events API v2; any other value is rejected
    # by PagerDuty with an HTTP error, so it is checked locally first.
    VALID_SEVERITIES = frozenset({"critical", "error", "warning", "info"})

    def __init__(self) -> None:
        self.routing_key = settings.pagerduty_routing_key

    @property
    def enabled(self) -> bool:
        """Return ``True`` when a non-empty routing key is configured."""
        return bool(self.routing_key)

    async def trigger_incident(
        self,
        summary: str,
        source: str = "agent-pm",
        severity: str = "error",
        **details: Any,
    ) -> dict[str, Any]:
        """Send a ``trigger`` event, or describe it when the client is disabled.

        Args:
            summary: Short description placed in the event payload.
            source: Value for the payload's ``source`` field.
            severity: One of ``critical``, ``error``, ``warning`` or ``info``.
            **details: Extra key/value pairs sent as ``custom_details``.

        Returns:
            ``{"dry_run": True, "summary": ..., "details": ...}`` when no
            routing key is configured; otherwise the decoded JSON response.

        Raises:
            ValueError: If the client is enabled and ``severity`` is not one
                of :attr:`VALID_SEVERITIES`.
            httpx.HTTPStatusError: If PagerDuty answers with an error status.
        """
        # The disabled path is left exactly as it was: no validation, no I/O.
        if not self.enabled:
            return {"dry_run": True, "summary": summary, "details": details}

        # Fail fast with a readable message instead of an opaque HTTP 400.
        if severity not in self.VALID_SEVERITIES:
            allowed = ", ".join(sorted(self.VALID_SEVERITIES))
            raise ValueError(
                f"Invalid PagerDuty severity {severity!r}; expected one of: {allowed}"
            )

        payload = {
            "routing_key": self.routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": summary,
                "source": source,
                "severity": severity,
                "custom_details": details,
            },
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(self.EVENTS_URL, json=payload, timeout=10)
            response.raise_for_status()
            return response.json()
39+
40+
41+
# Shared module-level instance. Its routing key is read from settings here, at
# import time, so later changes to settings do not affect this object.
pagerduty_client = PagerDutyClient()
42+
43+
44+
__all__ = ["pagerduty_client", "PagerDutyClient"]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Helpers to build queue health dashboards."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass
6+
7+
from ..storage.redis import count_dead_letters
8+
from ..storage.tasks import get_task_queue
9+
10+
11+
@dataclass
class QueueHealth:
    """Snapshot of queue health values used to build dashboards."""

    # Name of the queue the snapshot was taken from.
    queue_name: str
    # Number of dead letters counted for the queue.
    dead_letters: int
    # Flag reported to dashboards for whether auto triage is on.
    auto_triage_enabled: bool
16+
17+
18+
async def gather_queue_health() -> QueueHealth:
    """Build a :class:`QueueHealth` snapshot for the current task queue.

    Returns:
        A ``QueueHealth`` holding the queue's name (``"unknown"`` when the
        queue has no ``queue_name`` attribute), its dead-letter count, and
        the auto-triage flag.
    """
    queue = await get_task_queue()

    # ``queue_name`` is already read defensively below; do the same for
    # ``get_client`` so a queue object without it reports zero dead letters
    # instead of raising AttributeError while building a dashboard.
    get_client = getattr(queue, "get_client", None)
    if get_client is None:
        dead_letters = 0
    else:
        client = await get_client()
        dead_letters = await count_dead_letters(client)

    # NOTE(review): this is only the truthiness of the queue object, not a
    # setting - it is effectively True whenever a queue exists. Confirm
    # whether a real auto-triage switch should be consulted here.
    auto_triage_enabled = bool(queue)

    return QueueHealth(
        queue_name=getattr(queue, "queue_name", "unknown"),
        dead_letters=dead_letters,
        auto_triage_enabled=auto_triage_enabled,
    )

agent_pm/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ def _parse_google_scopes(cls, value):
8282
task_queue_alert_channel: str | None = Field(None, alias="TASK_QUEUE_ALERT_CHANNEL")
8383
task_queue_alert_cooldown_minutes: int = Field(10, alias="TASK_QUEUE_ALERT_COOLDOWN_MINUTES")
8484
task_queue_max_auto_requeues: int = Field(3, alias="TASK_QUEUE_MAX_AUTO_REQUEUES")
85+
task_queue_alert_webhook_url: str | None = Field(None, alias="TASK_QUEUE_ALERT_WEBHOOK_URL")
86+
pagerduty_routing_key: str | None = Field(None, alias="PAGERDUTY_ROUTING_KEY")
87+
task_queue_playbooks: dict[str, str] = Field(default_factory=dict, alias="TASK_QUEUE_PLAYBOOKS")
8588
database_url: str | None = Field("sqlite+aiosqlite:///./data/agent_pm.db", alias="DATABASE_URL")
8689
database_echo: bool = Field(False, alias="DATABASE_ECHO")
8790
redis_url: str = Field("redis://localhost:6379", alias="REDIS_URL")

agent_pm/storage/tasks.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
)
2727
from ..settings import settings
2828
from ..utils.datetime import utc_now
29-
from ..clients.slack_client import slack_client
29+
from ..clients import pagerduty_client, slack_client
30+
from ..tasks.playbooks import run_playbook
3031
from .redis import (
3132
append_dead_letter_audit,
3233
clear_dead_letter,
@@ -444,6 +445,9 @@ async def _send_alert(error_type: str, payload: dict[str, Any]) -> None:
444445
record_task_completion(self.queue_name, TaskStatus.FAILED.value)
445446
record_task_latency(self.queue_name, (utc_now() - start).total_seconds())
446447
identifier = payload.get("metadata", {}).get("workflow_id") or payload.get("name", "unknown")
448+
playbook_name = settings.task_queue_playbooks.get(error_type)
449+
if playbook_name:
450+
await run_playbook(playbook_name, payload, self, error_type)
447451
if _should_auto_requeue(error_type):
448452
key = f"{identifier}:{error_type}"
449453
count = auto_requeue_counts.get(key, 0)
@@ -525,6 +529,9 @@ async def delete_dead_letter(self, task_id: str) -> None:
525529
async def worker_heartbeats(self) -> dict[str, dict[str, Any]]:
526530
return await list_heartbeats(self._redis)
527531

532+
async def get_client(self):
    """Return the Redis handle stored on this queue as ``self._redis``."""
    redis_handle = self._redis
    return redis_handle
534+
528535
async def requeue_dead_letter(
529536
self,
530537
task_id: str,

agent_pm/tasks/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Task utilities."""
2+
3+
from .playbooks import PLAYBOOKS, run_playbook
4+
5+
__all__ = ["PLAYBOOKS", "run_playbook"]

agent_pm/tasks/playbooks.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Remediation playbooks for automated task recovery."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from typing import Any, Awaitable, Callable
7+
8+
import httpx
9+
10+
from agent_pm.clients import pagerduty_client, slack_client
11+
from agent_pm.settings import settings
12+
13+
logger = logging.getLogger(__name__)
14+
15+
# Signature shared by every playbook handler in this module:
# (task payload, queue object, error type) -> awaitable returning None.
PlaybookHandler = Callable[[dict[str, Any], Any, str], Awaitable[None]]
16+
17+
18+
async def _notify_slack(payload: dict[str, Any], queue: Any, error_type: str) -> None:
    """Post a remediation notice to Slack.

    The channel is ``settings.task_queue_alert_channel`` or, failing that,
    ``slack_client.channel``; with neither set nothing happens. When
    ``settings.dry_run`` is set or the Slack client is disabled, a warning
    is logged instead of posting.
    """
    target_channel = settings.task_queue_alert_channel or slack_client.channel
    if not target_channel:
        return

    queue_label = getattr(queue, "queue_name", "unknown")
    task_label = payload.get("name", "unknown")
    text = (
        ":rotating_light: Remediation triggered\n"
        f"Queue: `{queue_label}`\n"
        f"Error: `{error_type}`\n"
        f"Task: `{task_label}`"
    )

    # Only post when this is not a dry run and the Slack client is enabled.
    if not settings.dry_run and slack_client.enabled:
        await slack_client.post_digest(text, target_channel)
        return

    logger.warning("Remediation Slack notification skipped (dry run)")
32+
33+
34+
async def _invoke_webhook(payload: dict[str, Any], queue: Any, error_type: str) -> None:
    """POST a JSON description of the failed task to the alert webhook.

    Does nothing when ``settings.task_queue_alert_webhook_url`` is unset.
    An error status from the webhook is raised via ``raise_for_status``.
    """
    webhook_url = settings.task_queue_alert_webhook_url
    if webhook_url:
        event = {
            "queue": getattr(queue, "queue_name", "unknown"),
            "error_type": error_type,
            "task": payload.get("name", "unknown"),
            "task_id": payload.get("task_id"),
            "metadata": payload.get("metadata", {}),
            "playbook": "webhook",
        }
        async with httpx.AsyncClient() as http:
            reply = await http.post(webhook_url, json=event, timeout=10)
            reply.raise_for_status()
49+
50+
51+
async def _log_only(payload: dict[str, Any], queue: Any, error_type: str) -> None:
    """Log a warning that a playbook ran, with queue, error and task id as extras."""
    context = {
        "queue": getattr(queue, "queue_name", "unknown"),
        "error": error_type,
        "task": payload.get("task_id"),
    }
    logger.warning("Remediation playbook invoked", extra=context)
55+
56+
57+
async def _notify_pagerduty(payload: dict[str, Any], queue: Any, error_type: str) -> None:
    """Trigger a PagerDuty incident for the failed task.

    Does nothing when ``pagerduty_client.enabled`` is false. The queue name,
    task id, task name and task metadata are passed as incident details.
    """
    if pagerduty_client.enabled:
        await pagerduty_client.trigger_incident(
            f"Auto-remediation triggered for {error_type}",
            severity="error",
            queue=getattr(queue, "queue_name", "unknown"),
            task_id=payload.get("task_id"),
            task_name=payload.get("name"),
            metadata=payload.get("metadata", {}),
        )
68+
69+
70+
# Registry mapping a playbook name to the handler that run_playbook invokes.
PLAYBOOKS: dict[str, PlaybookHandler] = {
    # Emit a warning log entry only.
    "log_only": _log_only,
    # Post a message to the alert Slack channel.
    "notify_slack": _notify_slack,
    # POST a JSON body to the configured alert webhook.
    "webhook": _invoke_webhook,
    # Trigger a PagerDuty incident.
    "notify_pagerduty": _notify_pagerduty,
}
76+
77+
78+
async def run_playbook(name: str, payload: dict[str, Any], queue: Any, error_type: str) -> None:
    """Run the playbook registered under ``name`` without ever raising.

    An unregistered name is logged as a warning. Any exception raised by the
    handler is logged with its traceback and suppressed.
    """
    handler = PLAYBOOKS.get(name)
    if handler:
        try:
            await handler(payload, queue, error_type)
        except Exception as exc:  # pragma: no cover - defensive logging
            logger.exception("Playbook %s failed: %s", name, exc)
        return

    logger.warning("Unknown playbook requested: %s", name)

tests/storage/test_redis_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ async def test_auto_triage_requeues_and_alerts(redis_queue, monkeypatch):
154154
monkeypatch.setattr(tasks_module.settings, "task_queue_alert_channel", "alerts")
155155
monkeypatch.setattr(tasks_module.settings, "task_queue_alert_cooldown_minutes", 0)
156156
monkeypatch.setattr(tasks_module.settings, "task_queue_max_auto_requeues", 2)
157+
monkeypatch.setattr(tasks_module.settings, "task_queue_playbooks", {"RuntimeError": "log_only"})
157158

158159
tasks_module.slack_client.token = "token"
159160
tasks_module.slack_client.channel = "alerts"

0 commit comments

Comments
 (0)