Commit 505abaa

feat(queue): enrich dead-letter insights
1 parent 0f6015c commit 505abaa

6 files changed: +179 −25 lines


agent_pm/observability/metrics.py

Lines changed: 30 additions & 1 deletion
```diff
@@ -5,7 +5,32 @@
 from contextlib import contextmanager
 from time import perf_counter
 
-from prometheus_client import Counter, Histogram, Summary, generate_latest
+from prometheus_client import Counter, Gauge, Histogram, Summary, generate_latest
+
+
+dead_letter_recorded_total = Counter(
+    "task_dead_letter_recorded_total",
+    "Dead-letter entries recorded",
+    labelnames=("queue", "error_type"),
+)
+
+dead_letter_requeued_total = Counter(
+    "task_dead_letter_requeued_total",
+    "Dead-letter entries requeued",
+    labelnames=("queue", "error_type"),
+)
+
+dead_letter_purged_total = Counter(
+    "task_dead_letter_purged_total",
+    "Dead-letter entries purged",
+    labelnames=("queue", "mode"),
+)
+
+dead_letter_active_gauge = Gauge(
+    "task_dead_letter_active",
+    "Current count of dead-letter entries",
+    labelnames=("queue",),
+)
 
 planner_requests_total = Counter(
     "planner_requests_total",
@@ -210,4 +235,8 @@ def latest_metrics() -> bytes:
     "record_task_completion",
     "record_task_latency",
     "latest_metrics",
+    "dead_letter_recorded_total",
+    "dead_letter_requeued_total",
+    "dead_letter_purged_total",
+    "dead_letter_active_gauge",
 ]
```
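
All four collectors follow the standard `prometheus_client` pattern: `.labels(...)` resolves a per-queue child, then `.inc()` or `.set()` mutates it. A minimal usage sketch, assuming the collectors live in the default registry and that `latest_metrics()` renders it via the imported `generate_latest` (the label values are illustrative):

```python
from agent_pm.observability.metrics import (
    dead_letter_active_gauge,
    dead_letter_recorded_total,
    latest_metrics,
)

# Record one dead letter and a current backlog of 3 for the "default" queue.
dead_letter_recorded_total.labels(queue="default", error_type="TimeoutError").inc()
dead_letter_active_gauge.labels(queue="default").set(3)

# The exposition text should then contain lines like:
#   task_dead_letter_recorded_total{queue="default",error_type="TimeoutError"} 1.0
#   task_dead_letter_active{queue="default"} 3.0
print(latest_metrics().decode())
```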

agent_pm/storage/redis.py

Lines changed: 19 additions & 5 deletions
```diff
@@ -5,7 +5,7 @@
 import json
 import logging
 import uuid
-from datetime import datetime, timezone
+from datetime import UTC, datetime
 from typing import Any
 
 import redis.asyncio as redis
@@ -73,7 +73,7 @@ async def get_task_result(client: redis.Redis, task_id: str) -> dict[str, Any] |
 async def record_dead_letter(client: redis.Redis, payload: dict[str, Any]) -> None:
     task_id = payload.get("task_id", uuid.uuid4().hex)
     if "recorded_at" not in payload:
-        payload["recorded_at"] = datetime.now(timezone.utc).isoformat()
+        payload["recorded_at"] = datetime.now(UTC).isoformat()
     await client.hset(_dead_letter_key(), task_id, json.dumps(payload))
 
 
@@ -89,21 +89,35 @@ async def get_dead_letter(client: redis.Redis, task_id: str) -> dict[str, Any] |
     return data
 
 
-async def fetch_dead_letters(client: redis.Redis, limit: int = 100) -> list[dict[str, Any]]:
+async def fetch_dead_letters(
+    client: redis.Redis,
+    *,
+    limit: int | None = 100,
+    offset: int = 0,
+    include_total: bool = False,
+) -> tuple[list[dict[str, Any]], int] | list[dict[str, Any]]:
     items = await client.hgetall(_dead_letter_key())
     tasks: list[dict[str, Any]] = []
-    for task_id, value in items.items():
-        if len(tasks) >= limit:
+    entries = list(items.items())
+    for task_id, value in entries[offset:]:
+        if limit is not None and len(tasks) >= limit:
             break
         try:
             data = json.loads(value)
             data.setdefault("task_id", task_id)
             tasks.append(data)
         except json.JSONDecodeError:
             tasks.append({"task_id": task_id, "raw": value})
+    total = len(entries)
+    if include_total:
+        return tasks, total
     return tasks
 
 
+async def count_dead_letters(client: redis.Redis) -> int:
+    return await client.hlen(_dead_letter_key())
+
+
 async def clear_dead_letter(client: redis.Redis, task_id: str) -> None:
     await client.hdel(_dead_letter_key(), task_id)
 
```
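
`fetch_dead_letters` stays backwards compatible: the bare call still returns a plain list, and only `include_total=True` switches to the `(items, total)` tuple reflected in the union return annotation. A hedged sketch of the three entry points, assuming a reachable Redis at an illustrative URL and a client built with `decode_responses=True`:

```python
import asyncio

import redis.asyncio as redis

from agent_pm.storage.redis import count_dead_letters, fetch_dead_letters


async def main() -> None:
    client = redis.from_url("redis://localhost:6379/0", decode_responses=True)

    # Backwards-compatible call: plain list, capped at `limit`.
    head = await fetch_dead_letters(client, limit=20)

    # Paginated call: second page plus the total entry count.
    page, total = await fetch_dead_letters(client, limit=20, offset=20, include_total=True)

    # O(1) HLEN, suitable for refreshing the active gauge.
    active = await count_dead_letters(client)
    print(len(head), len(page), total, active)


asyncio.run(main())
```

Note that `offset` slices the `HGETALL` item order, which Redis does not guarantee to be stable, so pagination here is best-effort rather than a consistent cursor.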

agent_pm/storage/tasks.py

Lines changed: 79 additions & 12 deletions
```diff
@@ -4,15 +4,20 @@
 
 import asyncio
 import logging
+import traceback
 import uuid
 from collections import deque
 from collections.abc import Callable, Coroutine
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from enum import Enum
 from typing import Any
 
 from ..observability.metrics import (
+    dead_letter_active_gauge,
+    dead_letter_purged_total,
+    dead_letter_recorded_total,
+    dead_letter_requeued_total,
     record_task_completion,
     record_task_enqueued,
     record_task_latency,
@@ -21,6 +26,7 @@
 from ..utils.datetime import utc_now
 from .redis import (
     clear_dead_letter,
+    count_dead_letters,
     enqueue_task as redis_enqueue_task,
     fetch_dead_letters,
     get_dead_letter,
@@ -53,6 +59,7 @@ class Task:
     coro_fn: Callable[..., Coroutine[Any, Any, Any]]
     args: tuple
     kwargs: dict
+    metadata: dict[str, Any] = field(default_factory=dict)
     status: TaskStatus = TaskStatus.PENDING
     created_at: datetime = None  # type: ignore[assignment]
     started_at: datetime | None = None
@@ -104,6 +111,7 @@ async def enqueue(
         coro_fn: Callable[..., Coroutine[Any, Any, Any]],
         *args: Any,
         max_retries: int = 3,
+        metadata: dict[str, Any] | None = None,
         **kwargs: Any,
     ) -> str:
         """Enqueue a task and return task ID."""
@@ -115,6 +123,7 @@
             args=args,
             kwargs=kwargs,
             max_retries=max_retries,
+            metadata=metadata or {},
         )
         async with self._lock:
             self.queue.append(task)
@@ -199,8 +208,14 @@ async def _execute_task(self, task: Task):
             record_task_latency(self.queue_name, (task.completed_at - task.started_at).total_seconds())
             logger.error("Task permanently failed: %s (id=%s)", task.name, task.task_id)
 
-    async def list_dead_letters(self, limit: int = 100) -> list[dict[str, Any]]:
-        return []
+    async def list_dead_letters(
+        self,
+        limit: int = 100,
+        offset: int = 0,
+        workflow_id: str | None = None,
+        error_type: str | None = None,
+    ) -> tuple[list[dict[str, Any]], int]:
+        return [], 0
 
     async def delete_dead_letter(self, task_id: str) -> None:
         return None
@@ -254,6 +269,7 @@ async def enqueue(  # type: ignore[override]
             coro_fn: Callable[..., Coroutine[Any, Any, Any]],
             *args: Any,
             max_retries: int = 3,
+            metadata: dict[str, Any] | None = None,
             **kwargs: Any,
         ) -> str:
             self.register(name, coro_fn)
@@ -264,6 +280,7 @@
                 "kwargs": kwargs,
                 "max_retries": max_retries,
                 "enqueued_at": utc_now().isoformat(),
+                "metadata": metadata or {},
             }
             await redis_enqueue_task(self._redis, name, payload)
             record_task_enqueued(self.queue_name)
@@ -289,10 +306,16 @@ async def _worker(self, worker_id: int):
                 coro_fn = self._registry.get(name)
                 if coro_fn is None:
                     logger.error("No registered task callable for %s", name)
-                    await record_dead_letter(
-                        self._redis,
-                        {**payload, "error": "missing_callable", "worker_id": worker_id},
-                    )
+                    payload.setdefault("metadata", {})
+                    payload["error_type"] = "MissingCallable"
+                    payload["error_message"] = f"Task callable not registered: {name}"
+                    payload["stack_trace"] = None
+                    payload["worker_id"] = worker_id
+                    await record_dead_letter(self._redis, payload)
+                    dead_letter_recorded_total.labels(
+                        queue=self.queue_name,
+                        error_type=payload.get("error_type", payload.get("last_error", "unknown")),
+                    ).inc()
                     record_task_completion(self.queue_name, TaskStatus.FAILED.value)
                     continue
 
@@ -305,21 +328,39 @@ async def _worker(self, worker_id: int):
                         coro_fn(*payload.get("args", ()), **payload.get("kwargs", {})),
                         timeout=self._task_timeout,
                     )
-                except asyncio.TimeoutError:
+                except TimeoutError:
+                    payload.setdefault("metadata", {})
                     payload["retry_count"] = retries + 1
                     payload["last_error"] = "timeout"
+                    payload["error_type"] = "TimeoutError"
+                    payload["error_message"] = (
+                        f"Task execution exceeded timeout of {self._task_timeout} seconds"
+                    )
+                    payload["stack_trace"] = None
                     payload["worker_id"] = worker_id
                     await record_dead_letter(self._redis, payload)
+                    dead_letter_recorded_total.labels(
+                        queue=self.queue_name,
+                        error_type="TimeoutError",
+                    ).inc()
                     record_task_completion(self.queue_name, TaskStatus.FAILED.value)
                     record_task_latency(self.queue_name, (utc_now() - start).total_seconds())
                     continue
                 except Exception as exc:  # pylint: disable=broad-except
+                    payload.setdefault("metadata", {})
                     retries += 1
                     payload["retry_count"] = retries
                     payload["last_error"] = str(exc)
                     if retries >= max_retries:
+                        payload["error_type"] = exc.__class__.__name__
+                        payload["error_message"] = str(exc)
+                        payload["stack_trace"] = traceback.format_exc()
                         payload["worker_id"] = worker_id
                         await record_dead_letter(self._redis, payload)
+                        dead_letter_recorded_total.labels(
+                            queue=self.queue_name,
+                            error_type=payload.get("error_type", "unknown"),
+                        ).inc()
                         record_task_completion(self.queue_name, TaskStatus.FAILED.value)
                         record_task_latency(self.queue_name, (utc_now() - start).total_seconds())
                         continue
@@ -343,8 +384,25 @@ async def _worker(self, worker_id: int):
 
             logger.info("Redis worker %d stopped", worker_id)
 
-        async def list_dead_letters(self, limit: int = 100) -> list[dict[str, Any]]:
-            return await fetch_dead_letters(self._redis, limit)
+        async def list_dead_letters(
+            self,
+            limit: int = 100,
+            offset: int = 0,
+            workflow_id: str | None = None,
+            error_type: str | None = None,
+        ) -> tuple[list[dict[str, Any]], int]:
+            items, total_raw = await fetch_dead_letters(self._redis, limit=None, include_total=True)
+            filtered: list[dict[str, Any]] = []
+            for item in items:
+                if workflow_id and item.get("metadata", {}).get("workflow_id") != workflow_id:
+                    continue
+                if error_type and item.get("error_type") != error_type:
+                    continue
+                filtered.append(item)
+            total_filtered = len(filtered)
+            window = filtered[offset : offset + limit]
+            dead_letter_active_gauge.labels(queue=self.queue_name).set(total_filtered)
+            return window, total_filtered
 
         async def delete_dead_letter(self, task_id: str) -> None:
             await clear_dead_letter(self._redis, task_id)
@@ -363,6 +421,9 @@ async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
             payload["requeued_at"] = utc_now().isoformat()
             await redis_enqueue_task(self._redis, payload.get("name", "unknown"), payload)
             record_task_enqueued(self.queue_name)
+            dead_letter_requeued_total.labels(
+                queue=self.queue_name, error_type=payload.get("error_type", "unknown")
+            ).inc()
             logger.info("Dead-letter task requeued: %s", task_id)
             return payload
 
@@ -371,9 +432,15 @@ async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
 
         async def purge_dead_letters(self, *, older_than: timedelta | None = None) -> int:
            if older_than is None:
-                return await purge_dead_letters(self._redis)
+                deleted = await purge_dead_letters(self._redis)
+                dead_letter_purged_total.labels(queue=self.queue_name, mode="all").inc(deleted)
+                dead_letter_active_gauge.labels(queue=self.queue_name).set(await count_dead_letters(self._redis))
+                return deleted
            cutoff = utc_now() - older_than
-            return await purge_dead_letters(self._redis, older_than=cutoff)
+            deleted = await purge_dead_letters(self._redis, older_than=cutoff)
+            dead_letter_purged_total.labels(queue=self.queue_name, mode="age_filter").inc(deleted)
+            dead_letter_active_gauge.labels(queue=self.queue_name).set(await count_dead_letters(self._redis))
+            return deleted
 
     _task_queue = RedisTaskQueue(max_workers=settings.task_queue_workers)
 else:
```
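
Taken together, `metadata` rides from `enqueue` into the Redis payload and on into any dead-letter record, which is what makes `workflow_id` filterable in `list_dead_letters`. An illustrative sketch against the queue API, assuming the module-level `_task_queue` is the Redis-backed instance; the task name, coroutine body, and `workflow_id` value are invented for the example:

```python
import asyncio

from agent_pm.storage.tasks import _task_queue


async def send_digest(user_id: str) -> None:
    """Illustrative task body; any registered coroutine works the same way."""


async def main() -> None:
    # metadata is persisted with the payload, so a failure surfaces it in the
    # dead-letter record.
    await _task_queue.enqueue(
        "send_digest",
        send_digest,
        "user-42",
        metadata={"workflow_id": "wf-123"},
    )

    # Filtered window plus the filtered total; the active gauge is refreshed
    # as a side effect of the listing.
    window, total = await _task_queue.list_dead_letters(
        limit=10, offset=0, workflow_id="wf-123", error_type="TimeoutError"
    )
    print(total, [item.get("error_message") for item in window])


asyncio.run(main())
```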

app.py

Lines changed: 25 additions & 3 deletions
```diff
@@ -456,11 +456,33 @@ async def list_tasks(status: str | None = None, limit: int = 50, _admin_key: AdminKeyDep = None) -> dict[str, Any]:
 
 
 @app.get("/tasks/dead-letter")
-async def list_dead_letter(limit: int = 50, _admin_key: AdminKeyDep = None) -> dict[str, Any]:
+async def list_dead_letter(
+    limit: int = 50,
+    offset: int = 0,
+    workflow_id: str | None = None,
+    error_type: str | None = None,
+    _admin_key: AdminKeyDep = None,
+) -> dict[str, Any]:
     if not _task_queue:
         raise HTTPException(status_code=503, detail="Task queue not initialized")
-    items = await _task_queue.list_dead_letters(limit)
-    return {"dead_letter": items, "total": len(items)}
+    items, total = await _task_queue.list_dead_letters(
+        limit=limit, offset=offset, workflow_id=workflow_id, error_type=error_type
+    )
+    return {
+        "dead_letter": [
+            {
+                **item,
+                "metadata": item.get("metadata", {}),
+                "error_type": item.get("error_type"),
+                "error_message": item.get("error_message"),
+                "stack_trace": item.get("stack_trace"),
+            }
+            for item in items
+        ],
+        "total": total,
+        "limit": limit,
+        "offset": offset,
+    }
 
 
 @app.get("/tasks/dead-letter/{task_id}")
```
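
On the wire, the endpoint now echoes `limit` and `offset` next to the filtered `total`, so a dashboard can page without tracking state itself. A client-side sketch; the host, port, and the `X-Admin-Key` header used to satisfy `AdminKeyDep` are assumptions, not confirmed by this diff:

```python
import httpx

resp = httpx.get(
    "http://localhost:8000/tasks/dead-letter",
    params={"limit": 10, "offset": 0, "error_type": "TimeoutError"},
    headers={"X-Admin-Key": "change-me"},  # assumed admin auth header
)
resp.raise_for_status()
body = resp.json()

# Expected shape (values illustrative):
# {
#   "dead_letter": [{"task_id": "...", "error_type": "TimeoutError",
#                    "error_message": "...", "stack_trace": null,
#                    "metadata": {"workflow_id": "wf-123"}}],
#   "total": 1, "limit": 10, "offset": 0
# }
print(body["total"], body["limit"], body["offset"])
```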

tests/storage/test_redis.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -96,6 +96,7 @@ async def test_dead_letter_and_heartbeat_helpers():
     payload = {"task_id": "abc", "name": "job", "args": [], "kwargs": {}}
     await redis.record_dead_letter(client, payload)
     records = await redis.fetch_dead_letters(client)
+    assert isinstance(records, list)
     assert records[0]["task_id"] == "abc"
 
     stored = await redis.get_dead_letter(client, "abc")
```
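
A natural companion test for the pagination knobs, sketched under the same `redis` module alias the file already imports; `_make_fake_client` is a hypothetical stand-in for however the existing test constructs its `client`, and the entry count is illustrative:

```python
async def test_fetch_dead_letters_pagination():
    # Assumes the suite's existing async test runner configuration.
    client = _make_fake_client()  # hypothetical helper; reuse the suite's client setup
    for i in range(3):
        await redis.record_dead_letter(client, {"task_id": f"t{i}", "name": "job"})

    page, total = await redis.fetch_dead_letters(client, limit=2, offset=1, include_total=True)
    assert total == 3
    assert len(page) == 2  # window honors limit after skipping the offset
```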
