Closed
Description
Cron tasks configured with the @taskiq_broker.task(schedule=[...]) decorator fail to execute when using ListRedisScheduleSource as the scheduler source. The same tasks work correctly with LabelScheduleSource.
Steps to reproduce:
- Clone and start the test project:
git clone https://github.com/build-on-ai/test_taskiq_project
cd test_taskiq_project/
sudo docker compose up
- Wait for the scheduled task to execute. You should see the "super big" ASCII art message printed every minute.
Expected behavior:
The heartbeat task should execute every minute (as defined by the */1 * * * * cron expression) regardless of which schedule source is used.
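For reference, the fire times implied by a `*/1 * * * *` expression can be sketched with the standard library alone. This is a minimal illustration that handles only the minute-step form `*/N * * * *`; it is not how taskiq evaluates cron expressions, and the helper name `next_minute_fires` is made up for this example.

```python
from datetime import datetime, timedelta, timezone


def next_minute_fires(start: datetime, step: int, count: int) -> list[datetime]:
    """Return the next `count` fire times for a `*/step * * * *` expression.

    Only the minute-step form is handled; this is not a general cron parser.
    """
    if not 1 <= step <= 59:
        raise ValueError("step must be between 1 and 59")
    # Cron fires on whole minutes, so start from the first minute boundary
    # strictly after `start`.
    candidate = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    fires: list[datetime] = []
    while len(fires) < count:
        # `*/step` matches minutes 0, step, 2*step, ... within each hour.
        if candidate.minute % step == 0:
            fires.append(candidate)
        candidate += timedelta(minutes=1)
    return fires


if __name__ == "__main__":
    start = datetime(2025, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
    for fire in next_minute_fires(start, step=1, count=3):
        print(fire.isoformat())
```

With `step=1` every whole minute matches, so a scheduler started at 12:00:30 would be expected to trigger `heartbeat` at 12:01:00, 12:02:00, 12:03:00, and so on.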
Actual behavior:
- ✅ Tasks execute correctly with LabelScheduleSource (default)
- ❌ Tasks do NOT execute with ListRedisScheduleSource
Configuration:
To switch between schedule sources, update TASKIQ_USE_LABEL_SCHEDULER in envs/dev.env:
- TASKIQ_USE_LABEL_SCHEDULER=1 → uses LabelScheduleSource (works)
- TASKIQ_USE_LABEL_SCHEDULER=0 → uses ListRedisScheduleSource (doesn't work)
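How test_taskiq_app.config turns this variable into cnf.taskiq.use_label_scheduler is not shown in this report. When reproducing, it may be worth ruling out flag parsing first, since a naive `bool("0")` evaluates to `True` in Python. A minimal sketch of explicit parsing, where `env_flag` is a hypothetical helper and not part of the test project:

```python
import os


def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    # Any non-empty string is truthy in Python, including "0", so compare
    # against an explicit set of accepted truthy spellings instead.
    return raw.strip().lower() in {"1", "true", "yes", "on"}


if __name__ == "__main__":
    os.environ["TASKIQ_USE_LABEL_SCHEDULER"] = "0"
    print(env_flag("TASKIQ_USE_LABEL_SCHEDULER"))
```

The log lines emitted in base.py ("Using LabelScheduleSource ..." or "Using ListRedisScheduleSource ...") should confirm which branch was actually taken.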
Relevant code:
Task definition (test_taskiq_app/tasks.py)
from datetime import datetime, timezone

import logfire as logger

from test_taskiq_app.clients.taskiq import taskiq_broker, taskiq_scheduler


@taskiq_broker.task(schedule=[{"cron": "*/1 * * * *"}])
async def heartbeat() -> None:
    print(
        """
sssss u u pppp eeeee rrrrrr ttttt
s u u p p e r r t
ssssss u u pppp eeeee rrrrrr t
s u u p e r r t
sssss uuuu p eeeee r r t
bbbbb iiii ggggg
b b ii g
bbbbb ii g ggg
b b ii g g
bbbbb iiii gggg
"""
    )


__all__ = ("taskiq_broker", "taskiq_scheduler", "heartbeat")
Broker and scheduler configuration (test_taskiq_app/clients/taskiq/base.py)
from typing import Any

from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import (
    ListRedisScheduleSource,
    RedisAsyncResultBackend,
    RedisStreamBroker,
)

from test_taskiq_app.config import cnf
from test_taskiq_app.utils.taskiq.otel_middleware import OtelMiddleware
import logfire as logger

redis_async_result: RedisAsyncResultBackend[Any] = RedisAsyncResultBackend(
    redis_url=cnf.redis.url,
    max_connection_pool_size=cnf.redis.max_connection_pool_size,
    prefix_str=f"{cnf.taskiq.queue_key}-results",
    result_ex_time=60 * 15,
    keep_results=False,
)

otel_middleware = OtelMiddleware()
middlewares = [otel_middleware]

taskiq_broker = (
    RedisStreamBroker(
        url=cnf.redis.url,
        queue_name=cnf.taskiq.queue_key,
        approximate=True,
        maxlen=cnf.taskiq.max_stream_length,
    )
    .with_middlewares(*middlewares)
    .with_result_backend(redis_async_result)
)

if cnf.taskiq.use_label_scheduler:
    scheduler_source = LabelScheduleSource(taskiq_broker)
    logger.info("Using LabelScheduleSource for TaskIQ scheduler.")
else:
    scheduler_source = ListRedisScheduleSource(
        url=cnf.redis.url,
        prefix=cnf.taskiq.queue_key,
        buffer_size=cnf.taskiq.schedule_buffer_size,
    )
    logger.info("Using ListRedisScheduleSource for TaskIQ scheduler.")

taskiq_scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[scheduler_source],
)
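One hypothesis worth checking, not confirmed anywhere in this report: LabelScheduleSource builds its schedule list from the `schedule` label that the decorator attaches to each task, whereas a Redis-backed source may only return schedules that were explicitly written to its own storage. If so, the decorator's `schedule=[...]` argument would be invisible to ListRedisScheduleSource. The toy model below illustrates that difference only; `ToyTask`, `ToyLabelSource`, and `ToyStoredSource` are hypothetical stand-ins and do not reproduce taskiq or taskiq_redis code.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a decorated task and its labels."""

    name: str
    labels: dict = field(default_factory=dict)


class ToyLabelSource:
    """Derives schedules from the `schedule` label of the known tasks."""

    def __init__(self, tasks: list[ToyTask]) -> None:
        self.tasks = tasks

    def get_schedules(self) -> list[tuple[str, str]]:
        return [
            (task.name, entry["cron"])
            for task in self.tasks
            for entry in task.labels.get("schedule", [])
        ]


class ToyStoredSource:
    """Returns only schedules that were explicitly added to its storage."""

    def __init__(self) -> None:
        self._stored: list[tuple[str, str]] = []

    def add_schedule(self, task_name: str, cron: str) -> None:
        self._stored.append((task_name, cron))

    def get_schedules(self) -> list[tuple[str, str]]:
        return list(self._stored)


if __name__ == "__main__":
    heartbeat = ToyTask("heartbeat", {"schedule": [{"cron": "*/1 * * * *"}]})
    print(ToyLabelSource([heartbeat]).get_schedules())  # sees the label
    print(ToyStoredSource().get_schedules())  # empty until something is added
```

If the real source behaves like the stored variant, the heartbeat schedule would have to be registered on the source itself rather than through the decorator. The taskiq documentation on dynamic schedule sources is the place to confirm whether that applies here.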