Skip to content

Commit 7c3c8cc

Browse files
committed
feat: enable redis task queue in docker setup
1 parent 43ef957 commit 7c3c8cc

File tree

6 files changed

+17
-31
lines changed

6 files changed

+17
-31
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
.coverage
12
.env
23
__pycache__/
34
*.pyc

agent_pm/storage/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def __init__(self, max_workers: int = 5):
6161
self.running = False
6262
self._lock = asyncio.Lock()
6363

64-
def start(self):
64+
async def start(self):
6565
"""Start background workers."""
6666
if self.running:
6767
return
@@ -203,5 +203,5 @@ async def enqueue( # type: ignore[override]
203203
_task_queue = RedisTaskQueue(max_workers=settings.task_queue_workers)
204204
else:
205205
_task_queue = TaskQueue(max_workers=settings.task_queue_workers)
206-
_task_queue.start()
206+
await _task_queue.start()
207207
return _task_queue

app.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
get_correlation_id,
6060
set_correlation_id,
6161
)
62-
from agent_pm.storage.tasks import TaskQueue, TaskStatus, get_task_queue
62+
from agent_pm.storage.tasks import TaskStatus, get_task_queue
6363
from agent_pm.tools import registry
6464
from agent_pm.observability.export import schedule_trace_export
6565
from agent_pm.observability.traces import list_traces as list_trace_files
@@ -73,7 +73,7 @@
7373
logger = logging.getLogger(__name__)
7474
app = FastAPI(title="Agent PM", version="0.1.0")
7575
_jira_lock = asyncio.Lock()
76-
_task_queue: TaskQueue | None = None
76+
_task_queue = None
7777

7878
plugin_registry.attach_app(app)
7979

@@ -86,18 +86,14 @@ class FollowupUpdate(BaseModel):
8686
async def startup_event():
8787
global _task_queue
8888
_task_queue = await get_task_queue()
89-
if settings.task_queue_backend == "memory" and isinstance(_task_queue, TaskQueue):
90-
_task_queue.start()
89+
if settings.task_queue_backend == "memory":
90+
await _task_queue.start()
9191
logger.info("Agent PM service started")
9292

9393

9494
@app.on_event("shutdown")
9595
async def shutdown_event():
96-
if (
97-
_task_queue
98-
and settings.task_queue_backend == "memory"
99-
and hasattr(_task_queue, "stop")
100-
):
96+
if settings.task_queue_backend == "memory" and _task_queue:
10197
await _task_queue.stop()
10298
logger.info("Agent PM service stopped")
10399

docker-compose.yml

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ services:
2323
- "6379:6379"
2424
volumes:
2525
- redis_data:/data
26+
command: redis-server --save 60 1 --loglevel warning
2627
healthcheck:
2728
test: ["CMD", "redis-cli", "ping"]
2829
interval: 10s
@@ -36,7 +37,8 @@ services:
3637
environment:
3738
- OPENAI_API_KEY=${OPENAI_API_KEY}
3839
- DATABASE_URL=postgresql+asyncpg://agent_pm:agent_pm_dev@postgres:5432/agent_pm
39-
- REDIS_URL=redis://redis:6379
40+
- REDIS_URL=redis://redis:6379/0
41+
- TASK_QUEUE_BACKEND=redis
4042
- DRY_RUN=true
4143
- LOG_FORMAT=json
4244
- ENABLE_OPENTELEMETRY=false
@@ -45,20 +47,7 @@ services:
4547
condition: service_healthy
4648
redis:
4749
condition: service_healthy
48-
command: uvicorn app:app --host 0.0.0.0 --port 8000 --reload
49-
50-
arq_worker:
51-
build: .
52-
environment:
53-
- OPENAI_API_KEY=${OPENAI_API_KEY}
54-
- DATABASE_URL=postgresql+asyncpg://agent_pm:agent_pm_dev@postgres:5432/agent_pm
55-
- REDIS_URL=redis://redis:6379
56-
depends_on:
57-
postgres:
58-
condition: service_healthy
59-
redis:
60-
condition: service_healthy
61-
command: arq agent_pm.redis_queue.WorkerSettings
50+
command: uv run uvicorn app:app --host 0.0.0.0 --port 8000 --reload
6251

6352
volumes:
6453
postgres_data:

tests/storage/test_task_queue.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async def failing_task(counter: dict[str, int]) -> None:
2020
@pytest.mark.asyncio
2121
async def test_task_queue_executes_successfully():
2222
queue = TaskQueue(max_workers=1)
23-
queue.start()
23+
await queue.start()
2424

2525
task_id = await queue.enqueue("double", sample_task, 3)
2626

@@ -43,7 +43,7 @@ async def test_task_queue_executes_successfully():
4343
@pytest.mark.asyncio
4444
async def test_task_queue_retries_then_fails(monkeypatch):
4545
queue = TaskQueue(max_workers=1)
46-
queue.start()
46+
await queue.start()
4747

4848
counter: dict[str, int] = {}
4949

tests/test_task_queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ async def failing_task() -> None:
1919
@pytest.mark.asyncio
2020
async def test_task_queue_enqueue_and_execute():
2121
queue = TaskQueue(max_workers=2)
22-
queue.start()
22+
await queue.start()
2323
try:
2424
task_id = await queue.enqueue("sample_task", sample_task, 5)
2525
assert task_id
@@ -37,7 +37,7 @@ async def test_task_queue_enqueue_and_execute():
3737
@pytest.mark.asyncio
3838
async def test_task_queue_retry_logic():
3939
queue = TaskQueue(max_workers=1)
40-
queue.start()
40+
await queue.start()
4141
try:
4242
task_id = await queue.enqueue("failing_task", failing_task, max_retries=2)
4343
assert task_id
@@ -56,7 +56,7 @@ async def test_task_queue_retry_logic():
5656
@pytest.mark.asyncio
5757
async def test_task_queue_list_tasks():
5858
queue = TaskQueue(max_workers=2)
59-
queue.start()
59+
await queue.start()
6060
try:
6161
task_id1 = await queue.enqueue("task1", sample_task, 1)
6262
task_id2 = await queue.enqueue("task2", sample_task, 2)

0 commit comments

Comments
 (0)