Skip to content

Commit ec034d0

Browse files
committed
feat(queue): add auto triage handling
1 parent 5ff7034 commit ec034d0

File tree

4 files changed

+131
-1
lines changed

4 files changed

+131
-1
lines changed

agent_pm/observability/metrics.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,12 @@
3838
labelnames=("queue", "error_type"),
3939
)
4040

41+
dead_letter_alert_total = Counter(
42+
"task_dead_letter_alert_total",
43+
"Dead-letter alert notifications",
44+
labelnames=("queue", "error_type"),
45+
)
46+
4147
planner_requests_total = Counter(
4248
"planner_requests_total",
4349
"Total planner invocations",
@@ -246,4 +252,5 @@ def latest_metrics() -> bytes:
246252
"dead_letter_purged_total",
247253
"dead_letter_active_gauge",
248254
"dead_letter_auto_requeue_total",
255+
"dead_letter_alert_total",
249256
]

agent_pm/settings.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,10 @@ def _parse_google_scopes(cls, value):
7676
task_queue_retry_backoff_base: float = Field(2.0, alias="TASK_QUEUE_RETRY_BACKOFF_BASE")
7777
task_queue_retry_backoff_max: float = Field(60.0, alias="TASK_QUEUE_RETRY_BACKOFF_MAX")
7878
task_queue_worker_heartbeat_ttl: int = Field(60, alias="TASK_QUEUE_WORKER_HEARTBEAT_TTL")
79+
task_queue_auto_requeue_errors: list[str] = Field(default_factory=list, alias="TASK_QUEUE_AUTO_REQUEUE_ERRORS")
80+
task_queue_alert_threshold: int = Field(5, alias="TASK_QUEUE_ALERT_THRESHOLD")
81+
task_queue_alert_window_minutes: int = Field(5, alias="TASK_QUEUE_ALERT_WINDOW_MINUTES")
82+
task_queue_alert_channel: str | None = Field(None, alias="TASK_QUEUE_ALERT_CHANNEL")
7983
database_url: str | None = Field("sqlite+aiosqlite:///./data/agent_pm.db", alias="DATABASE_URL")
8084
database_echo: bool = Field(False, alias="DATABASE_ECHO")
8185
redis_url: str = Field("redis://localhost:6379", alias="REDIS_URL")

agent_pm/storage/tasks.py

Lines changed: 55 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515

1616
from ..observability.metrics import (
1717
dead_letter_active_gauge,
18+
dead_letter_alert_total,
1819
dead_letter_auto_requeue_total,
1920
dead_letter_purged_total,
2021
dead_letter_recorded_total,
@@ -318,7 +319,50 @@ async def pop(self) -> dict[str, Any] | None:
318319

319320
async def _worker(self, worker_id: int):
320321
logger.info("Redis worker %d started", worker_id)
322+
recent_failures: dict[str, list[datetime]] = {}
323+
321324
while self.running:
325+
auto_errors = set(settings.task_queue_auto_requeue_errors)
326+
alert_threshold = settings.task_queue_alert_threshold
327+
alert_window = timedelta(minutes=settings.task_queue_alert_window_minutes)
328+
alert_channel = settings.task_queue_alert_channel or settings.slack_status_channel
329+
330+
def _should_auto_requeue(err_type: str | None) -> bool:
    """Tell whether a failure of this error type is configured for auto-requeue.

    A missing or empty ``err_type`` never qualifies; otherwise the answer is
    membership in the ``auto_errors`` set built from
    ``settings.task_queue_auto_requeue_errors``.
    """
    return bool(err_type) and err_type in auto_errors
334+
335+
def _record_failure(err_type: str | None, task_identifier: str) -> bool:
    """Log one failure timestamp and report whether the alert threshold is met.

    Failures are tracked per ``"<err_type>:<task_identifier>"`` key in
    ``recent_failures``. Timestamps older than ``alert_window`` are discarded
    on every call, so the stored list is a sliding window.

    Returns:
        ``True`` when the number of failures still inside the window is at
        least ``alert_threshold``; ``False`` otherwise, or when ``err_type``
        is missing/empty (nothing is recorded in that case).
    """
    if not err_type:
        return False

    stamp = utc_now()
    bucket = f"{err_type}:{task_identifier}"
    window_start = stamp - alert_window

    # Add the new failure first, then trim, so the new entry is subject to the
    # same window filter as the older ones.
    history = [*recent_failures.get(bucket, ()), stamp]
    in_window = [ts for ts in history if ts >= window_start]
    recent_failures[bucket] = in_window
    return len(in_window) >= alert_threshold
345+
346+
async def _send_alert(error_type: str, payload: dict[str, Any]) -> None:
    """Post a "dead-letter threshold exceeded" message to Slack, best effort.

    Nothing is sent when no alert channel is configured, when
    ``settings.dry_run`` is set, or when the Slack client is not enabled.
    On a successful post the ``dead_letter_alert_total`` counter is
    incremented for this queue and error type. Any exception raised while
    posting is logged and swallowed so alerting cannot break the worker loop.

    Args:
        error_type: Error type name shown in the message and used as the
            metric label.
        payload: Dead-letter payload; only its ``"name"`` key is read
            (defaults to ``"unknown"``).
    """
    if not alert_channel:
        return
    if settings.dry_run or not slack_client.enabled:
        # Report the real reason: previously a disabled Slack client was
        # also logged as "dry run", which made misconfiguration hard to spot.
        skip_reason = "dry run" if settings.dry_run else "slack disabled"
        logger.warning("Slack alert skipped (%s): %s", skip_reason, error_type)
        return
    body = (
        ":rotating_light: Dead-letter threshold exceeded\n"
        f"Queue: `{self.queue_name}`\n"
        f"Error: `{error_type}`\n"
        f"Task: `{payload.get('name', 'unknown')}`"
    )
    try:
        await slack_client.post_digest(body, alert_channel)
        # Count only alerts that were actually delivered.
        dead_letter_alert_total.labels(queue=self.queue_name, error_type=error_type).inc()
    except Exception as exc:  # pragma: no cover - logging
        logger.error("Failed to send Slack alert: %s", exc)
365+
322366
payload = await self.pop()
323367
if not payload:
324368
await asyncio.sleep(self._poll_interval)
@@ -383,13 +427,23 @@ async def _worker(self, worker_id: int):
383427
payload["error_message"] = str(exc)
384428
payload["stack_trace"] = traceback.format_exc()
385429
payload["worker_id"] = worker_id
430+
error_type = payload.get("error_type", "unknown")
386431
await record_dead_letter(self._redis, payload)
387432
dead_letter_recorded_total.labels(
388433
queue=self.queue_name,
389-
error_type=payload.get("error_type", "unknown"),
434+
error_type=error_type,
390435
).inc()
391436
record_task_completion(self.queue_name, TaskStatus.FAILED.value)
392437
record_task_latency(self.queue_name, (utc_now() - start).total_seconds())
438+
identifier = payload.get("metadata", {}).get("workflow_id") or payload.get("name", "unknown")
439+
if _should_auto_requeue(error_type):
440+
await self.requeue_dead_letter(
441+
task_id,
442+
automatic=True,
443+
notify=False,
444+
)
445+
if _record_failure(error_type, identifier):
446+
await _send_alert(error_type, payload)
393447
continue
394448

395449
backoff = min(

tests/storage/test_redis_worker.py

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
import pytest
55
import pytest_asyncio
66

7+
from agent_pm.observability.metrics import dead_letter_alert_total, dead_letter_auto_requeue_total
78
from agent_pm.settings import settings
89
from agent_pm.storage import redis as redis_helpers
910
import agent_pm.storage.tasks as tasks_module
@@ -66,6 +67,10 @@ async def redis_queue(monkeypatch):
6667
monkeypatch.setattr(settings, "task_queue_retry_backoff_base", 1.1)
6768
monkeypatch.setattr(settings, "task_queue_retry_backoff_max", 0.05)
6869
monkeypatch.setattr(settings, "task_queue_task_timeout", 1)
70+
monkeypatch.setattr(settings, "task_queue_auto_requeue_errors", [])
71+
monkeypatch.setattr(settings, "task_queue_alert_threshold", 10)
72+
monkeypatch.setattr(settings, "task_queue_alert_window_minutes", 5)
73+
monkeypatch.setattr(settings, "task_queue_alert_channel", None)
6974

7075
async def fake_client():
7176
return fake
@@ -136,3 +141,63 @@ async def fail_task() -> None:
136141

137142
queued_len = await fake.llen("agent_pm:tasks")
138143
assert queued_len >= 1
144+
145+
146+
@pytest.mark.asyncio
async def test_auto_triage_requeues_and_alerts(redis_queue, monkeypatch):
    """A dead-lettered task with an auto-requeue error type is retried and alerted.

    The task raises ``RuntimeError`` on its first run and succeeds on the
    second. With ``RuntimeError`` listed for auto-requeue and an alert
    threshold of 1, the test expects the task to complete, exactly one
    auto-requeue to be counted, and exactly one Slack alert to be posted.
    """
    queue, fake = redis_queue

    monkeypatch.setattr(tasks_module.settings, "dry_run", False)
    monkeypatch.setattr(tasks_module.settings, "task_queue_auto_requeue_errors", ["RuntimeError"])
    monkeypatch.setattr(tasks_module.settings, "task_queue_alert_threshold", 1)
    monkeypatch.setattr(tasks_module.settings, "task_queue_alert_window_minutes", 5)
    monkeypatch.setattr(tasks_module.settings, "task_queue_alert_channel", "alerts")

    # Patch through monkeypatch (not plain assignment) so the shared
    # module-level Slack client is restored after the test and the fake
    # credentials do not leak into other tests.
    monkeypatch.setattr(tasks_module.slack_client, "token", "token", raising=False)
    monkeypatch.setattr(tasks_module.slack_client, "channel", "alerts", raising=False)

    calls: list[dict[str, str]] = []

    async def fake_digest(body_md: str, channel: str | None = None):
        calls.append({"body": body_md, "channel": channel or ""})
        return {"ok": True}

    monkeypatch.setattr(tasks_module.slack_client, "post_digest", fake_digest)

    # Drop any label children left by earlier tests so the counters start at 0.
    for metric in (dead_letter_auto_requeue_total, dead_letter_alert_total):
        try:
            metric.remove("redis", "RuntimeError")
        except KeyError:
            pass

    state = {"count": 0}

    async def flaky_task() -> str:
        state["count"] += 1
        if state["count"] == 1:
            raise RuntimeError("transient")
        return "ok"

    metadata = {"workflow_id": "wf-1"}
    task_id = await queue.enqueue("flaky", flaky_task, max_retries=1, metadata=metadata)

    # Poll for the result; the worker processes the task in the background.
    result = None
    for _ in range(100):
        result = await redis_helpers.get_task_result(fake, task_id)
        if result:
            break
        await asyncio.sleep(0.05)

    assert result is not None
    assert result["status"] == "completed"
    assert result["result"] == "ok"

    auto_metric = dead_letter_auto_requeue_total.labels(queue="redis", error_type="RuntimeError")._value.get()
    alert_metric = dead_letter_alert_total.labels(queue="redis", error_type="RuntimeError")._value.get()

    assert auto_metric == 1
    assert alert_metric == 1
    # One delivered alert, routed to the configured alert channel.
    assert len(calls) == 1
    assert calls[0]["channel"] == "alerts"

0 commit comments

Comments
 (0)