
Commit 2adf2df

mypy+ruff fixes
1 parent d559aaa · commit 2adf2df

8 files changed · +43 -42 lines changed

backend/scripts/create_topics.py

Lines changed: 2 additions & 2 deletions
@@ -11,11 +11,11 @@
 from app.core.logging import setup_logger
 from app.infrastructure.kafka.topics import get_all_topics, get_topic_configs
 from app.settings import get_settings
-
-logger = setup_logger(os.environ.get("LOG_LEVEL", "INFO"))
 from confluent_kafka import KafkaException
 from confluent_kafka.admin import AdminClient, NewTopic
 
+logger = setup_logger(os.environ.get("LOG_LEVEL", "INFO"))
+
 
 async def create_topics() -> None:
     """Create all required Kafka topics"""

backend/workers/dlq_processor.py

Lines changed: 3 additions & 6 deletions
@@ -3,13 +3,12 @@
 import signal
 from typing import Optional
 
-from app.core.database_context import Database, DBClient
+from app.core.database_context import DBClient
 from app.core.logging import setup_logger
 from app.dlq import DLQMessage, RetryPolicy, RetryStrategy
 from app.dlq.manager import DLQManager, create_dlq_manager
 from app.domain.enums.kafka import KafkaTopic
 from app.events.schema.schema_registry import create_schema_registry_manager
-from app.infrastructure.mappers.dlq_mapper import DLQMapper
 from app.settings import get_settings
 from pymongo.asynchronous.mongo_client import AsyncMongoClient
 
@@ -111,15 +110,13 @@ async def main() -> None:
         serverSelectionTimeoutMS=5000,
     )
     db_name = settings.DATABASE_NAME
-    database: Database = db_client[db_name]
+    _ = db_client[db_name]  # Access database to verify connection
     await db_client.admin.command("ping")
     logger.info(f"Connected to database: {db_name}")
 
     schema_registry = create_schema_registry_manager(logger)
-    dlq_mapper = DLQMapper(schema_registry)
     manager = create_dlq_manager(
-        database=database,
-        dlq_mapper=dlq_mapper,
+        schema_registry=schema_registry,
         logger=logger,
         dlq_topic=KafkaTopic.DEAD_LETTER_QUEUE,
         retry_topic_suffix="-retry",
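
Two things change here: create_dlq_manager now takes the schema registry rather than a database and a pre-built DLQMapper (presumably constructing the mapper internally now), and the unused database handle becomes `_ = db_client[db_name]`, which keeps the lookup's side effect while telling ruff/mypy the value is deliberately discarded. A hedged, self-contained sketch of that connection-check idiom (the URI and database name are placeholders):

import asyncio

from pymongo.asynchronous.mongo_client import AsyncMongoClient


async def verify_connection(uri: str, db_name: str) -> None:
    client: AsyncMongoClient = AsyncMongoClient(uri, serverSelectionTimeoutMS=5000)
    _ = client[db_name]  # touch the database handle; the value is intentionally unused
    await client.admin.command("ping")  # raises if the server is unreachable
    print(f"Connected to database: {db_name}")


asyncio.run(verify_connection("mongodb://localhost:27017", "app"))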

backend/workers/run_coordinator.py

Lines changed: 4 additions & 3 deletions
@@ -12,20 +12,21 @@
 
 def main() -> None:
     """Main entry point for coordinator worker"""
+    settings = get_settings()
+
     # Setup logging
-    setup_logger()
+    logger = setup_logger(settings.LOG_LEVEL)
 
     # Configure root logger for worker
     logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
-    logger = logging.getLogger(__name__)
     logger.info("Starting ExecutionCoordinator worker...")
 
     # Initialize tracing
-    settings = get_settings()
     if settings.ENABLE_TRACING:
         init_tracing(
             service_name=GroupId.EXECUTION_COORDINATOR,
+            logger=logger,
             service_version=settings.TRACING_SERVICE_VERSION,
             enable_console_exporter=False,
             sampling_rate=settings.TRACING_SAMPLING_RATE,
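
This is the worker-entrypoint pattern repeated (with different service names) in run_k8s_worker.py, run_pod_monitor.py, and run_result_processor.py below: get_settings() is hoisted to the top of main(), the logger returned by setup_logger(settings.LOG_LEVEL) replaces a separate logging.getLogger(__name__) call, and that same logger is handed to init_tracing. A self-contained sketch of the resulting shape (the stub get_settings/setup_logger bodies are assumptions standing in for the repo's helpers):

import logging


def get_settings():  # stand-in for app.settings.get_settings (assumed shape)
    class Settings:
        LOG_LEVEL = "INFO"
        ENABLE_TRACING = False

    return Settings()


def setup_logger(level: str) -> logging.Logger:  # stand-in for app.core.logging.setup_logger
    logger = logging.getLogger("worker")
    logger.setLevel(level)
    return logger


def main() -> None:
    settings = get_settings()  # settings first, so the log level is known up front
    logger = setup_logger(settings.LOG_LEVEL)  # one configured logger, reused everywhere
    logger.info("Starting worker...")
    if settings.ENABLE_TRACING:
        # the real workers call init_tracing(service_name=..., logger=logger, ...)
        pass


if __name__ == "__main__":
    main()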

backend/workers/run_event_replay.py

Lines changed: 13 additions & 13 deletions
@@ -2,9 +2,6 @@
 import logging
 from contextlib import AsyncExitStack
 
-from beanie import init_beanie
-from pymongo.asynchronous.mongo_client import AsyncMongoClient
-
 from app.core.database_context import DBClient
 from app.core.logging import setup_logger
 from app.core.tracing import init_tracing
@@ -15,6 +12,8 @@
 from app.events.schema.schema_registry import SchemaRegistryManager
 from app.services.event_replay.replay_service import EventReplayService
 from app.settings import get_settings
+from beanie import init_beanie
+from pymongo.asynchronous.mongo_client import AsyncMongoClient
 
 
 async def cleanup_task(replay_service: EventReplayService, interval_hours: int = 6) -> None:
@@ -30,10 +29,8 @@ async def cleanup_task(replay_service: EventReplayService, interval_hours: int =
             logger.error(f"Error during cleanup: {e}")
 
 
-async def run_replay_service() -> None:
+async def run_replay_service(logger: logging.Logger) -> None:
     """Run the event replay service with cleanup task"""
-    logger = logging.getLogger(__name__)
-
     # Get settings
     settings = get_settings()
 
@@ -55,13 +52,15 @@ async def run_replay_service() -> None:
     producer = UnifiedProducer(producer_config, schema_registry, logger)
 
     # Create event store
-    event_store = create_event_store(db=database, schema_registry=schema_registry, logger=logger)
+    event_store = create_event_store(schema_registry=schema_registry, logger=logger)
 
     # Create repository
-    replay_repository = ReplayRepository(database, logger)
+    replay_repository = ReplayRepository(logger)
 
     # Create replay service
-    replay_service = EventReplayService(repository=replay_repository, producer=producer, event_store=event_store)
+    replay_service = EventReplayService(
+        repository=replay_repository, producer=producer, event_store=event_store, logger=logger
+    )
     logger.info("Event replay service initialized")
 
     async with AsyncExitStack() as stack:
@@ -84,27 +83,28 @@ async def _cancel_task() -> None:
 
 def main() -> None:
     """Main entry point for event replay service"""
+    settings = get_settings()
+
     # Setup logging
-    setup_logger()
+    logger = setup_logger(settings.LOG_LEVEL)
 
     # Configure root logger for worker
     logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
-    logger = logging.getLogger(__name__)
     logger.info("Starting Event Replay Service...")
 
     # Initialize tracing
-    settings = get_settings()
     if settings.ENABLE_TRACING:
         init_tracing(
             service_name="event-replay",
+            logger=logger,
             service_version=settings.TRACING_SERVICE_VERSION,
             enable_console_exporter=False,
             sampling_rate=settings.TRACING_SAMPLING_RATE,
         )
         logger.info("Tracing initialized for Event Replay Service")
 
-    asyncio.run(run_replay_service())
+    asyncio.run(run_replay_service(logger))
 
 
 if __name__ == "__main__":
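
Beyond the import reshuffle (the beanie/pymongo imports now sort after the app.* block, apparently to satisfy the project's import-sorting rules), run_replay_service now receives the logger built in main() instead of creating its own with logging.getLogger(__name__). A small sketch of that dependency-injection style (names are illustrative):

import asyncio
import logging


async def run_service(logger: logging.Logger) -> None:
    # The service uses whatever logger the entry point hands it, so handlers
    # and level are configured exactly once, in main().
    logger.info("service running")


def main() -> None:
    logging.basicConfig(level=logging.INFO)
    asyncio.run(run_service(logging.getLogger("event-replay")))


main()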

backend/workers/run_k8s_worker.py

Lines changed: 4 additions & 3 deletions
@@ -12,20 +12,21 @@
 
 def main() -> None:
     """Main entry point for Kubernetes worker"""
+    settings = get_settings()
+
     # Setup logging
-    setup_logger()
+    logger = setup_logger(settings.LOG_LEVEL)
 
     # Configure root logger for worker
     logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
-    logger = logging.getLogger(__name__)
     logger.info("Starting KubernetesWorker...")
 
     # Initialize tracing
-    settings = get_settings()
     if settings.ENABLE_TRACING:
         init_tracing(
             service_name=GroupId.K8S_WORKER,
+            logger=logger,
             service_version=settings.TRACING_SERVICE_VERSION,
             enable_console_exporter=False,
             sampling_rate=settings.TRACING_SAMPLING_RATE,

backend/workers/run_pod_monitor.py

Lines changed: 4 additions & 3 deletions
@@ -12,20 +12,21 @@
 
 def main() -> None:
     """Main entry point for pod monitor worker"""
+    settings = get_settings()
+
     # Setup logging
-    setup_logger()
+    logger = setup_logger(settings.LOG_LEVEL)
 
     # Configure root logger for worker
     logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
-    logger = logging.getLogger(__name__)
     logger.info("Starting PodMonitor worker...")
 
     # Initialize tracing
-    settings = get_settings()
     if settings.ENABLE_TRACING:
         init_tracing(
             service_name=GroupId.POD_MONITOR,
+            logger=logger,
             service_version=settings.TRACING_SERVICE_VERSION,
             enable_console_exporter=False,
             sampling_rate=settings.TRACING_SAMPLING_RATE,

backend/workers/run_result_processor.py

Lines changed: 4 additions & 3 deletions
@@ -10,20 +10,21 @@
 
 def main() -> None:
     """Main entry point for result processor worker"""
+    settings = get_settings()
+
     # Setup logging
-    setup_logger()
+    logger = setup_logger(settings.LOG_LEVEL)
 
     # Configure root logger for worker
     logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
-    logger = logging.getLogger(__name__)
     logger.info("Starting ResultProcessor worker...")
 
     # Initialize tracing
-    settings = get_settings()
     if settings.ENABLE_TRACING:
         init_tracing(
             service_name=GroupId.RESULT_PROCESSOR,
+            logger=logger,
             service_version=settings.TRACING_SERVICE_VERSION,
             enable_console_exporter=False,
             sampling_rate=settings.TRACING_SAMPLING_RATE,

backend/workers/run_saga_orchestrator.py

Lines changed: 9 additions & 9 deletions
@@ -2,9 +2,6 @@
 import logging
 
 import redis.asyncio as redis
-from beanie import init_beanie
-from pymongo.asynchronous.mongo_client import AsyncMongoClient
-
 from app.core.database_context import DBClient
 from app.core.logging import setup_logger
 from app.core.tracing import init_tracing
@@ -20,6 +17,8 @@
 from app.services.idempotency.redis_repository import RedisIdempotencyRepository
 from app.services.saga import create_saga_orchestrator
 from app.settings import get_settings
+from beanie import init_beanie
+from pymongo.asynchronous.mongo_client import AsyncMongoClient
 
 
 async def run_saga_orchestrator() -> None:
@@ -53,10 +52,10 @@ async def run_saga_orchestrator() -> None:
 
     # Create event store (schema ensured separately)
     logger.info("Creating event store...")
-    event_store = create_event_store(db=database, schema_registry=schema_registry_manager, logger=logger, ttl_days=90)
+    event_store = create_event_store(schema_registry=schema_registry_manager, logger=logger, ttl_days=90)
 
     # Create repository and idempotency manager (Redis-backed)
-    saga_repository = SagaRepository(database)
+    saga_repository = SagaRepository()
     r = redis.Redis(
         host=settings.REDIS_HOST,
         port=settings.REDIS_PORT,
@@ -70,7 +69,7 @@ async def run_saga_orchestrator() -> None:
     )
     idem_repo = RedisIdempotencyRepository(r, key_prefix="idempotency")
     idempotency_manager = create_idempotency_manager(repository=idem_repo, config=IdempotencyConfig(), logger=logger)
-    resource_allocation_repository = ResourceAllocationRepository(database)
+    resource_allocation_repository = ResourceAllocationRepository()
 
     # Create saga orchestrator
     saga_config = SagaConfig(
@@ -119,20 +118,21 @@ async def run_saga_orchestrator() -> None:
 
 def main() -> None:
     """Main entry point for saga orchestrator worker"""
+    settings = get_settings()
+
     # Setup logging
-    setup_logger()
+    logger = setup_logger(settings.LOG_LEVEL)
 
     # Configure root logger for worker
     logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
-    logger = logging.getLogger(__name__)
     logger.info("Starting Saga Orchestrator worker...")
 
     # Initialize tracing
-    settings = get_settings()
     if settings.ENABLE_TRACING:
         init_tracing(
             service_name=GroupId.SAGA_ORCHESTRATOR,
+            logger=logger,
             service_version=settings.TRACING_SERVICE_VERSION,
             enable_console_exporter=False,
             sampling_rate=settings.TRACING_SAMPLING_RATE,
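
SagaRepository, ResourceAllocationRepository, and create_event_store all lose their explicit database argument here. Since these workers call init_beanie at startup, this is consistent with Beanie's model-binding pattern, where document models are attached to a database once and repositories query the models directly; that reading is an inference from the diff, not stated in the commit. A hedged sketch of the pattern (SagaDocument is a hypothetical model, not the project's actual class):

import asyncio

from beanie import Document, init_beanie
from pymongo.asynchronous.mongo_client import AsyncMongoClient


class SagaDocument(Document):  # hypothetical stand-in for the project's saga model
    state: str


async def bootstrap() -> None:
    client: AsyncMongoClient = AsyncMongoClient("mongodb://localhost:27017")
    # init_beanie binds each Document model to a database once at startup,
    # after which model queries need no explicit database handle.
    await init_beanie(database=client["app"], document_models=[SagaDocument])
    await SagaDocument(state="started").insert()


asyncio.run(bootstrap())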
