Skip to content

Commit 5ff7034

Browse files
committed
feat(queue): track auto requeue metrics
1 parent f8a9cd7 commit 5ff7034

File tree

3 files changed

+30
-5
lines changed

3 files changed

+30
-5
lines changed

agent_pm/observability/metrics.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
labelnames=("queue",),
3333
)
3434

35+
dead_letter_auto_requeue_total = Counter(
36+
"task_dead_letter_auto_requeue_total",
37+
"Dead-letter entries automatically requeued",
38+
labelnames=("queue", "error_type"),
39+
)
40+
3541
planner_requests_total = Counter(
3642
"planner_requests_total",
3743
"Total planner invocations",
@@ -239,4 +245,5 @@ def latest_metrics() -> bytes:
239245
"dead_letter_requeued_total",
240246
"dead_letter_purged_total",
241247
"dead_letter_active_gauge",
248+
"dead_letter_auto_requeue_total",
242249
]

agent_pm/storage/tasks.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from ..observability.metrics import (
1717
dead_letter_active_gauge,
18+
dead_letter_auto_requeue_total,
1819
dead_letter_purged_total,
1920
dead_letter_recorded_total,
2021
dead_letter_requeued_total,
@@ -24,6 +25,7 @@
2425
)
2526
from ..settings import settings
2627
from ..utils.datetime import utc_now
28+
from ..clients.slack_client import slack_client
2729
from .redis import (
2830
append_dead_letter_audit,
2931
clear_dead_letter,
@@ -408,7 +410,9 @@ async def _worker(self, worker_id: int):
408410
"name": name,
409411
"completed_at": utc_now().isoformat(),
410412
}
411-
await write_heartbeat(self._redis, f"worker:{worker_id}", heartbeat_payload, self._heartbeat_ttl)
413+
await write_heartbeat(
414+
self._redis, f"worker:{worker_id}", heartbeat_payload, self._heartbeat_ttl
415+
)
412416

413417
logger.info("Redis worker %d stopped", worker_id)
414418

@@ -453,7 +457,13 @@ async def delete_dead_letter(self, task_id: str) -> None:
453457
async def worker_heartbeats(self) -> dict[str, dict[str, Any]]:
454458
return await list_heartbeats(self._redis)
455459

456-
async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
460+
async def requeue_dead_letter(
461+
self,
462+
task_id: str,
463+
*,
464+
automatic: bool = False,
465+
notify: bool = True,
466+
) -> dict[str, Any] | None:
457467
payload = await get_dead_letter(self._redis, task_id)
458468
if payload is None:
459469
return None
@@ -464,9 +474,8 @@ async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
464474
payload["requeued_at"] = utc_now().isoformat()
465475
await redis_enqueue_task(self._redis, payload.get("name", "unknown"), payload)
466476
record_task_enqueued(self.queue_name)
467-
dead_letter_requeued_total.labels(
468-
queue=self.queue_name, error_type=payload.get("error_type", "unknown")
469-
).inc()
477+
metric = dead_letter_auto_requeue_total if automatic else dead_letter_requeued_total
478+
metric.labels(queue=self.queue_name, error_type=payload.get("error_type", "unknown")).inc()
470479
logger.info("Dead-letter task requeued: %s", task_id)
471480
return payload
472481

tests/storage/test_redis_worker.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ async def hlen(self, key: str) -> int:
4343
async def expire(self, key: str, ttl: int):
4444
return None
4545

46+
async def lpush(self, key: str, value: str) -> None:
47+
return None
48+
49+
async def ltrim(self, key: str, start: int, stop: int) -> None:
50+
return None
51+
52+
async def lrange(self, key: str, start: int, stop: int):
53+
return []
54+
4655
async def flushall(self):
4756
self.items.clear()
4857
self.hashes.clear()

0 commit comments

Comments
 (0)