Log pollution in Taskiq worker container #13

@mmzeynalli

Hi! I recently got acquainted with this library and I love it! However, I still have some difficulties.

I am working with FastAPI and need the Taskiq scheduler. Everything works fine: my tasks are scheduled and executed as expected. However, my one concern is that I see all of the logs from the FastAPI app in the Taskiq worker container as well. This is confusing, and it makes it hard to find when the scheduled function is executed, or any debug messages from that task. Here are my files:

# docker-compose

version: "3.8"

x-app-template: &template
  env_file:
    - .env
  build:
    context: .
  volumes:
    - ./:/src
  depends_on:
    rabbitmq:
      condition: service_healthy
    db:
      condition: service_healthy
  networks:
    - bridge_network

services:
  app:
    container_name: joul-app
    command: sh -c "uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    ports:
      - "${HTPP_SERVER_PORT}:8000"
    <<: *template

  db:
    container_name: joul-db
    image: postgres
    volumes:
      - ./data/postgres/:/var/lib/postgresql/data
    ports:
      - ${POSTGRES_PORT}:${POSTGRES_PORT}
    expose:
      - ${POSTGRES_PORT}
    env_file: .env
    networks:
      - bridge_network
    healthcheck:
      test: "exit 0"
    command: ["postgres", "-p", "${POSTGRES_PORT}", "-c", "log_statement=all"]

  rabbitmq:
    container_name: joul-rabbitmq
    image: rabbitmq:3.12.0-management
    env_file:
      - .env
    ports:
      - ${RABBITMQ_PORT}:${RABBITMQ_PORT}
      - ${RABBITMQ_UI_PORT}:${RABBITMQ_UI_PORT}
    environment:
      - RABBITMQ_DEFAULT_USER=${RABBITMQ_USER}
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_PASS}
    healthcheck:
      test: "exit 0"
    networks:
      - bridge_network

  redis:
    container_name: joul-redis
    image: redis:6.2-alpine
    hostname: redis
    env_file:
      - .env
    ports:
      - ${REDIS_PORT}:6379
    networks:
      - bridge_network

  ws_server:
    container_name: joul-ws-server
    ports:
      - "${WS_SERVER_PORT}:${WS_SERVER_PORT}"
    command: watchmedo auto-restart --pattern "*.py" --recursive --signal SIGTERM python joul/ws_server/main.py
    <<: *template

  taskiq:
    container_name: joul-taskiq
    depends_on:
      - app
    <<: *template
    command: [ taskiq, worker, joul.broker:broker ] 
    environment:
      SKIP_EVENTS: "true"  # quoted so YAML passes a string, not a boolean

  taskiq-scheduler:
    container_name: joul-taskiq-scheduler
    depends_on:
      - app
    command: taskiq scheduler joul.broker:scheduler
    <<: *template

volumes:
  postgres_data:

networks:
  bridge_network:
    driver: bridge

# broker.py

import taskiq_fastapi
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, RedisScheduleSource

from joul.settings import settings

broker = ListQueueBroker(url=settings.REDIS_URL).with_result_backend(
    RedisAsyncResultBackend(redis_url=settings.REDIS_URL)
)

redis_source = RedisScheduleSource(settings.REDIS_URL)
scheduler = TaskiqScheduler(broker, sources=[redis_source])

taskiq_fastapi.init(broker, 'joul.main:app')
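
One way the task's own messages could be made easier to spot, sketched with the standard `logging` module only: give the scheduled tasks a dedicated logger with its own prefix, and raise the level of whichever loggers are too chatty. The logger name `joul.tasks`, the `[task]` prefix, and both helper names are hypothetical, and treating `uvicorn.access` as a noisy logger is an assumption; this is not Taskiq's own logging setup.

```python
import logging
import sys


def build_task_logger(stream=None, name="joul.tasks"):
    """Return a logger whose records go only to `stream`, with a [task] prefix.

    `joul.tasks` is a hypothetical logger name chosen for this sketch.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # Do not hand records to the root logger, so they are not printed twice
    # or mixed into the application's own format.
    logger.propagate = False
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler(stream or sys.stderr)
        handler.setFormatter(
            logging.Formatter("[task] %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger


def quiet_loggers(names, level=logging.WARNING):
    """Raise the threshold of the named loggers so only warnings and above pass."""
    for logger_name in names:
        logging.getLogger(logger_name).setLevel(level)
```

Inside a task, `log = build_task_logger()` followed by `log.debug("...")` would then emit lines starting with `[task]`, which are easy to search for in the worker container's output. Calling `quiet_loggers(["uvicorn.access"])` in the worker is optional and only helps if that logger is actually the source of the noise.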

# lifetime.py

import asyncio
import os

from fastapi import FastAPI
from redis import asyncio as aioredis

from joul import redisdb
from joul.app.ocpp.handler.events import process_event
from joul.broker import broker, redis_source
from joul.core.queue.consumer import consume
from joul.settings import rabbitmq_settings, settings

tasks = set()


async def startup_taskiq() -> None:
    if not broker.is_worker_process:
        await broker.startup()

    await redis_source.startup()


async def shutdown_taskiq() -> None:
    if not broker.is_worker_process:
        await broker.shutdown()

    await redis_source.shutdown()


async def setup_redis() -> None:
    # Redis
    pool = aioredis.ConnectionPool.from_url(
        settings.REDIS_URL,
        max_connections=10,
        decode_responses=True,
    )
    redisdb.redis_client = aioredis.Redis(connection_pool=pool)


async def shutdown_redis() -> None:
    await redisdb.redis_client.close()


async def setup_asyncio() -> None:
    task = asyncio.create_task(
        consume(queue_name=rabbitmq_settings.EVENT_EXCHANGE_NAME, on_message=process_event)
    )
    tasks.add(task)

    def _on_completion(f):
        tasks.remove(f)

    task.add_done_callback(_on_completion)


def startup(app: FastAPI):
    async def _startup():
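        # Start the event consumer only when SKIP_EVENTS is unset or empty
        # (the taskiq worker container sets it to skip the consumer).
        if not os.getenv('SKIP_EVENTS'):
            await setup_asyncio()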

        await startup_taskiq()
        await setup_redis()

    return _startup


def shutdown(app: FastAPI):
    async def _shutdown():
        await shutdown_taskiq()
        await shutdown_redis()

    return _shutdown
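
A note on the `SKIP_EVENTS` check: `os.getenv` returns a string, so any non-empty value, including `"false"` or `"0"`, is truthy. A minimal sketch of a stricter flag parser follows; the helper name `env_flag`, the accepted spellings, and the `environ` parameter (added only so the function can be exercised without touching the real environment) are assumptions, not part of the project.

```python
import os

# Spellings treated as "enabled"; anything else counts as disabled.
_TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name, default=False, environ=None):
    """Interpret an environment variable as a boolean flag.

    Returns `default` when the variable is not set at all.
    """
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUE_VALUES
```

With this helper, the startup hook could test `env_flag('SKIP_EVENTS')` and get `False` for both an unset variable and an explicit `SKIP_EVENTS=false`.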
