Skip to content

Commit ccc6872

Browse files
committed
feat(queue): requeue dead-letter tasks
1 parent 40d7317 commit ccc6872

File tree

6 files changed

+85
-15
lines changed

6 files changed

+85
-15
lines changed

agent_pm/storage/redis.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,18 @@ async def record_dead_letter(client: redis.Redis, payload: dict[str, Any]) -> No
7474
await client.hset(_dead_letter_key(), task_id, json.dumps(payload))
7575

7676

77+
async def get_dead_letter(client: redis.Redis, task_id: str) -> dict[str, Any] | None:
    """Fetch a single dead-letter record by task id.

    Reads the ``task_id`` field from the dead-letter hash and decodes it as
    JSON.

    Args:
        client: Redis client used for the ``HGET`` call.
        task_id: Hash field identifying the dead-letter entry.

    Returns:
        The decoded payload, with ``"task_id"`` filled in when the stored
        record lacks it. ``None`` when the entry is missing or empty, cannot
        be decoded as JSON, or decodes to something other than a JSON object.
    """
    item = await client.hget(_dead_letter_key(), task_id)
    if not item:
        return None
    try:
        data = json.loads(item)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Undecodable bytes raise UnicodeDecodeError rather than
        # JSONDecodeError; both mean the record is unreadable.
        return None
    # Valid JSON need not be an object (e.g. a list or bare string); such a
    # value has no ``setdefault`` and is not a usable task payload.
    if not isinstance(data, dict):
        return None
    data.setdefault("task_id", task_id)
    return data
87+
88+
7789
async def fetch_dead_letters(client: redis.Redis, limit: int = 100) -> list[dict[str, Any]]:
7890
items = await client.hgetall(_dead_letter_key())
7991
tasks: list[dict[str, Any]] = []

agent_pm/storage/tasks.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
clear_dead_letter,
2424
enqueue_task as redis_enqueue_task,
2525
fetch_dead_letters,
26+
get_dead_letter,
2627
get_redis_client,
2728
list_heartbeats,
2829
pop_task,
@@ -206,6 +207,9 @@ async def delete_dead_letter(self, task_id: str) -> None:
206207
async def worker_heartbeats(self) -> dict[str, Any]:
207208
return {}
208209

210+
async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
211+
return None
212+
209213

210214
# Global task queue instance
211215
_task_queue: TaskQueue | None = None
@@ -338,6 +342,20 @@ async def delete_dead_letter(self, task_id: str) -> None:
338342
async def worker_heartbeats(self) -> dict[str, dict[str, Any]]:
339343
return await list_heartbeats(self._redis)
340344

345+
async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
    """Move a dead-letter task back onto the Redis queue.

    Args:
        task_id: Identifier of the dead-letter entry to requeue.

    Returns:
        The payload that was enqueued (``last_error`` removed,
        ``retry_count`` reset to 0, ``requeued_at`` stamped), or ``None``
        when no readable dead-letter entry exists for ``task_id``.
    """
    payload = await get_dead_letter(self._redis, task_id)
    if payload is None:
        return None

    # Reset the failure bookkeeping carried over from the dead-letter record.
    payload.pop("last_error", None)
    payload["retry_count"] = 0
    payload["requeued_at"] = utc_now().isoformat()

    # Enqueue BEFORE clearing: if the enqueue raises, the record is still in
    # the dead-letter store and can be retried, instead of being lost.
    await redis_enqueue_task(self._redis, payload.get("name", "unknown"), payload)
    await clear_dead_letter(self._redis, task_id)

    record_task_enqueued(self.queue_name)
    logger.info("Dead-letter task requeued: %s", task_id)
    return payload
358+
341359
_task_queue = RedisTaskQueue(max_workers=settings.task_queue_workers)
342360
else:
343361
_task_queue = TaskQueue(max_workers=settings.task_queue_workers)

app.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,16 @@ async def get_task(task_id: str, _admin_key: AdminKeyDep = None) -> dict[str, An
498498
}
499499

500500

501+
@app.post("/tasks/dead-letter/{task_id}/requeue")
async def requeue_dead_letter(task_id: str, _admin_key: AdminKeyDep = None) -> dict[str, Any]:
    """Requeue one dead-letter task via the module-level task queue.

    Raises:
        HTTPException: 503 when ``_task_queue`` is falsy; 404 when the queue
            returns ``None`` for ``task_id``.

    Returns:
        A dict with the task id (taken from the requeued payload when
        present, otherwise the path parameter) and ``"status": "requeued"``.
    """
    if not _task_queue:
        raise HTTPException(status_code=503, detail="Task queue not initialized")

    requeued = await _task_queue.requeue_dead_letter(task_id)
    if requeued is None:
        raise HTTPException(status_code=404, detail="Dead-letter task not found")

    resolved_id = requeued.get("task_id", task_id)
    return {"task_id": resolved_id, "status": "requeued"}
509+
510+
501511
@app.post("/prd/{plan_id}/versions")
502512
async def create_prd_version(
503513
plan_id: str,

tests/storage/test_redis.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ async def lpop(self, key: str):
2020
return None
2121
return self.items.pop(0)
2222

23+
async def llen(self, key: str) -> int:
    """Return the number of items held by the fake; ``key`` is ignored."""
    size = len(self.items)
    return size
25+
2326
async def hset(self, key: str, field: str, value: str) -> None:
2427
if key.endswith("dead_letter"):
2528
self.dead[field] = value
@@ -85,6 +88,9 @@ async def test_dead_letter_and_heartbeat_helpers():
8588
records = await redis.fetch_dead_letters(client)
8689
assert records[0]["task_id"] == "abc"
8790

91+
stored = await redis.get_dead_letter(client, "abc")
92+
assert stored["task_id"] == "abc"
93+
8894
await redis.clear_dead_letter(client, "abc")
8995
assert await redis.fetch_dead_letters(client) == []
9096

tests/storage/test_redis_worker.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ async def lpop(self, key: str):
2222
return None
2323
return self.items.popleft()
2424

25+
async def llen(self, key: str) -> int:
    """Return how many entries the fake queue currently holds; ``key`` is ignored."""
    pending = len(self.items)
    return pending
27+
2528
async def hset(self, key: str, field: str, value: str) -> None:
2629
self.hashes.setdefault(key, {})[field] = value
2730

@@ -113,3 +116,11 @@ async def fail_task() -> None:
113116
assert match is not None
114117
assert match["retry_count"] == 1
115118
assert match["last_error"] == "boom"
119+
120+
requeued = await queue.requeue_dead_letter(task_id)
121+
assert requeued is not None
122+
assert requeued["task_id"] == task_id
123+
assert requeued["retry_count"] == 0
124+
125+
queued_len = await fake.llen("agent_pm:tasks")
126+
assert queued_len >= 1

tests/test_tasks_api.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Any
2+
13
import pytest
24
from httpx import ASGITransport, AsyncClient
35

@@ -7,22 +9,17 @@
79
from app import app
810

911

10-
class AppClient:
11-
def __init__(self):
12-
self.transport = ASGITransport(app=app)
13-
self.client = AsyncClient(transport=self.transport, base_url="http://test")
14-
15-
async def __aenter__(self):
16-
await app_module.startup_event()
17-
return self.client
12+
async def _create_client():
13+
transport = ASGITransport(app=app)
14+
client = AsyncClient(transport=transport, base_url="http://test")
15+
await app_module.startup_event()
1816

19-
async def __aexit__(self, exc_type, exc, tb):
20-
await self.client.aclose()
17+
async def cleanup():
18+
await client.aclose()
2119
await app_module.shutdown_event()
2220

23-
24-
async def _create_client():
25-
return AppClient()
21+
client.cleanup = cleanup # type: ignore[attr-defined]
22+
return client
2623

2724

2825
@pytest.mark.asyncio
@@ -31,14 +28,17 @@ async def test_tasks_admin_endpoints_with_memory_backend(monkeypatch):
3128
app.dependency_overrides[AdminKeyDep] = lambda: "admin-test-key"
3229

3330
try:
34-
async with await _create_client() as client:
31+
client = await _create_client()
32+
try:
3533
response = await client.get("/tasks/dead-letter")
3634
assert response.status_code == 200
3735
assert response.json() == {"dead_letter": [], "total": 0}
3836

3937
worker_resp = await client.get("/tasks/workers")
4038
assert worker_resp.status_code == 200
4139
assert worker_resp.json() == {"workers": {}}
40+
finally:
41+
await client.cleanup()
4242
finally:
4343
app.dependency_overrides.clear()
4444

@@ -47,6 +47,7 @@ class StubQueue:
4747
def __init__(self):
4848
self.limit: int | None = None
4949
self.deleted: str | None = None
50+
self.requeued: str | None = None
5051

5152
async def list_dead_letters(self, limit: int = 100):
5253
self.limit = limit
@@ -68,14 +69,19 @@ async def worker_heartbeats(self) -> dict[str, dict[str, str]]:
6869
async def get_task(self, task_id: str): # pragma: no cover - minimal stub for routing
6970
return None
7071

72+
async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
73+
self.requeued = task_id
74+
return {"task_id": task_id}
75+
7176

7277
@pytest.mark.asyncio
7378
async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
7479
monkeypatch.setattr(settings, "task_queue_backend", "memory")
7580
app.dependency_overrides[AdminKeyDep] = lambda: "admin-test-key"
7681

7782
try:
78-
async with await _create_client() as client:
83+
client = await _create_client()
84+
try:
7985
original_backend = settings.task_queue_backend
8086
original_queue = app_module._task_queue
8187
stub = StubQueue()
@@ -94,8 +100,15 @@ async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
94100
worker_resp = await client.get("/tasks/workers")
95101
assert worker_resp.status_code == 200
96102
assert worker_resp.json() == {"workers": {"worker:1": {"status": "ok"}}}
103+
104+
requeue_resp = await client.post("/tasks/dead-letter/dead-1/requeue")
105+
assert requeue_resp.status_code == 200
106+
assert stub.requeued == "dead-1"
107+
assert requeue_resp.json()["status"] == "requeued"
97108
finally:
98109
settings.task_queue_backend = original_backend
99110
app_module._task_queue = original_queue
111+
finally:
112+
await client.cleanup()
100113
finally:
101114
app.dependency_overrides.clear()

0 commit comments

Comments
 (0)