Skip to content

Commit 0f6015c

Browse files
committed
feat(queue): add dead-letter retention controls
1 parent 7b59182 commit 0f6015c

File tree

5 files changed

+58
-13
lines changed

5 files changed

+58
-13
lines changed

agent_pm/storage/redis.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import logging
77
import uuid
8+
from datetime import datetime, timezone
89
from typing import Any
910

1011
import redis.asyncio as redis
@@ -71,6 +72,8 @@ async def get_task_result(client: redis.Redis, task_id: str) -> dict[str, Any] |
7172

7273
async def record_dead_letter(client: redis.Redis, payload: dict[str, Any]) -> None:
7374
task_id = payload.get("task_id", uuid.uuid4().hex)
75+
if "recorded_at" not in payload:
76+
payload["recorded_at"] = datetime.now(timezone.utc).isoformat()
7477
await client.hset(_dead_letter_key(), task_id, json.dumps(payload))
7578

7679

@@ -105,12 +108,29 @@ async def clear_dead_letter(client: redis.Redis, task_id: str) -> None:
105108
await client.hdel(_dead_letter_key(), task_id)
106109

107110

108-
async def purge_dead_letters(client: redis.Redis) -> int:
109-
count = await client.hlen(_dead_letter_key())
110-
if count == 0:
111-
return 0
112-
await client.delete(_dead_letter_key())
113-
return int(count)
111+
async def purge_dead_letters(client: redis.Redis, *, older_than: datetime | None = None) -> int:
112+
if older_than is None:
113+
count = await client.hlen(_dead_letter_key())
114+
if count == 0:
115+
return 0
116+
await client.delete(_dead_letter_key())
117+
return int(count)
118+
119+
items = await client.hgetall(_dead_letter_key())
120+
removed = 0
121+
for task_id, value in items.items():
122+
try:
123+
data = json.loads(value)
124+
recorded = data.get("recorded_at")
125+
if not recorded:
126+
continue
127+
recorded_dt = datetime.fromisoformat(recorded)
128+
except (json.JSONDecodeError, ValueError):
129+
continue
130+
if recorded_dt <= older_than:
131+
await client.hdel(_dead_letter_key(), task_id)
132+
removed += 1
133+
return removed
114134

115135

116136
async def write_heartbeat(client: redis.Redis, worker_id: str, payload: dict[str, Any], ttl: int) -> None:

agent_pm/storage/tasks.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from collections import deque
99
from collections.abc import Callable, Coroutine
1010
from dataclasses import dataclass
11-
from datetime import datetime
11+
from datetime import datetime, timedelta
1212
from enum import Enum
1313
from typing import Any
1414

@@ -217,6 +217,9 @@ async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
217217
async def purge_dead_letters(self) -> int:
218218
return 0
219219

220+
async def purge_dead_letters_older_than(self, age: timedelta) -> int:
221+
return 0
222+
220223

221224
# Global task queue instance
222225
_task_queue: TaskQueue | None = None
@@ -366,8 +369,11 @@ async def requeue_dead_letter(self, task_id: str) -> dict[str, Any] | None:
366369
async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
367370
return await get_dead_letter(self._redis, task_id)
368371

369-
async def purge_dead_letters(self) -> int:
370-
return await purge_dead_letters(self._redis)
372+
async def purge_dead_letters(self, *, older_than: timedelta | None = None) -> int:
373+
if older_than is None:
374+
return await purge_dead_letters(self._redis)
375+
cutoff = utc_now() - older_than
376+
return await purge_dead_letters(self._redis, older_than=cutoff)
371377

372378
_task_queue = RedisTaskQueue(max_workers=settings.task_queue_workers)
373379
else:

app.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import asyncio
66
import logging
77
import uuid
8-
from datetime import datetime
8+
from datetime import datetime, timedelta
99
from typing import Any
1010

1111
from fastapi import Depends, FastAPI, HTTPException, WebSocket, WebSocketDisconnect
@@ -482,10 +482,15 @@ async def delete_dead_letter(task_id: str, _admin_key: AdminKeyDep = None) -> di
482482

483483

484484
@app.delete("/tasks/dead-letter")
485-
async def purge_dead_letters(_admin_key: AdminKeyDep = None) -> dict[str, int]:
485+
async def purge_dead_letters(
486+
older_than_minutes: int | None = None, _admin_key: AdminKeyDep = None
487+
) -> dict[str, int]:
486488
if not _task_queue:
487489
raise HTTPException(status_code=503, detail="Task queue not initialized")
488-
deleted = await _task_queue.purge_dead_letters()
490+
if older_than_minutes is None:
491+
deleted = await _task_queue.purge_dead_letters()
492+
else:
493+
deleted = await _task_queue.purge_dead_letters_older_than(timedelta(minutes=older_than_minutes))
489494
return {"deleted": deleted}
490495

491496

tests/storage/test_redis.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
from datetime import datetime, timedelta, timezone
23

34
import pytest
45

@@ -103,11 +104,17 @@ async def test_dead_letter_and_heartbeat_helpers():
103104
await redis.clear_dead_letter(client, "abc")
104105
assert await redis.fetch_dead_letters(client) == []
105106

106-
await redis.record_dead_letter(client, payload)
107+
old_payload = {"task_id": "old", "name": "job", "args": [], "kwargs": {}, "recorded_at": "2000-01-01T00:00:00+00:00"}
108+
await redis.record_dead_letter(client, old_payload)
107109
removed = await redis.purge_dead_letters(client)
108110
assert removed == 1
109111
assert await redis.fetch_dead_letters(client) == []
110112

113+
# ensure age-based purging skips fresh entries
114+
await redis.record_dead_letter(client, payload)
115+
removed = await redis.purge_dead_letters(client, older_than=datetime.now(timezone.utc) - timedelta(minutes=1))
116+
assert removed == 0
117+
111118
await redis.write_heartbeat(client, "worker:1", {"status": "ok"}, ttl=60)
112119
beats = await redis.list_heartbeats(client)
113120
assert beats["worker:1"]["status"] == "ok"

tests/test_tasks_api.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import timedelta
12
from typing import Any
23

34
import pytest
@@ -86,6 +87,9 @@ async def get_dead_letter(self, task_id: str) -> dict[str, Any] | None:
8687
async def purge_dead_letters(self) -> int:
8788
return 1
8889

90+
async def purge_dead_letters_older_than(self, age): # pragma: no cover - stubbed for API test
91+
return 0
92+
8993

9094
@pytest.mark.asyncio
9195
async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
@@ -126,6 +130,9 @@ async def test_tasks_admin_endpoints_surface_queue_data(monkeypatch):
126130
purge_resp = await client.delete("/tasks/dead-letter")
127131
assert purge_resp.status_code == 200
128132
assert purge_resp.json()["deleted"] == 1
133+
134+
purge_resp_age = await client.delete("/tasks/dead-letter", params={"older_than_minutes": 10})
135+
assert purge_resp_age.status_code == 200
129136
finally:
130137
settings.task_queue_backend = original_backend
131138
app_module._task_queue = original_queue

0 commit comments

Comments
 (0)