-
-
Notifications
You must be signed in to change notification settings - Fork 96
Open
Description
When using taskiq-redis with Redis Sentinel in an async environment (EKS), the worker abruptly fails with the error below 5-6 times a day (maybe more). It then restarts automatically and works fine.
2025-07-24T21:54:19.276+06:00 raw = await self._readline()
2025-07-24T21:54:19.276+06:00 ^^^^^^^^^^^^^^^^^^^^^^
2025-07-24T21:54:19.276+06:00 File "/usr/local/lib/python3.12/site-packages/redis/_parsers/base.py", line 221, in _readline
2025-07-24T21:54:19.276+06:00 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
2025-07-24T21:54:19.276+06:00 redis.exceptions.ConnectionError: Connection closed by server.
Here's my implementation:
import logging
import os
from taskiq import InMemoryBroker
from taskiq.serializers import ORJSONSerializer
from taskiq_redis import (
ListQueueBroker,
ListQueueSentinelBroker,
RedisAsyncResultBackend,
RedisAsyncSentinelResultBackend,
)
import rbac.services.taskiq_fastapi as taskiq_fastapi
# Broker bootstrap for the access-control service.
#
# Selects one of three taskiq brokers at import time:
#   * InMemoryBroker            when IS_LOCAL is True,
#   * ListQueueSentinelBroker   when REDIS_SENTINEL_MODE is "true",
#   * ListQueueBroker           otherwise (standalone Redis at SVC_REDIS),
# then hands the chosen broker to the taskiq/FastAPI integration.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

REDIS_URI = os.getenv("SVC_REDIS")
IS_LOCAL = False  # Set to True for local development without Redis

if IS_LOCAL:
    logger.info("Using InMemoryBroker (local mode enabled)")
    broker = InMemoryBroker()
else:
    logger.info("Configuring Redis-based broker")

    # Normalise the flag so "True", "TRUE" or " true " also enable Sentinel
    # mode instead of silently falling back to the standalone branch.
    is_redis_sentinel = os.environ.get("REDIS_SENTINEL_MODE", "false").strip().lower()
    redis_sentinel_host = os.environ.get("REDIS_SENTINEL_HOST", "")
    redis_sentinel_master_name = os.environ.get("REDIS_SENTINEL_MASTER_NAME", "")
    client_name = "access-control-service"
    queue_name = "access-control-queue"

    if is_redis_sentinel == "true":
        logger.info(
            "Setting up Redis Sentinel: host=%s, master=%s",
            redis_sentinel_host,
            redis_sentinel_master_name,
        )
        if not redis_sentinel_host or not redis_sentinel_master_name:
            # Both values default to "" above; surface the misconfiguration
            # in the logs rather than leaving it to a later connection error.
            logger.warning(
                "REDIS_SENTINEL_HOST or REDIS_SENTINEL_MASTER_NAME is empty; "
                "the Sentinel connection is likely to fail"
            )

        # Built once so the result backend and the broker always agree on
        # the Sentinel address list.
        sentinel_nodes = [(redis_sentinel_host, 26379)]

        # NOTE(review): for intermittent "Connection closed by server" errors,
        # redis-py connection options such as health_check_interval or
        # socket_keepalive may help if these classes forward extra keyword
        # arguments to the connection -- confirm against the taskiq-redis docs
        # before adding them.
        redis_async_result = RedisAsyncSentinelResultBackend(
            sentinels=sentinel_nodes,
            master_name=redis_sentinel_master_name,
            serializer=ORJSONSerializer(),
            client_name=client_name,
        )
        broker = ListQueueSentinelBroker(
            sentinels=sentinel_nodes,
            master_name=redis_sentinel_master_name,
            client_name=client_name,
            queue_name=queue_name,
        ).with_result_backend(redis_async_result)
        logger.info("Redis Sentinel broker successfully initialized")
    else:
        if not REDIS_URI:
            # Fail fast with a clear message instead of handing None (or an
            # empty string) to the Redis clients.
            raise RuntimeError(
                "SVC_REDIS must be set when REDIS_SENTINEL_MODE is not 'true'"
            )

        # NOTE(review): if SVC_REDIS embeds a password, this line writes it to
        # the logs -- consider redacting.
        logger.info("Connecting to standalone Redis at: %s", REDIS_URI)
        redis_async_result = RedisAsyncResultBackend(
            redis_url=REDIS_URI,
            client_name=client_name,
        )
        broker = ListQueueBroker(
            url=REDIS_URI,
            queue_name=queue_name,
            client_name=client_name,
        ).with_result_backend(redis_async_result)
        logger.info("Standalone Redis broker successfully initialized")

# Wire the selected broker into the FastAPI application at "main:app".
logger.info("Initializing Taskiq FastAPI integration with 'main:app'")
taskiq_fastapi.init(broker, "main:app")
logger.info("Taskiq FastAPI integration completed")
from .tasks import * # noqa
Any help would be appreciated. Thank you!
Metadata
Metadata
Assignees
Labels
No labels