Skip to content

Commit 7b59182

Browse files
committed
feat(queue): expose dead-letter detail and purge
1 parent ccc6872 commit 7b59182

File tree

6 files changed

+77
-0
lines changed

6 files changed

+77
-0
lines changed

agent_pm/storage/redis.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ async def clear_dead_letter(client: redis.Redis, task_id: str) -> None:
105105
await client.hdel(_dead_letter_key(), task_id)
106106

107107

108+
async def purge_dead_letters(client: redis.Redis) -> int:
109+
count = await client.hlen(_dead_letter_key())
110+
if count == 0:
111+
return 0
112+
await client.delete(_dead_letter_key())
113+
return int(count)
114+
115+
108116
async def write_heartbeat(client: redis.Redis, worker_id: str, payload: dict[str, Any], ttl: int) -> None:
109117
await client.hset(_heartbeat_key(), worker_id, json.dumps(payload))
110118
await client.expire(_heartbeat_key(), ttl)

agent_pm/storage/tasks.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
get_redis_client,
2828
list_heartbeats,
2929
pop_task,
30+
purge_dead_letters,
3031
record_dead_letter,
3132
set_task_result,
3233
write_heartbeat,
@@ -210,6 +211,12 @@ async def worker_heartbeats(self) -> dict[str, Any]:
210211
async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
211212
return None
212213

214+
async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
215+
return None
216+
217+
async def purge_dead_letters(self) -> int:
218+
return 0
219+
213220

214221
# Global task queue instance
215222
_task_queue: TaskQueue | None = None
@@ -356,6 +363,12 @@ async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
356363
logger.info("Dead-letter task requeued: %s", task_id)
357364
return payload
358365

366+
async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
367+
return await get_dead_letter(self._redis, task_id)
368+
369+
async def purge_dead_letters(self) -> int:
370+
return await purge_dead_letters(self._redis)
371+
359372
_task_queue = RedisTaskQueue(max_workers=settings.task_queue_workers)
360373
else:
361374
_task_queue = TaskQueue(max_workers=settings.task_queue_workers)

app.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,16 @@ async def list_dead_letter(limit: int = 50, _admin_key: AdminKeyDep = None) -> d
463463
return {"dead_letter": items, "total": len(items)}
464464

465465

466+
@app.get("/tasks/dead-letter/{task_id}")
467+
async def get_dead_letter(task_id: str, _admin_key: AdminKeyDep = None) -> dict[str, Any]:
468+
if not _task_queue:
469+
raise HTTPException(status_code=503, detail="Task queue not initialized")
470+
item = await _task_queue.get_dead_letter(task_id)
471+
if not item:
472+
raise HTTPException(status_code=404, detail="Dead-letter task not found")
473+
return item
474+
475+
466476
@app.delete("/tasks/dead-letter/{task_id}")
467477
async def delete_dead_letter(task_id: str, _admin_key: AdminKeyDep = None) -> dict[str, Any]:
468478
if not _task_queue:
@@ -471,6 +481,14 @@ async def delete_dead_letter(task_id: str, _admin_key: AdminKeyDep = None) -> di
471481
return {"task_id": task_id, "status": "deleted"}
472482

473483

484+
@app.delete("/tasks/dead-letter")
485+
async def purge_dead_letters(_admin_key: AdminKeyDep = None) -> dict[str, int]:
486+
if not _task_queue:
487+
raise HTTPException(status_code=503, detail="Task queue not initialized")
488+
deleted = await _task_queue.purge_dead_letters()
489+
return {"deleted": deleted}
490+
491+
474492
@app.get("/tasks/workers")
475493
async def worker_status(_admin_key: AdminKeyDep = None) -> dict[str, Any]:
476494
if not _task_queue:

tests/storage/test_redis.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,18 @@ async def hdel(self, key: str, field: str):
4747
if key.endswith("dead_letter"):
4848
self.dead.pop(field, None)
4949

50+
async def hlen(self, key: str) -> int:
51+
if key.endswith("dead_letter"):
52+
return len(self.dead)
53+
return len(self.results)
54+
5055
async def expire(self, key: str, ttl: int):
5156
return None
5257

58+
async def delete(self, key: str):
59+
if key.endswith("dead_letter"):
60+
self.dead.clear()
61+
5362

5463
@pytest.mark.asyncio
5564
async def test_enqueue_task_pushes_payload():
@@ -94,6 +103,11 @@ async def test_dead_letter_and_heartbeat_helpers():
94103
await redis.clear_dead_letter(client, "abc")
95104
assert await redis.fetch_dead_letters(client) == []
96105

106+
await redis.record_dead_letter(client, payload)
107+
removed = await redis.purge_dead_letters(client)
108+
assert removed == 1
109+
assert await redis.fetch_dead_letters(client) == []
110+
97111
await redis.write_heartbeat(client, "worker:1", {"status": "ok"}, ttl=60)
98112
beats = await redis.list_heartbeats(client)
99113
assert beats["worker:1"]["status"] == "ok"

tests/storage/test_redis_worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ async def hgetall(self, key: str):
3737
async def hdel(self, key: str, field: str):
3838
self.hashes.setdefault(key, {}).pop(field, None)
3939

40+
async def hlen(self, key: str) -> int:
41+
return len(self.hashes.get(key, {}))
42+
4043
async def expire(self, key: str, ttl: int):
4144
return None
4245

tests/test_tasks_api.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,19 @@ async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
7373
self.requeued = task_id
7474
return {"task_id": task_id}
7575

76+
async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
77+
if task_id == "dead-1":
78+
return {
79+
"task_id": "dead-1",
80+
"name": "explode",
81+
"retry_count": 3,
82+
"last_error": "boom",
83+
}
84+
return None
85+
86+
async def purge_dead_letters(self) -> int:
87+
return 1
88+
7689

7790
@pytest.mark.asyncio
7891
async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
@@ -97,6 +110,10 @@ async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
97110
assert del_resp.status_code == 200
98111
assert stub.deleted == "dead-1"
99112

113+
detail_resp = await client.get("/tasks/dead-letter/dead-1")
114+
assert detail_resp.status_code == 200
115+
assert detail_resp.json()["task_id"] == "dead-1"
116+
100117
worker_resp = await client.get("/tasks/workers")
101118
assert worker_resp.status_code == 200
102119
assert worker_resp.json() == {"workers": {"worker:1": {"status": "ok"}}}
@@ -105,6 +122,10 @@ async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
105122
assert requeue_resp.status_code == 200
106123
assert stub.requeued == "dead-1"
107124
assert requeue_resp.json()["status"] == "requeued"
125+
126+
purge_resp = await client.delete("/tasks/dead-letter")
127+
assert purge_resp.status_code == 200
128+
assert purge_resp.json()["deleted"] == 1
108129
finally:
109130
settings.task_queue_backend = original_backend
110131
app_module._task_queue = original_queue

0 commit comments

Comments
 (0)