I am running my taskiq worker as:

```bash
poetry run taskiq worker src.jobs.broker:broker src.jobs -w 1 --no-configure-logging
```
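(The cron logs further below come from the scheduler process, which is launched separately with taskiq's scheduler CLI, e.g. along these lines; the exact flags may differ:)

```bash
poetry run taskiq scheduler src.jobs.broker:scheduler src.jobs
```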
My broker configuration:
"""TaskIQ broker configuration with PostgreSQL backend."""
import logging
from taskiq import TaskiqScheduler
from taskiq_postgresql import PostgresqlBroker, PostgresqlResultBackend
from taskiq_postgresql.scheduler_source import PostgresqlSchedulerSource
from src.config.settings import settings
logger = logging.getLogger(__name__)
# Connection pool configuration to prevent exhausting database connections
# These settings are specific to asyncpg connection pools
POOL_CONFIG = {
"min_size": 2, # Minimum number of connections to maintain
"max_size": 5, # Maximum number of connections allowed
"max_inactive_connection_lifetime": 300, # Close idle connections after 5 minutes
}
# Connection configuration for individual connections (not pools)
# These can be passed to asyncpg.connect() calls
CONNECTION_CONFIG = {
# Add any connection-specific settings here if needed
# For example: "server_settings": {"jit": "off"}
}
# Configure result backend for storing task results
result_backend = PostgresqlResultBackend(
dsn=settings.postgres.url,
# Result backend manages its own connection pool
)
# Configure broker with asyncpg driver for better async performance
# The broker needs both connection_kwargs (for listen driver) and pool_kwargs (for main pool)
broker = PostgresqlBroker(
dsn=settings.postgres.url,
connection_kwargs=CONNECTION_CONFIG, # For individual connections (listen driver)
pool_kwargs=POOL_CONFIG, # For the main connection pool
).with_result_backend(result_backend)
# Configure scheduler source for cron jobs
scheduler_source = PostgresqlSchedulerSource(
dsn=settings.postgres.url,
# Scheduler manages its own connections
)
# Create scheduler instance
scheduler = TaskiqScheduler(
broker=broker,
sources=[scheduler_source],
)
logger.info("TaskIQ broker configured with PostgreSQL backend")
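For context, the `health_check` task that the scheduler fires (see the cron logs below) lives in `src.jobs.cron.example_cron`. A minimal sketch of what that module looks like, assuming the standard `@broker.task` decorator (the default task name `module:function` matches the name in the logs); how the schedule row itself is registered with `PostgresqlSchedulerSource` is omitted here:

```python
# src/jobs/cron/example_cron.py -- sketch only; the real module is not shown in this issue.
import logging

from src.jobs.broker import broker

logger = logging.getLogger(__name__)


@broker.task  # default task name becomes "src.jobs.cron.example_cron:health_check"
async def health_check() -> str:
    """Periodic health check fired by the scheduler."""
    logger.info("health_check executed")
    return "ok"
```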
Logs for cron:

```
Started with pid 73437..
cron | [2025-09-01 08:00:20,055][INFO ][broker:<module>:54] TaskIQ broker configured with PostgreSQL backend
cron | [2025-09-01 08:00:20,060][WARNING][base_events:_run_once:1994] Executing <Task pending name='Task-1' coro=<run_scheduler() running at /.cache/venv/lib/python3.12/site-packages/taskiq/cli/scheduler/run.py:266> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.12/asyncio/futures.py:389, Task.task_wakeup()] created at /usr/local/lib/python3.12/asyncio/base_events.py:448> cb=[_run_until_complete_cb() at /usr/local/lib/python3.12/asyncio/base_events.py:181] created at /usr/local/lib/python3.12/asyncio/runners.py:100> took 0.153 seconds
cron | [2025-09-01 08:00:20,172][INFO ][run:run_scheduler:272] Starting scheduler.
cron | [2025-09-01 08:00:20,479][INFO ][run:run_scheduler:274] Startup completed.
cron | [2025-09-01 08:05:01,005][INFO ][run:delayed_send:138] Sending task src.jobs.cron.example_cron:health_check with schedule_id fbda2152-6039-513e-b903-f1b3eeb26295.
```
Logs for worker:

```
Started with pid 73440..
```
The scheduler sends the task, but the worker log shows nothing after startup. Can't taskiq be run with a single worker (`-w 1`)?
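One way to narrow this down is to enqueue the task directly, bypassing the scheduler, and see whether the single worker executes anything at all. A minimal sketch using taskiq's `.kiq()` / `wait_result()` client API (the script name and flow are illustrative, not part of the project):

```python
# check_worker.py -- illustrative sanity check, not part of the project above.
import asyncio

from src.jobs.broker import broker
from src.jobs.cron.example_cron import health_check


async def main() -> None:
    # Open client-side broker connections before sending tasks.
    await broker.startup()
    # Enqueue the task; the running worker should pick it up.
    task = await health_check.kiq()
    # Wait for the worker to store a result in the result backend.
    result = await task.wait_result(timeout=30)
    print("worker returned:", result.return_value)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

If this times out while the cron logs keep printing "Sending task ...", the problem would be on the worker/broker delivery side rather than in the scheduler itself.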