chore: tests fix/update #60
Conversation
… user fixtures caused silent test skips
2. DRY violation — duplicate _cleanup fixture now centralized
3. Deprecated asyncio pattern — uses modern get_running_loop() API
Note: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
📝 Walkthrough
Threads Settings through DI, shifts lifecycle to async context/hooks (aclose/_on_start/_on_stop), modernizes asyncio loop/time usage, centralizes test cleanup, removes real-services fixtures, and refactors containers/providers, tests, and worker entrypoints to use settings-aware DI.
Sequence Diagram(s)
sequenceDiagram
%%{init: {"themeVariables":{"actorBorder":"#0b5f71","actorBackground":"#e6f6f7","noteBackground":"#fff4e6"}}}%%
actor Pytest
participant WorkerEnv as _setup_worker_env
participant TestSettings as TestSettings()
participant AppFactory as create_app(settings)
participant AppContainer as AppContainer
participant SchemaReg as SchemaRegistryManager
participant DLQMgr as DLQManager
participant FastAPI as FastAPI app
participant HttpClient as httpx.AsyncClient
Pytest->>WorkerEnv: set worker-scoped env (DB/REDIS/KAFKA, OTEL)
WorkerEnv-->>Pytest: env ready
Pytest->>TestSettings: instantiate TestSettings() (loads .env.test)
TestSettings-->>Pytest: settings object
Pytest->>AppFactory: create_app(settings)
AppFactory->>AppContainer: create_app_container(settings)
AppContainer->>SchemaReg: provide SchemaRegistryManager(settings, logger)
AppContainer->>DLQMgr: provide DLQManager(settings, schema_registry, logger)
AppContainer->>FastAPI: attach container to app.state
FastAPI-->>Pytest: app instance
Pytest->>HttpClient: ASGITransport(FastAPI) -> httpx.AsyncClient(base_url=https://test)
HttpClient-->>Pytest: ready for requests (per-test CSRF/headers)
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Possibly related PRs
Suggested labels
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
📜 Recent review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/integration/dlq/test_dlq_retry_immediate.py (3)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
🔇 Additional comments (3)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/tests/conftest.py (1)
1-1: Complete the asyncio pattern migration across all files.
The PR description claims to replace the deprecated `asyncio.get_event_loop()` with the modern `asyncio.get_running_loop()` API. While some files have been updated (e.g., `backend/tests/helpers/eventually.py`, `backend/app/services/pod_monitor/monitor.py`), 30+ instances of the deprecated pattern remain scattered across the codebase:
- backend/app/services/result_processor/resource_cleaner.py (5 instances)
- backend/app/events/event_store.py (15+ instances)
- backend/app/events/event_store_consumer.py (3 instances)
- backend/app/services/event_replay/replay_service.py (3 instances)
- backend/app/services/notification_service.py (2 instances)
- backend/app/dlq/manager.py (2 instances)
- backend/tests/e2e/test_resource_cleaner_k8s.py (2 instances)
- Plus several others
Replace all remaining `asyncio.get_event_loop()` calls with `asyncio.get_running_loop()` to complete the migration.
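As a generic illustration (not taken from the files listed above), the substitution typically looks like the following sketch; it assumes the call site already runs inside a coroutine, which is what `get_running_loop()` requires:

```python
import asyncio


async def timed_step() -> float:
    # Deprecated form (warns on Python 3.10+ outside a running loop):
    # loop = asyncio.get_event_loop()
    loop = asyncio.get_running_loop()  # modern API; must be called from async code
    start = loop.time()                # monotonic clock of the running loop
    await asyncio.sleep(0.01)
    return loop.time() - start


if __name__ == "__main__":
    print(asyncio.run(timed_step()))
```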
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- backend/tests/conftest.py
- backend/tests/e2e/conftest.py
- backend/tests/fixtures/__init__.py
- backend/tests/fixtures/real_services.py
- backend/tests/helpers/cleanup.py
- backend/tests/helpers/eventually.py
- backend/tests/integration/conftest.py
- backend/tests/integration/services/test_rate_limit_service.py
- backend/tests/unit/conftest.py
💤 Files with no reviewable changes (3)
- backend/tests/unit/conftest.py
- backend/tests/integration/services/test_rate_limit_service.py
- backend/tests/fixtures/real_services.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/tests/integration/conftest.py (2)
backend/tests/helpers/cleanup.py (1)
cleanup_db_and_redis(9-23)
backend/tests/conftest.py (2)
db(170-172)
redis_client(176-178)
backend/tests/e2e/conftest.py (1)
backend/tests/helpers/cleanup.py (1)
cleanup_db_and_redis(9-23)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (6)
backend/tests/helpers/eventually.py (1)
20-20: LGTM! Correct modernization to `get_running_loop()`.
The replacement of `asyncio.get_event_loop()` with `asyncio.get_running_loop()` is the correct modern pattern for accessing the event loop from within an async context. This avoids deprecation warnings in Python 3.10+ and is more explicit about requiring a running loop. The project's Python 3.12+ requirement fully supports this pattern.
backend/tests/conftest.py (2)
194-213: LGTM! Scope mismatch resolved.
The credentials fixtures are now function-scoped, correctly aligning with the function-scoped `test_user` and `test_admin` fixtures. This fixes the silent test skip issue mentioned in the PR objectives, where session-scoped credentials were incompatible with function-scoped user fixtures.
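For context, a minimal sketch of what the aligned, function-scoped fixtures can look like; the `client` fixture and the registration endpoint are assumptions, not the project's actual conftest:

```python
import uuid

import pytest
import pytest_asyncio


@pytest.fixture  # function-scoped, matching the function-scoped test_user fixture
def user_credentials() -> dict[str, str]:
    suffix = uuid.uuid4().hex[:8]
    return {"username": f"user_{suffix}", "password": f"pw_{suffix}"}


@pytest_asyncio.fixture
async def test_user(client, user_credentials):
    resp = await client.post("/auth/register", json=user_credentials)  # hypothetical endpoint
    if resp.status_code != 201:
        pytest.fail(f"Could not create test user: HTTP {resp.status_code}")  # fail, don't skip
    return user_credentials
```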
216-236: LGTM! Better error handling for test infrastructure failures.
Replacing `pytest.skip` with `pytest.fail` is appropriate here. If the test infrastructure cannot create users, tests should fail rather than silently skip, ensuring infrastructure issues are surfaced immediately.
backend/tests/helpers/cleanup.py (1)
9-23: LGTM! Excellent DRY refactor.
The centralized cleanup helper successfully removes the duplication that previously existed across integration and e2e conftest files. The implementation correctly:
- Drops non-system collections
- Flushes Redis safely per worker
- Reinitializes Beanie with all document models
The documentation clearly explains the pytest-xdist safety guarantees.
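A rough sketch of what such a shared helper can look like, assuming Motor/Beanie and redis-py asyncio style APIs (the real backend/tests/helpers/cleanup.py may differ in detail):

```python
from beanie import init_beanie


async def cleanup_db_and_redis(db, redis_client, document_models) -> None:
    """Reset per-worker state before a test: drop collections, flush Redis, re-init Beanie."""
    for name in await db.list_collection_names():
        if not name.startswith("system."):   # keep system collections untouched
            await db.drop_collection(name)
    await redis_client.flushdb()             # safe because each xdist worker uses its own Redis DB
    await init_beanie(database=db, document_models=document_models)
```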
backend/tests/e2e/conftest.py (1)
1-18: LGTM! Clean delegation to centralized helper.
The e2e conftest now properly delegates to the shared `cleanup_db_and_redis` helper, removing the previous code duplication. The cleanup behavior remains unchanged (pre-test only), and the code is more maintainable.
backend/tests/integration/conftest.py (1)
1-18: LGTM! Clean delegation to centralized helper.
The integration conftest now properly delegates to the shared `cleanup_db_and_redis` helper, completing the DRY refactor mentioned in the PR objectives. The cleanup behavior is preserved (pre-test only), and the code is more maintainable.
@codex review
Codex Review: Didn't find any major issues. Already looking forward to the next diff.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Actionable comments posted: 0
🧹 Nitpick comments (1)
backend/tests/unit/services/saga/test_saga_step_and_base.py (1)
82-82: LGTM! Modern asyncio pattern correctly applied.
The change from the deprecated `get_event_loop().run_until_complete()` to `asyncio.run()` is the correct modernization for executing async code from synchronous test functions. This avoids deprecation warnings in Python 3.10+ and follows current best practices.
Would you like me to search the codebase for other instances of the deprecated `get_event_loop().run_until_complete()` pattern that could benefit from the same update?

```shell
#!/bin/bash
# Description: Search for remaining instances of the deprecated asyncio pattern

# Search for get_event_loop().run_until_complete usage
rg -n "get_event_loop\(\)\.run_until_complete" --type=py
```
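A generic before/after sketch of that modernization in a synchronous test (illustrative only, not the actual test file):

```python
import asyncio


async def can_execute() -> bool:
    # stand-in for the async code a synchronous test needs to drive
    return True


def test_can_execute() -> None:
    # Before: loop = asyncio.get_event_loop(); loop.run_until_complete(can_execute())
    assert asyncio.run(can_execute())  # creates and closes its own event loop
```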
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- backend/app/dlq/manager.py
- backend/app/events/admin_utils.py
- backend/app/events/event_store.py
- backend/app/events/event_store_consumer.py
- backend/app/services/event_bus.py
- backend/app/services/event_replay/replay_service.py
- backend/app/services/notification_service.py
- backend/app/services/result_processor/resource_cleaner.py
- backend/tests/e2e/test_resource_cleaner_k8s.py
- backend/tests/unit/services/saga/test_saga_step_and_base.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/tests/unit/services/saga/test_saga_step_and_base.py (1)
backend/app/services/saga/saga_step.py (2)
can_execute(95-97)
SagaContext(14-70)
backend/tests/e2e/test_resource_cleaner_k8s.py (1)
backend/app/services/result_processor/resource_cleaner.py (1)
cleanup_pod_resources(48-82)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (13)
backend/tests/e2e/test_resource_cleaner_k8s.py (1)
45-52: LGTM! Modern asyncio API usage.
The migration from `asyncio.get_event_loop().time()` to `asyncio.get_running_loop().time()` is correct and follows current best practices. This ensures the timing measurement uses the currently running event loop, avoiding potential issues with deprecated API usage.
backend/app/services/event_bus.py (1)
99-100: LGTM! Proper loop reference for executor.
Using `asyncio.get_running_loop()` ensures the executor is bound to the currently running event loop, which is the recommended approach and avoids deprecated API usage.
backend/app/services/notification_service.py (1)
861-872: LGTM! Consistent timing measurement.
The migration to `asyncio.get_running_loop().time()` for notification delivery timing is correct and consistent with the broader codebase modernization.
backend/app/dlq/manager.py (2)
193-193: LGTM! Modern loop API for timing.
Correctly uses `asyncio.get_running_loop().time()` for measuring DLQ message processing duration.
252-252: LGTM! Consistent timing measurement.
The duration calculation properly uses the running loop's time source, matching the pattern established at line 193.
backend/app/services/event_replay/replay_service.py (3)
70-70: LGTM! Proper loop API for timing.
Using `asyncio.get_running_loop().time()` for the replay session start time is correct and follows modern asyncio best practices.
122-122: LGTM! Consistent duration calculation.
The duration calculation correctly uses the running loop's time source, matching the start time measurement at line 70.
284-285: LGTM! Proper executor binding.
Using `asyncio.get_running_loop()` to obtain the loop for `run_in_executor` is the correct approach and ensures the synchronous file write executes on the appropriate loop.
backend/app/events/admin_utils.py (1)
44-44: Modern asyncio API usage - LGTM!
The switch from `asyncio.get_event_loop()` to `asyncio.get_running_loop()` correctly adopts the modern asyncio pattern for async contexts.
57-57: Consistent asyncio timing modernization - LGTM!
All timing measurements across the event store methods now correctly use `asyncio.get_running_loop().time()` instead of the deprecated `get_event_loop()` pattern. This ensures robust timing in async contexts.
Also applies to: 74-74, 87-87, 115-115, 128-128, 135-135, 147-147, 161-161, 170-170, 178-178, 190-190, 200-200, 211-211, 221-221, 226-226, 234-234, 245-245, 261-261
backend/app/events/event_store_consumer.py (2)
43-43: Proper initialization with modern asyncio timing - LGTM!
The explicit typing and initialization of `_last_batch_time` to `0.0`, followed by proper initialization to the running loop's time in `start()`, is a clean pattern.
Also applies to: 52-52
130-130: Consistent timing source updates - LGTM!
Batch timing calculations now correctly use `asyncio.get_running_loop().time()` for consistent, modern async timing.
Also applies to: 144-144
backend/app/services/result_processor/resource_cleaner.py (1)
90-90: Modern asyncio executor pattern - LGTM!
All Kubernetes API operations executed via `run_in_executor` now correctly use `asyncio.get_running_loop()` for proper async context handling. This modernization aligns with current asyncio best practices.
Also applies to: 137-137, 182-182, 209-209, 230-230
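For reference, a minimal sketch of the executor pattern being praised here, assuming a blocking call such as a synchronous Kubernetes client read (the function name is made up):

```python
import asyncio
from functools import partial


def blocking_list_pods(namespace: str) -> list[str]:
    # stand-in for a synchronous kubernetes client call
    return [f"{namespace}/pod-0"]


async def list_pods(namespace: str) -> list[str]:
    loop = asyncio.get_running_loop()  # bound to the loop that is actually running
    return await loop.run_in_executor(None, partial(blocking_list_pods, namespace))
```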
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/core/providers.py (1)
72-72: Remove unused import: `get_settings`.
The import `get_settings` is no longer used after switching to context-based provisioning via `from_context`. The pipeline failure confirms this unused import.
🔎 Proposed fix

```diff
-from app.settings import Settings, get_settings
+from app.settings import Settings
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- backend/app/core/container.py
- backend/app/core/providers.py
- backend/app/main.py
- backend/tests/conftest.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/core/providers.py (2)
backend/app/settings.py (1)
Settings(11-160)
backend/tests/conftest.py (1)
scope(107-109)
backend/tests/conftest.py (2)
backend/app/main.py (1)
create_app(44-127)
backend/app/settings.py (1)
Settings(11-160)
backend/app/core/container.py (3)
backend/tests/conftest.py (1)
app(67-78)
backend/tests/unit/conftest.py (1)
app(25-26)
backend/app/settings.py (1)
Settings(11-160)
🪛 GitHub Actions: Ruff Linting
backend/app/core/providers.py
[error] 72-72: Ruff: F401 - 'get_settings' imported but unused. Remove unused import. 1 fixable with the --fix option.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (10)
backend/app/core/providers.py (1)
75-78: LGTM: Context-based Settings provisioning.
The migration from an explicit `@provide` method to `from_context` correctly implements context-based dependency injection. This pattern allows Settings to be injected during container creation (as seen in `container.py` where `context={Settings: settings}` is passed), enabling test-time configuration overrides.
backend/app/main.py (2)
44-52: LGTM: Settings injection enables test configurability.
The optional `settings` parameter allows tests to inject `TestSettings` while production usage falls back to `get_settings()`. The logic correctly prioritizes injected settings, addressing the concern from the past review comment about honoring pre-configured settings.
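A hedged sketch of that optional-settings pattern; the real create_app in backend/app/main.py does considerably more (routing, lifespan, DI wiring), and the stand-in classes below are assumptions:

```python
from fastapi import FastAPI
from pydantic_settings import BaseSettings


class Settings(BaseSettings):      # stand-in for app.settings.Settings
    LOG_LEVEL: str = "INFO"


def get_settings() -> Settings:    # stand-in for app.settings.get_settings
    return Settings()


def create_app(settings: Settings | None = None) -> FastAPI:
    settings = settings or get_settings()  # injected settings win over the global lookup
    app = FastAPI(title="backend")
    app.state.settings = settings          # illustrative; real wiring goes through the DI container
    return app
```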
63-63: LGTM: Container creation uses injected settings.
Correctly passes the `settings` instance to `create_app_container`, ensuring the DI container receives the appropriate configuration (either from the parameter or from `get_settings()`).
backend/app/core/container.py (2)
22-44: LGTM: Explicit Settings injection via context.
The addition of the `settings` parameter and passing it via `context={Settings: settings}` correctly implements explicit dependency injection. This works in tandem with the `from_context` approach in `SettingsProvider` (providers.py line 78) and enables test-time configuration overrides.
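A minimal sketch of the `from_context` + `context={Settings: settings}` pairing described in these comments, using Dishka-style providers; class and field names are illustrative, not the project's actual code:

```python
from dishka import Provider, Scope, from_context, make_async_container, provide


class Settings:                                          # stand-in for app.settings.Settings
    KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092"


class SettingsProvider(Provider):
    settings = from_context(provides=Settings, scope=Scope.APP)  # supplied by the container context


class MessagingProvider(Provider):
    @provide(scope=Scope.APP)
    def kafka_servers(self, settings: Settings) -> str:
        return settings.KAFKA_BOOTSTRAP_SERVERS


def create_app_container(settings: Settings):
    # The concrete Settings instance (e.g. TestSettings in tests) is injected at creation time.
    return make_async_container(SettingsProvider(), MessagingProvider(), context={Settings: settings})
```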
47-65: LGTM: Consistent context injection for worker containers.
The `create_result_processor_container` follows the same pattern as `create_app_container`, ensuring consistent Settings provisioning across both application and worker containers.
backend/tests/conftest.py (5)
19-27: LGTM: Test-specific configuration class.
The `TestSettings` class correctly loads from `.env.test` for test isolation. Using `extra="ignore"` (vs. `"forbid"` in production Settings) is appropriate for test environments where additional environment variables may be present without breaking validation.
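A hedged sketch of such a class with pydantic-settings; the field names are placeholders, not the project's real settings:

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="forbid")
    MONGODB_URL: str = "mongodb://localhost:27017"
    DATABASE_NAME: str = "app"


class TestSettings(Settings):
    # loads .env.test and tolerates extra variables present in CI/test environments
    model_config = SettingsConfigDict(env_file=".env.test", extra="ignore")
```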
35-62: LGTM: Robust pytest-xdist worker isolation.
The per-worker environment setup correctly isolates:
- Database names (unique per session and worker)
- Redis DBs (distributed across workers using modulo 16)
- Kafka consumer groups (via unique suffix)
- Schema Registry subjects (via unique prefix)
Calling `_setup_worker_env()` at module load time ensures environment variables are configured before `TestSettings` instantiation, preventing cross-worker interference in parallel test execution.
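A rough sketch of this kind of per-worker isolation, assuming the worker id is read from the PYTEST_XDIST_WORKER environment variable; the variable names and values below are illustrative:

```python
import os
import uuid


def _setup_worker_env() -> None:
    worker = os.environ.get("PYTEST_XDIST_WORKER", "gw0")         # gw0, gw1, ... under xdist
    worker_num = int(worker.removeprefix("gw") or "0")
    suffix = f"{worker}_{uuid.uuid4().hex[:8]}"
    os.environ.setdefault("DATABASE_NAME", f"test_{suffix}")       # unique DB per session/worker
    os.environ.setdefault("REDIS_DB", str(worker_num % 16))        # spread workers across 16 Redis DBs
    os.environ.setdefault("KAFKA_GROUP_SUFFIX", suffix)            # unique consumer groups
    os.environ.setdefault("SCHEMA_SUBJECT_PREFIX", f"{suffix}.")   # isolated Schema Registry subjects


_setup_worker_env()  # run at import time, before TestSettings() is instantiated
```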
66-79: LGTM: DI-aware app fixture with proper cleanup.
The session-scoped `app` fixture correctly:
- Uses `TestSettings()` to pick up worker-specific environment variables
- Creates the app via `create_app(settings=TestSettings())`
- Properly tears down the Dishka container after tests
Session scope is appropriate to avoid Pydantic memory issues when recreating schemas repeatedly with pytest-xdist.
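A compressed sketch of that fixture shape; `create_app` and `TestSettings` are the names discussed above, and the container attribute name is an assumption rather than the project's actual attribute:

```python
import pytest_asyncio


@pytest_asyncio.fixture(scope="session")
async def app():
    application = create_app(settings=TestSettings())        # picks up the worker-scoped env vars
    try:
        yield application
    finally:
        await application.state.dishka_container.close()      # tear down the DI container once per session
```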
132-173: LGTM: Function-scoped credentials fix scope mismatch.
Changing credential fixtures from session-scoped to function-scoped resolves the scope mismatch issue mentioned in the PR objectives (where session-scoped credentials combined with function-scoped user fixtures caused silent test skips).
Each test now receives:
- Unique UUID-based credentials
- Proper CSRF token and headers
- Clear error reporting on registration failures
177-197: LGTM: Consistent another_user fixture.
The `another_user` fixture follows the same pattern as `test_user` and `test_admin`, returning a complete dictionary with credentials, CSRF token, and headers. The explicit dict structure ensures consistency across test helpers.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Actionable comments posted: 3
🧹 Nitpick comments (8)
backend/app/services/result_processor/processor.py (1)
200-205: Existing fragility noted: Memory limit parsing assumes "Mi" suffix.
The parsing at line 201 with `rstrip("Mi")` only handles the `Mi` (mebibytes) suffix. Kubernetes memory limits can use various units (`Gi`, `M`, `G`, plain bytes, etc.), which would cause incorrect calculations or runtime errors.
The existing TODO acknowledges this. Consider using a proper Kubernetes resource quantity parser (e.g., from the `kubernetes` library or a utility function) if this becomes problematic.
backend/workers/dlq_processor.py (3)
16-56: Unused `logger` parameter in `_configure_retry_policies`.
The `logger` parameter is accepted but never used within this function. Either remove it for consistency or add logging for policy registration.
🔎 Suggested fix

```diff
-def _configure_retry_policies(manager: DLQManager, logger: logging.Logger) -> None:
+def _configure_retry_policies(manager: DLQManager) -> None:
```

Then update the call site at line 117:

```diff
-    _configure_retry_policies(manager, logger)
+    _configure_retry_policies(manager)
```
59-72: Unused `logger` parameter in `_configure_filters`.
Similar to `_configure_retry_policies`, the `logger` parameter is not used. Consider removing it or adding debug logging when filters are applied.
92-100: Dead code: `if not testing: pass` does nothing.
Lines 97-98 contain a no-op conditional. If this is a placeholder for production alerting (e.g., PagerDuty, Slack), consider adding a TODO comment or implementing the functionality.
🔎 Suggested fix

```diff
 async def alert_on_discard(message: DLQMessage, reason: str) -> None:
     logger.warning(
         f"Message {message.event_id} discarded! Type: {message.event_type}, Topic: {message.original_topic}, "
         f"Reason: {reason}, Original error: {message.error}"
     )
-    if not testing:
-        pass
+    # TODO: Add production alerting (e.g., PagerDuty, Slack) when not in testing mode
```

backend/workers/run_saga_orchestrator.py (1)
50-73: Consider passing `settings` to `run_saga_orchestrator()` for consistency.
`main()` already calls `get_settings()` at line 52. However, `run_saga_orchestrator()` is called without arguments at line 73, causing `get_settings()` to be invoked again. While functionally correct, passing the existing `settings` would be more efficient and consistent.
🔎 Suggested fix

```diff
-    asyncio.run(run_saga_orchestrator())
+    asyncio.run(run_saga_orchestrator(settings))
```
630-660: Duplication with `BusinessServicesProvider.get_execution_coordinator`.
`CoordinatorProvider.get_execution_coordinator` (lines 638-660) is nearly identical to `BusinessServicesProvider.get_execution_coordinator` (lines 597-619). This appears intentional for different container contexts (APP scope for workers vs REQUEST scope for the main app). Consider extracting the common creation logic into a shared helper to reduce duplication and maintenance burden.
🔎 Example refactor

```python
# Helper function (outside providers)
async def _create_execution_coordinator(
    kafka_producer: UnifiedProducer,
    schema_registry: SchemaRegistryManager,
    settings: Settings,
    event_store: EventStore,
    execution_repository: ExecutionRepository,
    idempotency_manager: IdempotencyManager,
    logger: logging.Logger,
) -> ExecutionCoordinator:
    return ExecutionCoordinator(
        producer=kafka_producer,
        schema_registry_manager=schema_registry,
        settings=settings,
        event_store=event_store,
        execution_repository=execution_repository,
        idempotency_manager=idempotency_manager,
        logger=logger,
    )

# Then in providers:
@provide
async def get_execution_coordinator(self, ...) -> AsyncIterator[ExecutionCoordinator]:
    coordinator = await _create_execution_coordinator(...)
    try:
        yield coordinator
    finally:
        await coordinator.stop()
```
732-778: Duplication with `BusinessServicesProvider.get_saga_orchestrator`.
Same pattern as `CoordinatorProvider` - this duplicates the saga orchestrator creation logic from `BusinessServicesProvider` (lines 492-526). The same refactoring approach would help reduce code duplication.
696-710: `get_event_repository` and `get_kafka_event_service` duplicate methods from `UserServicesProvider`.
These methods (lines 696-710) duplicate the same methods in `UserServicesProvider` (lines 370-381). While this may be necessary for provider isolation in different containers, consider if these could be shared via a common base provider.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (24)
- backend/app/core/container.py
- backend/app/core/providers.py
- backend/app/events/core/consumer.py
- backend/app/events/event_store_consumer.py
- backend/app/services/coordinator/coordinator.py
- backend/app/services/k8s_worker/worker.py
- backend/app/services/notification_service.py
- backend/app/services/pod_monitor/monitor.py
- backend/app/services/result_processor/processor.py
- backend/app/services/saga/saga_orchestrator.py
- backend/app/services/sse/kafka_redis_bridge.py
- backend/app/services/user_settings_service.py
- backend/tests/integration/events/test_consume_roundtrip.py
- backend/tests/integration/events/test_consumer_lifecycle.py
- backend/tests/integration/events/test_event_dispatcher.py
- backend/tests/integration/idempotency/test_consumer_idempotent.py
- backend/tests/integration/result_processor/test_result_processor.py
- backend/workers/dlq_processor.py
- backend/workers/run_coordinator.py
- backend/workers/run_event_replay.py
- backend/workers/run_k8s_worker.py
- backend/workers/run_pod_monitor.py
- backend/workers/run_result_processor.py
- backend/workers/run_saga_orchestrator.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/services/user_settings_service.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/services/notification_service.py
🧰 Additional context used
🧬 Code graph analysis (14)
backend/workers/run_coordinator.py (1)
backend/app/services/coordinator/coordinator.py (1)
run_coordinator(501-534)
backend/app/services/sse/kafka_redis_bridge.py (1)
backend/app/events/core/consumer.py (2)
consumer(214-215)
UnifiedConsumer(24-257)
backend/workers/run_result_processor.py (1)
backend/app/services/result_processor/processor.py (1)
run_result_processor(323-358)
backend/tests/integration/events/test_consumer_lifecycle.py (4)
backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)
backend/app/settings.py (1)
Settings(11-160)
backend/tests/conftest.py (1)
scope(113-115)
backend/app/events/core/consumer.py (1)
UnifiedConsumer(24-257)
backend/workers/run_event_replay.py (3)
backend/app/core/container.py (1)
create_event_replay_container(140-152)
backend/app/core/logging.py (1)
setup_logger(110-147)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/workers/run_saga_orchestrator.py (7)
backend/app/core/container.py (1)
create_saga_orchestrator_container(124-137)
backend/app/domain/enums/kafka.py (1)
GroupId(56-68)
backend/app/events/core/producer.py (3)
UnifiedProducer(30-297)
producer(64-65)
is_running(52-53)
backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)
initialize_event_schemas(239-240)
backend/app/services/saga/saga_orchestrator.py (2)
SagaOrchestrator(31-524)
is_running(70-71)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/app/events/core/consumer.py (1)
is_running(210-211)
backend/tests/integration/result_processor/test_result_processor.py (2)
backend/app/settings.py (1)
Settings(11-160)
backend/tests/conftest.py (1)
scope(113-115)
backend/app/services/saga/saga_orchestrator.py (6)
backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)
backend/app/infrastructure/kafka/events/base.py (1)
BaseEvent(13-37)
backend/app/settings.py (1)
Settings(11-160)
backend/app/events/core/consumer.py (5)
start(48-63)
stop(65-80)
UnifiedConsumer(24-257)
consumer(214-215)
state(202-203)
backend/app/events/core/types.py (1)
ConsumerConfig(64-102)
backend/app/events/core/dispatcher.py (2)
EventDispatcher(15-182)
register_handler(64-73)
backend/app/core/providers.py (5)
backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-53)
backend/app/settings.py (1)
Settings(11-160)
backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)
backend/app/dlq/manager.py (2)
DLQManager(27-470)
create_dlq_manager(473-512)
backend/app/core/k8s_clients.py (1)
K8sClients(9-13)
backend/app/services/pod_monitor/monitor.py (4)
backend/app/core/container.py (1)
create_pod_monitor_container(108-121)
backend/app/core/k8s_clients.py (1)
K8sClients(9-13)
backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)
initialize_event_schemas(239-240)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/workers/dlq_processor.py (2)
backend/app/core/container.py (1)
create_dlq_processor_container(155-167)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/app/services/result_processor/processor.py (5)
backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/app/events/core/producer.py (2)
producer(64-65)
UnifiedProducer(30-297)
backend/app/events/core/consumer.py (1)
UnifiedConsumer(24-257)
backend/app/core/container.py (1)
create_result_processor_container(54-72)
backend/workers/run_pod_monitor.py (1)
backend/app/services/pod_monitor/monitor.py (1)
run_pod_monitor(535-571)
backend/workers/run_k8s_worker.py (1)
backend/app/services/k8s_worker/worker.py (1)
run_kubernetes_worker(523-556)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: E2E Tests
🔇 Additional comments (37)
backend/workers/run_pod_monitor.py (1)
34-34: LGTM! Settings correctly passed to worker function.
The change aligns with the DI pattern by explicitly passing the `settings` instance to `run_pod_monitor`, ensuring consistent configuration throughout the worker lifecycle.
backend/workers/run_result_processor.py (1)
34-34: LGTM! Settings correctly passed to worker function.
The change follows the same DI pattern as other workers, ensuring `run_result_processor` receives the configuration context.
backend/workers/run_k8s_worker.py (1)
34-34: LGTM! Settings correctly passed to worker function.
Consistent with the DI refactoring pattern applied across all worker entry points.
backend/workers/run_coordinator.py (1)
34-34: LGTM! Settings correctly passed to worker function.
This completes the consistent DI pattern across all worker entry points, ensuring uniform configuration propagation throughout the system.
backend/app/events/core/consumer.py (1)
backend/app/events/core/consumer.py (1)
18-18: Excellent dependency injection refactoring!
The changes successfully eliminate tight coupling to the global `get_settings()` function by:
- Accepting `schema_registry` and `settings` as constructor parameters
- Storing and using the injected dependencies throughout the class
All 12 instantiation sites across the codebase have been properly updated to pass the new required parameters, including integration tests and production services (event store consumer, notification service, saga orchestrator, k8s worker, coordinator, result processor, and SSE bridge).
backend/app/services/sse/kafka_redis_bridge.py (1)
101-107: LGTM!
The `UnifiedConsumer` construction correctly passes `schema_registry` and `settings` as keyword arguments, aligning with the updated constructor signature that now requires these dependencies for schema-aware deserialization and topic prefix configuration.
backend/tests/integration/events/test_event_dispatcher.py (1)
12-12: LGTM!
The test correctly adopts the DI-based pattern by:
- Importing `Settings` from `app.settings`
- Retrieving `Settings` via the test scope container
- Passing `schema_registry` and `settings` to `UnifiedConsumer`, consistent with the updated constructor signature
Also applies to: 25-25, 48-54
backend/tests/integration/events/test_consumer_lifecycle.py (1)
7-8: LGTM!
The test correctly wires `SchemaRegistryManager` and `Settings` from the DI scope into `UnifiedConsumer`. The constructor invocation matches the expected signature, and the test properly exercises the consumer lifecycle (start, seek operations, stop).
Also applies to: 16-27
backend/tests/integration/result_processor/test_result_processor.py (1)
23-23: LGTM!
The test correctly integrates the DI-based wiring:
- Retrieves `Settings` from the test scope
- Passes `schema_registry` and `settings` to both `ResultProcessor` and `UnifiedConsumer`
- Constructor invocations align with the updated signatures
Also applies to: 34-34, 54-61, 78-84
backend/tests/integration/events/test_consume_roundtrip.py (1)
12-12: LGTM!
The roundtrip test correctly adopts the DI pattern by retrieving `Settings` from the scope and passing both `schema_registry` and `settings` to `UnifiedConsumer`. The wiring is consistent with other integration tests in this PR.
Also applies to: 25-25, 47-53
backend/app/services/pod_monitor/monitor.py (1)
535-572: LGTM!
The `run_pod_monitor` function correctly adopts the DI container pattern:
- Accepts optional `Settings` with fallback to `get_settings()`
- Uses `create_pod_monitor_container(settings)` for dependency wiring
- Retrieves components (`Database`, `SchemaRegistryManager`, `UnifiedProducer`, `PodMonitor`) from the container
- Uses `asyncio.get_running_loop()`, which is the modern API (aligns with the PR objective of replacing deprecated patterns)
- Properly manages lifecycle via `AsyncExitStack` with a `container.close` callback
backend/app/services/result_processor/processor.py (3)
69-89: LGTM!
The `ResultProcessor` constructor correctly incorporates `schema_registry` and `settings` as explicit dependencies, storing them as instance attributes. This aligns with the project-wide shift to dependency injection.
136-154: LGTM!
Consumer creation correctly uses `self._settings` for Kafka configuration and passes `schema_registry` and `settings` to `UnifiedConsumer`, consistent with the updated constructor signature.
323-357: LGTM!
The `run_result_processor` function correctly adopts the DI container pattern:
- Accepts optional `Settings` with fallback to `get_settings()`
- Initializes MongoDB/Beanie with settings
- Creates the container and retrieves dependencies via DI
- Properly manages lifecycle via `AsyncExitStack` with cleanup callbacks for both the container and the DB client
backend/app/events/event_store_consumer.py (5)
14-14: LGTM: Settings dependency injection wiring.
The `Settings` import and constructor parameter addition properly enable DI-based configuration, replacing the previous `get_settings()` runtime lookup pattern.
Also applies to: 25-25, 34-34
45-45: Correct use of `asyncio.get_running_loop().time()` for timing.
Initializing `_last_batch_time` to `0.0` in the constructor and then setting it via `get_running_loop().time()` in `start()` is the correct pattern. This replaces the deprecated `asyncio.get_event_loop()` approach as mentioned in the PR objectives.
Also applies to: 54-54
56-68: DI-wired consumer configuration looks good.
The `ConsumerConfig` and `UnifiedConsumer` now correctly use `self.settings` for Kafka bootstrap servers and group suffix, and the schema registry and settings are properly passed through.
137-137: Consistent timing API usage in batch processing.
Both `_batch_processor` and `_flush_batch` use `asyncio.get_running_loop().time()` consistently for time calculations, which is the modern approach.
Also applies to: 151-151
168-189: Factory function properly propagates settings.
The `create_event_store_consumer` factory function correctly accepts and forwards the `Settings` parameter to `EventStoreConsumer`.
backend/tests/integration/idempotency/test_consumer_idempotent.py (2)
11-11: Test correctly wires DI-injected dependencies.
The test retrieves `SchemaRegistryManager` and `Settings` from the DI scope, aligning with the updated component signatures throughout the codebase.
Also applies to: 15-15, 27-28
45-59: Consumer and wrapper construction follows updated signatures.
The `UnifiedConsumer` is correctly instantiated with `schema_registry` and `settings` parameters, and the `IdempotentConsumerWrapper` receives the additional configuration parameters (`dispatcher`, `default_key_strategy`, `enable_for_all_handlers`, `logger`). This aligns with the DI-driven changes across the codebase.
backend/app/services/saga/saga_orchestrator.py (3)
34-59: Comprehensive DI wiring in SagaOrchestrator constructor.
The constructor now accepts `schema_registry_manager`, `settings`, and `logger` as explicit dependencies, storing them as instance fields. This enables proper DI-based configuration and improves testability.
125-151: Consumer setup uses DI-injected dependencies correctly.
The `ConsumerConfig` uses `self._settings` for bootstrap servers and group suffix, and the `UnifiedConsumer` is properly wired with `schema_registry`, `settings`, and the instance logger.
527-564: Factory function properly documents and propagates dependencies.
The `create_saga_orchestrator` factory function has an updated signature and docstring reflecting all new dependencies. The implementation correctly forwards them to the `SagaOrchestrator` constructor.
backend/workers/run_event_replay.py (3)
16-24: Cleanup task accepts explicit logger dependency.
Passing the logger as a parameter improves testability and aligns with the DI-driven approach. The function no longer creates its own logger internally.
27-48: DI container-based initialization is well structured.
The `run_replay_service` function:
- Accepts optional `Settings` (defaulting to `get_settings()`)
- Creates the container with settings
- Retrieves logger, database, producer, and service from DI
- Initializes Beanie with the container-provided database
- Uses `AsyncExitStack` for proper lifecycle management
The pattern is consistent with other worker entry points in this PR.
62-81: Main entry point correctly propagates settings.
The `main()` function obtains settings first, sets up logging and tracing, then passes settings to `run_replay_service()`. This ensures consistent configuration across the worker lifecycle.
backend/app/services/k8s_worker/worker.py (3)
56-72: Constructor properly accepts and uses Settings.
The `KubernetesWorker` constructor now accepts `settings: Settings`, stores it as `self._settings`, and uses it for configuration (e.g., the `kafka_servers` fallback). This aligns with the DI pattern across the codebase.
117-135: Consumer setup uses DI-injected configuration.
The `ConsumerConfig` uses `self._settings.KAFKA_GROUP_SUFFIX` for the group ID, and `UnifiedConsumer` is properly constructed with `schema_registry` and `settings` parameters.
523-556: Worker entry point follows consistent DI container pattern.
The `run_kubernetes_worker` function:
- Accepts optional `Settings`
- Creates the container and retrieves dependencies
- Initializes Beanie and event schemas
- Uses `AsyncExitStack` for lifecycle management with `container.close` pushed as a callback
This pattern is consistent with other worker entry points updated in this PR.
backend/app/services/coordinator/coordinator.py (3)
59-77: Coordinator constructor properly accepts Settings.
The `ExecutionCoordinator` constructor now accepts `settings: Settings`, stores it as `self._settings`, and uses it for Kafka configuration. This enables DI-based configuration consistent with the rest of the codebase.
124-142: Consumer configuration uses DI-injected settings.
The `ConsumerConfig` and `UnifiedConsumer` are properly wired with `self._settings` for the group suffix and with `schema_registry` and `settings` parameters.
501-534: Coordinator entry point follows DI container pattern.
The `run_coordinator` function follows the same well-structured pattern as other workers:
- Optional `Settings` parameter with fallback to `get_settings()`
- Container creation with settings
- Beanie and schema initialization
- `AsyncExitStack` for lifecycle management
This ensures consistency across all worker entry points.
backend/app/core/container.py (2)
29-51: App container properly accepts and injects Settings.
The `create_app_container` function now accepts a `Settings` parameter and injects it via `context={Settings: settings}`, enabling DI consumers to receive configuration without global lookups.
75-167: Worker-specific containers follow consistent pattern.
All new container factory functions (`create_coordinator_container`, `create_k8s_worker_container`, `create_pod_monitor_container`, `create_saga_orchestrator_container`, `create_event_replay_container`, `create_dlq_processor_container`) follow a consistent structure:
- Accept a `Settings` parameter
- Include appropriate providers for each worker type
- Inject settings via `context={Settings: settings}`
This modular approach enables each worker to have a minimal, focused dependency graph.
backend/app/core/providers.py (2)
80-83: LGTM: SettingsProvider correctly uses `from_context`.
The transition to `from_context(provides=Settings, scope=Scope.APP)` is the proper Dishka pattern for injecting settings from the container context.
676-685: No changes needed.
`K8sWorkerConfig()` instantiation with default values is correct and intentional. The class is designed with sensible defaults, and the worker uses a fallback pattern where settings are passed separately to `KubernetesWorker`. For example, `kafka_bootstrap_servers` defaults to `None` in the config, and the worker resolves it via `self.config.kafka_bootstrap_servers or self._settings.KAFKA_BOOTSTRAP_SERVERS`. This separation of concerns is consistent throughout the codebase.
Actionable comments posted: 5
🧹 Nitpick comments (2)
backend/workers/run_result_processor.py (2)
46-54: Verify cleanup ordering in AsyncExitStack.
The cleanup order is LIFO: `db_client.close` → `container.close` → `processor.__aexit__`. This means the database connection is closed before the processor's `stop()` method runs.
Currently `processor.stop()` only closes the consumer, idempotency manager, and producer (no DB access), so this appears safe. However, this ordering is counterintuitive and fragile if future changes add DB operations to shutdown logic.
Consider reordering to ensure the processor exits before closing the database:
🔎 Suggested reordering

```diff
 async with AsyncExitStack() as stack:
+    stack.callback(db_client.close)
+    stack.push_async_callback(container.close)
     await stack.enter_async_context(processor)
-    stack.push_async_callback(container.close)
-    stack.callback(db_client.close)
```

This ensures cleanup order: `processor.__aexit__` → `container.close` → `db_client.close`.
61-62: Potentially redundant logging configuration.
`setup_logger(settings.LOG_LEVEL)` is called first, then `logging.basicConfig` with a hardcoded `INFO` level. If `setup_logger` already configures handlers on the root logger, `basicConfig` has no effect. If it doesn't, the hardcoded `INFO` level ignores `settings.LOG_LEVEL`.
Consider removing the redundant `basicConfig` call or verifying this is intentional.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- backend/app/services/coordinator/coordinator.py
- backend/app/services/k8s_worker/worker.py
- backend/app/services/pod_monitor/monitor.py
- backend/app/services/result_processor/processor.py
- backend/workers/run_coordinator.py
- backend/workers/run_k8s_worker.py
- backend/workers/run_pod_monitor.py
- backend/workers/run_result_processor.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/services/pod_monitor/monitor.py (2)
backend/tests/conftest.py (2)
client(95-103)
app(73-84)
backend/app/core/k8s_clients.py (1)
K8sClients(9-13)
backend/workers/run_k8s_worker.py (5)
backend/app/core/container.py (1)
create_k8s_worker_container(91-105)
backend/app/domain/enums/kafka.py (1)
GroupId(56-68)
backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)
initialize_event_schemas(239-240)
backend/app/services/k8s_worker/worker.py (2)
KubernetesWorker(37-513)
get_status(434-444)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/app/services/k8s_worker/worker.py (6)
backend/app/events/core/types.py (1)
ConsumerConfig(64-102)
backend/app/events/core/dispatcher.py (1)
EventDispatcher(15-182)
backend/app/events/core/consumer.py (2)
UnifiedConsumer(24-257)
consumer(214-215)
backend/app/events/core/producer.py (1)
UnifiedProducer(30-297)
backend/app/events/event_store.py (1)
EventStore(29-330)
backend/app/settings.py (1)
Settings(11-160)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (11)
backend/app/services/pod_monitor/monitor.py (1)
5-5: LGTM: Import supports DI refactoring.
The `asynccontextmanager` import is correctly used to decorate `create_pod_monitor` (line 505), enabling proper lifecycle management for the monitor instance in the new DI pattern.
backend/app/services/result_processor/processor.py (3)
63-84: LGTM! Clean dependency injection refactor.
The constructor now properly accepts `schema_registry` and `settings` as explicit dependencies rather than relying on global state, which improves testability and makes dependencies explicit.
128-159: Consumer creation properly wired with injected dependencies.
The consumer configuration correctly uses the injected `self._settings` for Kafka configuration, and `UnifiedConsumer` receives the schema registry and settings it needs.
194-199: Settings usage updated correctly; existing TODO for parsing acknowledged.
The switch to `self._settings` is appropriate. The fragile memory limit parsing (`rstrip("Mi")`) is already noted with a TODO. Consider addressing this in a follow-up to handle formats like `"1Gi"` or numeric bytes.
backend/workers/run_result_processor.py (2)
24-27: Good MongoDB client configuration.
Using `tz_aware=True` ensures consistent timezone handling, and the 5-second server selection timeout is reasonable for startup.
29-44: Clean dependency resolution and processor instantiation.
Dependencies are properly resolved from the container and passed to `ResultProcessor`, aligning with the updated constructor signature. The DI wiring is correct.
backend/app/services/coordinator/coordinator.py (2)
51-66: LGTM: Settings dependency injection properly implemented.
The constructor now accepts `Settings` as a parameter and stores it for internal use, replacing direct `get_settings()` calls. This enables proper dependency injection and improves testability.
128-134: LGTM: UnifiedConsumer initialization correctly updated.
The consumer is now instantiated with `schema_registry` and `settings` parameters, aligning with the DI-based refactor across the codebase.
backend/app/services/k8s_worker/worker.py (2)
49-66: LGTM: Settings dependency injection properly implemented.
The `KubernetesWorker` constructor now accepts and stores `Settings` for internal use, consistent with the DI refactor across the codebase.
122-128: LGTM: UnifiedConsumer initialization correctly updated.
The consumer is properly instantiated with `schema_registry` and `settings` parameters, matching the updated API and DI pattern.
backend/workers/run_pod_monitor.py (1)
21-57: LGTM: Excellent implementation of DI-based runner with correct signal handling.
This implementation demonstrates the proper patterns:
- Uses `loop.add_signal_handler()` for safe signal handling within the event loop context (lines 39-47)
- Accesses the public API (`monitor.state`) instead of private attributes (line 54)
- Properly manages lifecycle with `AsyncExitStack`
This serves as a good reference for the other runner implementations.
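For comparison with the unsafe variants flagged further down, a minimal loop-aware shutdown sketch; the worker object and its async stop() are assumptions:

```python
import asyncio
import signal


async def run_worker(worker) -> None:
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)   # handler runs inside the event loop
    await stop_event.wait()
    await worker.stop()                                # graceful, awaited shutdown
```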
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
backend/workers/dlq_processor.py (1)
115-132: Critical: DLQManager lifecycle is not managed.
The `DLQManager` is retrieved from the container and configured, but it's never started or stopped. The manager should be:
- Started before waiting for the stop event
- Entered as an async context or explicitly stopped on shutdown
🔎 Proposed fix
```diff
 async with AsyncExitStack() as stack:
     stack.push_async_callback(container.close)
+    await stack.enter_async_context(manager)
     await stop_event.wait()
```

Or if `DLQManager` doesn't implement the async context protocol:

```diff
+await manager.start()
+
 async with AsyncExitStack() as stack:
     stack.push_async_callback(container.close)
+    stack.push_async_callback(manager.stop)
     await stop_event.wait()
```

backend/workers/run_event_replay.py (1)
44-59: Missing graceful shutdown mechanism.
Similar to `run_result_processor.py`, this worker lacks signal handlers. The `await asyncio.Event().wait()` blocks forever, meaning the process can only be terminated forcefully.
🔎 Suggested addition

```diff
+    stop_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        loop.add_signal_handler(sig, stop_event.set)
+
     async with AsyncExitStack() as stack:
         stack.push_async_callback(container.close)
         await stack.enter_async_context(producer)
         task = asyncio.create_task(cleanup_task(replay_service, logger))
         # ... existing cancellation callback ...
-        await asyncio.Event().wait()
+        await stop_event.wait()
```

backend/workers/run_saga_orchestrator.py (1)
73-73: Settings not passed to `run_saga_orchestrator()`.
Unlike other workers in this PR, `main()` retrieves `settings` but doesn't pass it to `run_saga_orchestrator()`. This means settings are loaded twice (once in `main()` and once in `run_saga_orchestrator()`), which is inconsistent and wasteful.
🔎 Proposed fix

```diff
-    asyncio.run(run_saga_orchestrator())
+    asyncio.run(run_saga_orchestrator(settings))
```
♻️ Duplicate comments (5)
backend/workers/run_coordinator.py (2)
38-43: Critical: Signal handler implementation is unsafe.
This uses `signal.signal()` with a handler that calls `asyncio.create_task()`, which is problematic because signal handlers execute in the main thread outside the event loop context. Use `loop.add_signal_handler()` instead, as correctly implemented in `run_pod_monitor.py` (lines 39-47).
🔎 Proposed fix

```diff
+    loop = asyncio.get_running_loop()
+
+    async def shutdown() -> None:
+        logger.info("Initiating graceful shutdown...")
+        await coordinator.stop()
+
-    def signal_handler(sig: int, frame: Any) -> None:
-        logger.info(f"Received signal {sig}, initiating shutdown...")
-        asyncio.create_task(coordinator.stop())
-
-    signal.signal(signal.SIGINT, signal_handler)
-    signal.signal(signal.SIGTERM, signal_handler)
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))
```
50-53: Accessing private attribute `_running`.
The code accesses `coordinator._running` directly. Consider using the existing `get_status()` method or adding a public `is_running` property for better encapsulation.
🔎 Suggested alternatives
Option 1: Use state from `get_status()`:

```diff
-    while coordinator._running:
+    while (await coordinator.get_status())["running"]:
         await asyncio.sleep(60)
```

Option 2: Add an `is_running` property to `ExecutionCoordinator` and use it.
backend/workers/run_k8s_worker.py (2)
38-43: Critical: Signal handler implementation is unsafe.
Same issue as in `run_coordinator.py`. Using `signal.signal()` with a handler that calls `asyncio.create_task()` can fail because signal handlers execute outside the event loop context. Use `loop.add_signal_handler()` instead, as demonstrated in `run_pod_monitor.py`.
🔎 Proposed fix

```diff
+    loop = asyncio.get_running_loop()
+
+    async def shutdown() -> None:
+        logger.info("Initiating graceful shutdown...")
+        await worker.stop()
+
-    def signal_handler(sig: int, frame: Any) -> None:
-        logger.info(f"Received signal {sig}, initiating shutdown...")
-        asyncio.create_task(worker.stop())
-
-    signal.signal(signal.SIGINT, signal_handler)
-    signal.signal(signal.SIGTERM, signal_handler)
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))
```
50-53: Accessing private attribute `_running`.
The code accesses `worker._running` directly. Consider adding a public `is_running` property to `KubernetesWorker` for better encapsulation.
backend/app/core/providers.py (1)
804-825: Previously flagged issue has been addressed.
The `DLQProcessorProvider.get_dlq_manager` now explicitly calls `await manager.start()` (line 821) before yielding, matching the `MessagingProvider` pattern and ensuring single start/stop lifecycle ownership.
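An illustrative Dishka-style provider owning that start/stop lifecycle; the DLQManager stand-in below is not the real class, and the shape is a sketch rather than the project's implementation:

```python
from collections.abc import AsyncIterator

from dishka import Provider, Scope, provide


class DLQManager:                     # minimal stand-in for app.dlq.manager.DLQManager
    async def start(self) -> None: ...
    async def stop(self) -> None: ...


class DLQProcessorProvider(Provider):
    @provide(scope=Scope.APP)
    async def get_dlq_manager(self) -> AsyncIterator[DLQManager]:
        manager = DLQManager()
        await manager.start()         # the provider owns the start...
        try:
            yield manager
        finally:
            await manager.stop()      # ...and the stop, exactly once
```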
🧹 Nitpick comments (6)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
48-49: Consider using Mock objects for better test isolation.
The empty placeholder classes work but could be replaced with `unittest.mock.Mock()` objects for more explicit test doubles and better validation of interactions.
🔎 Optional refactor using Mock objects

```diff
+from unittest.mock import Mock
+
 class _Store: ...
 class _Alloc: ...
-class _SchemaRegistry: ...
-class _Settings: ...
+
+_schema_registry_mock = Mock()
+_settings_mock = Mock()
```

Then update the constructor calls at lines 75-76:

```diff
-    schema_registry_manager=_SchemaRegistry(),  # type: ignore[arg-type]
-    settings=_Settings(),  # type: ignore[arg-type]
+    schema_registry_manager=_schema_registry_mock,
+    settings=_settings_mock,
```

backend/workers/run_result_processor.py (1)
24-27: Inconsistent database initialization pattern.
This worker creates a MongoDB client directly, while other workers in this PR (run_coordinator.py, run_k8s_worker.py, run_pod_monitor.py, etc.) obtain the `Database` from the DI container. Consider using the container-provided `Database` for consistency:

```diff
-    db_client: AsyncMongoClient[dict[str, object]] = AsyncMongoClient(
-        settings.MONGODB_URL, tz_aware=True, serverSelectionTimeoutMS=5000
-    )
-    await init_beanie(database=db_client[settings.DATABASE_NAME], document_models=ALL_DOCUMENTS)
+    db = await container.get(Database)
+    await init_beanie(database=db, document_models=ALL_DOCUMENTS)
```

This also simplifies lifecycle management by removing the need to manually close `db_client`.
backend/workers/dlq_processor.py (1)
16-27: Minor: Unused `logger` parameter in configuration functions.
The `logger` parameter in `_configure_retry_policies` and `_configure_filters` is passed but never used within the function body. Either use it for logging or remove it if not needed.
Also applies to: 59-72
backend/app/core/providers.py (3)
503-511: Consider extracting `SagaConfig` to a shared constant.
This exact configuration is duplicated in `SagaOrchestratorProvider` (lines 755-763). A shared constant or factory function would reduce duplication.

```python
# Could define at module level or in a config module:
DEFAULT_SAGA_CONFIG = SagaConfig(
    name="main-orchestrator",
    timeout_seconds=300,
    max_retries=3,
    retry_delay_seconds=5,
    enable_compensation=True,
    store_events=True,
    publish_commands=True,
)
```
630-660: Consider consolidating duplicate repository factory methods.
`get_execution_repository` is defined identically in:
- ResultProcessorProvider (line 626)
- CoordinatorProvider (line 634)
- BusinessServicesProvider (line 472)
695-710: Duplicate repository/service factory methods.
get_event_repositoryandget_kafka_event_serviceare identical to those inUserServicesProvider(lines 370-381). Consider extracting shared logic if this pattern expands further.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- backend/app/core/providers.py
- backend/tests/unit/services/result_processor/test_processor.py
- backend/tests/unit/services/saga/test_saga_orchestrator_unit.py
- backend/workers/dlq_processor.py
- backend/workers/run_coordinator.py
- backend/workers/run_event_replay.py
- backend/workers/run_k8s_worker.py
- backend/workers/run_pod_monitor.py
- backend/workers/run_result_processor.py
- backend/workers/run_saga_orchestrator.py
🧰 Additional context used
🧬 Code graph analysis (6)
backend/workers/run_k8s_worker.py (4)
backend/app/core/container.py (1)
create_k8s_worker_container(91-105)
backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)
initialize_event_schemas(239-240)
backend/app/services/k8s_worker/worker.py (2)
KubernetesWorker(37-513)
stop(151-182)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/workers/dlq_processor.py (4)
backend/app/core/container.py (1)
create_dlq_processor_container(155-167)
backend/app/dlq/models.py (2)
DLQMessage(30-55)
RetryPolicy(82-118)
backend/app/dlq/manager.py (1)
DLQManager(27-470)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/workers/run_saga_orchestrator.py (3)
backend/app/core/container.py (1)
create_saga_orchestrator_container(124-137)
backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)
initialize_event_schemas(239-240)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/workers/run_coordinator.py (8)
backend/app/core/container.py (1)
create_coordinator_container(75-88)
backend/app/core/tracing/config.py (1)
init_tracing(177-197)
backend/app/events/core/producer.py (4)
UnifiedProducer(30-297)
producer(64-65)
stop(153-172)
get_status(132-151)
backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)
initialize_event_schemas(239-240)
backend/app/services/coordinator/coordinator.py (3)
ExecutionCoordinator(39-490)
stop(173-199)
get_status(483-490)
backend/app/settings.py (2)
Settings(11-160)
get_settings(164-165)
backend/workers/run_k8s_worker.py (1)
signal_handler(38-40)
backend/app/events/core/consumer.py (2)
stop(65-80)
get_status(217-234)
backend/workers/run_result_processor.py (7)
backend/app/core/container.py (1)
create_result_processor_container(54-72)backend/app/core/logging.py (1)
setup_logger(110-147)backend/app/db/repositories/execution_repository.py (1)
ExecutionRepository(17-91)backend/app/events/core/producer.py (3)
UnifiedProducer(30-297)producer(64-65)get_status(132-151)backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)backend/app/services/result_processor/processor.py (2)
ResultProcessor(60-314)get_status(309-314)backend/app/settings.py (2)
Settings(11-160)get_settings(164-165)
backend/app/core/providers.py (4)
backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-53)backend/app/settings.py (1)
Settings(11-160)backend/app/core/k8s_clients.py (1)
K8sClients(9-13)backend/app/domain/saga/models.py (1)
SagaConfig(92-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (14)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (2)
1-1: LGTM! Clean logging setup for tests. The test logger provides proper observability during test execution with a clear namespace.
Also applies to: 13-14
75-76: LGTM! Proper dependency injection for SagaOrchestrator. The test helper correctly wires the new dependencies (`schema_registry_manager`, `settings`, `logger`) into the `SagaOrchestrator` constructor, aligning with the broader DI refactoring across the codebase.
Also applies to: 80-80
backend/tests/unit/services/result_processor/test_processor.py (1)
32-39: LGTM!

The test correctly instantiates `ResultProcessor` with the two additional dependencies (`schema_registry` and `settings`) required by the DI refactoring. Using `MagicMock()` is appropriate for this unit test, which focuses on verifying dispatcher handler registration rather than the full behavior of these dependencies.

backend/workers/run_pod_monitor.py (2)
39-47: Good: Correct signal handler pattern using `loop.add_signal_handler()`.

This implementation correctly uses the event loop's signal handling mechanism, avoiding the issues present in `run_coordinator.py` and `run_k8s_worker.py`. This is the recommended pattern for async signal handling.
41-44: No issue: `UnifiedProducer.stop()` is idempotent.

The `shutdown()` function calls `await producer.stop()`, and `AsyncExitStack` will also invoke `producer.__aexit__()` (which calls `stop()`). However, `UnifiedProducer.stop()` includes state guards (lines 154-156) that return early if the producer is already in the `STOPPED` or `STOPPING` state, so multiple calls are safe and handled gracefully.
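To illustrate the guard pattern, here is a minimal, self-contained sketch of an idempotent async `stop()`. The state names mirror the review's description, but the class itself is hypothetical and only stands in for the real producer:

```python
import asyncio
from enum import Enum, auto


class ProducerState(Enum):
    STARTED = auto()
    STOPPING = auto()
    STOPPED = auto()


class IdempotentProducerSketch:
    """Hypothetical stand-in: stop() tolerates being called twice,
    e.g. once by a shutdown handler and again via AsyncExitStack/__aexit__."""

    def __init__(self) -> None:
        self._state = ProducerState.STARTED

    async def stop(self) -> None:
        if self._state in (ProducerState.STOPPED, ProducerState.STOPPING):
            return  # second caller becomes a no-op
        self._state = ProducerState.STOPPING
        await asyncio.sleep(0)  # placeholder for flushing/closing the real client
        self._state = ProducerState.STOPPED


async def main() -> None:
    producer = IdempotentProducerSketch()
    await producer.stop()
    await producer.stop()  # safe: guarded, no error


asyncio.run(main())
```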
backend/workers/run_event_replay.py (1)

27-48: LGTM: DI container pattern and lifecycle management.

The container-based initialization, Beanie setup, and `AsyncExitStack` usage follow the established patterns. The cleanup task cancellation via async callback is properly implemented.

backend/workers/run_saga_orchestrator.py (1)
36-39: Cleanup order is correct.

The past review flagged cleanup order issues, but the current implementation is correct: `container.close` is pushed first, then `producer` and `orchestrator` are entered. With LIFO ordering, `orchestrator.__aexit__()` runs first, then `producer.__aexit__()`, then `container.close()`.
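As a quick illustration of that LIFO behaviour (a generic sketch, not the worker's actual code), `AsyncExitStack` unwinds callbacks and contexts in reverse order of registration:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def component(name: str):
    print(f"start {name}")
    try:
        yield name
    finally:
        print(f"stop {name}")


async def close_container() -> None:
    print("container.close()")


async def main() -> None:
    async with AsyncExitStack() as stack:
        stack.push_async_callback(close_container)              # pushed first -> runs last
        await stack.enter_async_context(component("producer"))
        await stack.enter_async_context(component("orchestrator"))
    # Teardown order: stop orchestrator, stop producer, container.close()


asyncio.run(main())
```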
backend/app/core/providers.py (7)

80-83: Clean refactor to context-based Settings provision.

Using `from_context` is the idiomatic dishka pattern for dependencies passed to `make_async_container`. This simplifies the provider and aligns with the PR objective of centralizing Settings injection.
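For reference, a minimal sketch of the `from_context` pattern with dishka; `AppSettings` is a hypothetical stand-in for the project's `Settings`, and the imports are assumed from dishka's public API:

```python
import asyncio
from dataclasses import dataclass

from dishka import Provider, Scope, from_context, make_async_container


@dataclass
class AppSettings:
    database_name: str = "app"


class SettingsProvider(Provider):
    # Expose the externally constructed settings object at APP scope.
    settings = from_context(provides=AppSettings, scope=Scope.APP)


async def main() -> None:
    container = make_async_container(SettingsProvider(), context={AppSettings: AppSettings()})
    settings = await container.get(AppSettings)
    assert settings.database_name == "app"
    await container.close()


asyncio.run(main())
```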
165-174: LGTM! Settings injection and lifecycle management are correctly implemented.
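The yield-then-stop provider shape the review keeps referring to looks roughly like the sketch below. It is a generic illustration with a fake manager class, assuming dishka's `provide` accepts async-generator factories; it is not the project's actual `MessagingProvider`:

```python
import asyncio
from collections.abc import AsyncIterator

from dishka import Provider, Scope, make_async_container, provide


class FakeDLQManager:
    """Stand-in for the real DLQManager; only models start/stop."""

    def __init__(self) -> None:
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False


class MessagingProviderSketch(Provider):
    scope = Scope.APP

    @provide
    async def get_dlq_manager(self) -> AsyncIterator[FakeDLQManager]:
        manager = FakeDLQManager()
        await manager.start()      # start before yielding
        try:
            yield manager          # cached for the container's lifetime (APP scope)
        finally:
            await manager.stop()   # runs when the container closes


async def main() -> None:
    container = make_async_container(MessagingProviderSketch())
    manager = await container.get(FakeDLQManager)
    assert manager.running
    await container.close()
    assert not manager.running


asyncio.run(main())
```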
195-221: LGTM! Settings propagation through `get_schema_registry` and `get_event_store_consumer` is consistent with the DI refactor.

596-619: LGTM! Settings injection into `ExecutionCoordinator` is consistent, and lifecycle management is correct.

663-689: LGTM! The `K8sWorkerProvider` cleanly encapsulates worker instantiation with proper Settings injection and lifecycle management.

732-778: Approve with prior duplication notes. The provider structure is correct for APP-scoped saga orchestration in worker contexts. The `SagaConfig` duplication was flagged earlier.

781-801: LGTM! The `EventReplayProvider` cleanly provides the replay service with proper dependency injection.
Actionable comments posted: 1
♻️ Duplicate comments (2)
backend/workers/run_coordinator.py (1)
51-54: Private attribute `_running` access remains unaddressed.

This was flagged in a previous review. Consider exposing a public `is_running` property on `ExecutionCoordinator` or deriving state from `get_status()["running"]` for better encapsulation.

backend/workers/run_k8s_worker.py (1)

51-54: Private attribute `_running` access remains unaddressed.

This was flagged in a previous review. Consider exposing a public `is_running` property on `KubernetesWorker` for better encapsulation.
🧹 Nitpick comments (7)
backend/workers/run_coordinator.py (1)
43-44: Signal handler lambda captures loop variable by reference.

All signal handlers use the same `shutdown()` coroutine, so the closure over `sig` is benign here. However, if you later need signal-specific behavior, the lambda would always see the last value of `sig`. Consider capturing explicitly for defensive coding:

```diff
 for sig in (signal.SIGINT, signal.SIGTERM):
-    loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))
+    loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown()))
```
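The underlying late-binding behaviour is easy to demonstrate in isolation; this tiny standalone snippet is illustrative only and does not touch the worker code:

```python
import signal

sigs = (signal.SIGINT, signal.SIGTERM)

late = [lambda: s.name for s in sigs]        # every lambda closes over the same variable
bound = [lambda s=s: s.name for s in sigs]   # default argument captures each value

print([f() for f in late])   # ['SIGTERM', 'SIGTERM'] - both see the final loop value
print([f() for f in bound])  # ['SIGINT', 'SIGTERM']
```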
backend/workers/run_result_processor.py (3)

25-28: Inconsistent database initialization pattern.

This file creates `AsyncMongoClient` manually and calls `init_beanie` directly, while other workers (run_coordinator, run_k8s_worker, run_saga_orchestrator) retrieve `Database` from the DI container. Consider using the container-provided `Database` for consistency:

```diff
- db_client: AsyncMongoClient[dict[str, object]] = AsyncMongoClient(
-     settings.MONGODB_URL, tz_aware=True, serverSelectionTimeoutMS=5000
- )
- await init_beanie(database=db_client[settings.DATABASE_NAME], document_models=ALL_DOCUMENTS)
- container = create_result_processor_container(settings)
+ db = await container.get(Database)
+ await init_beanie(database=db, document_models=ALL_DOCUMENTS)
```

This also simplifies cleanup since the container manages the database lifecycle.
53-54: Signal handler lambda captures loop variable by reference.

Same as run_coordinator.py - consider capturing `sig` explicitly for defensive coding if signal-specific behavior is needed later.
61-64: Loop condition may exit prematurely on intermediate states.

The condition `processor._state == ProcessingState.PROCESSING` exits on any state other than `PROCESSING`. If there are transient states (e.g., `STOPPING`), consider checking for the terminal state instead:

```diff
- while processor._state == ProcessingState.PROCESSING:
+ while processor._state not in (ProcessingState.STOPPED,):
```

Or expose a public `is_running` property on `ResultProcessor` for consistency with other components.
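A property along these lines would let the worker loop avoid touching `_state` directly; the class below is a hypothetical reduction of `ResultProcessor`, not the real implementation:

```python
from enum import Enum, auto


class ProcessingState(Enum):
    IDLE = auto()
    PROCESSING = auto()
    STOPPED = auto()


class ResultProcessorSketch:
    def __init__(self) -> None:
        self._state = ProcessingState.IDLE

    @property
    def is_running(self) -> bool:
        """Public view of the internal state machine."""
        return self._state == ProcessingState.PROCESSING


# The worker loop then reads:
#     while processor.is_running and not shutdown_event.is_set(): ...
```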
backend/workers/run_k8s_worker.py (1)

43-44: Signal handler lambda captures loop variable by reference.

Same pattern as other workers - consider capturing `sig` explicitly for defensive coding.

backend/workers/run_saga_orchestrator.py (1)

43-44: Signal handler lambda captures loop variable by reference.

Same pattern as other workers - consider capturing `sig` explicitly for defensive coding.

backend/workers/dlq_processor.py (1)
16-16: Logger parameter is unused in these functions.

The `logger` parameter is accepted but never used in `_configure_retry_policies` and `_configure_filters`. Consider removing it if it is not needed, or add logging statements for configuration actions.

🔎 Optional: Add logging for configuration actions

```diff
 def _configure_retry_policies(manager: DLQManager, logger: logging.Logger) -> None:
+    logger.info("Configuring retry policies for DLQ manager")
     manager.set_retry_policy(

 def _configure_filters(manager: DLQManager, testing: bool, logger: logging.Logger) -> None:
+    logger.info(f"Configuring filters for DLQ manager (testing={testing})")
     if not testing:
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- backend/tests/unit/services/pod_monitor/test_monitor.py
- backend/workers/dlq_processor.py
- backend/workers/run_coordinator.py
- backend/workers/run_k8s_worker.py
- backend/workers/run_result_processor.py
- backend/workers/run_saga_orchestrator.py
🧰 Additional context used
🧬 Code graph analysis (4)
backend/workers/run_coordinator.py (7)
backend/app/core/container.py (1)
create_coordinator_container(75-88)backend/app/domain/enums/kafka.py (1)
GroupId(56-68)backend/app/events/core/producer.py (4)
UnifiedProducer(30-297)producer(64-65)stop(153-172)get_status(132-151)backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)initialize_event_schemas(239-240)backend/app/services/coordinator/coordinator.py (3)
ExecutionCoordinator(39-490)stop(173-199)get_status(483-490)backend/app/settings.py (2)
Settings(11-160)get_settings(164-165)backend/app/events/core/consumer.py (2)
stop(65-80)get_status(217-234)
backend/workers/dlq_processor.py (4)
backend/app/core/container.py (1)
create_dlq_processor_container(155-167)backend/app/dlq/models.py (2)
DLQMessage(30-55)RetryPolicy(82-118)backend/app/dlq/manager.py (1)
DLQManager(27-470)backend/app/settings.py (2)
Settings(11-160)get_settings(164-165)
backend/workers/run_k8s_worker.py (3)
backend/app/core/container.py (1)
create_k8s_worker_container(91-105)backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)initialize_event_schemas(239-240)backend/app/settings.py (2)
Settings(11-160)get_settings(164-165)
backend/workers/run_saga_orchestrator.py (6)
backend/app/core/container.py (1)
create_saga_orchestrator_container(124-137)backend/app/core/logging.py (1)
setup_logger(110-147)backend/app/events/core/producer.py (4)
UnifiedProducer(30-297)producer(64-65)stop(153-172)is_running(52-53)backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(54-230)initialize_event_schemas(239-240)backend/app/services/saga/saga_orchestrator.py (3)
SagaOrchestrator(31-524)stop(86-103)is_running(70-71)backend/app/settings.py (2)
Settings(11-160)get_settings(164-165)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (8)
backend/tests/unit/services/pod_monitor/test_monitor.py (2)
4-11: LGTM!The new imports for
MagicMockandK8sClientsare correctly added to support the DI test. They follow the existing import organization pattern in the file.
523-548: LGTM!This test properly validates the DI injection path for
create_pod_monitor:
- Correctly sets up mock
K8sClientswith all required components- Verifies the monitor uses the injected clients (
_clientsand_v1assertions)- Validates state lifecycle (RUNNING during context, STOPPED after exit)
- Follows existing test patterns in the file (uses
_patch_k8s,_FakeKafkaEventService)The test pairs well with
test_create_pod_monitor_context_manager(lines 509-520) which covers the non-injected path, ensuring both code paths are tested.backend/workers/run_coordinator.py (1)
46-49: LGTM: Correct AsyncExitStack cleanup order.The container cleanup is pushed first (line 47), so it will execute last during teardown (LIFO), after producer and coordinator have exited. This ensures dependencies remain available during component shutdown.
backend/workers/run_result_processor.py (1)
56-59: LGTM: Cleanup order is correct.The LIFO order ensures processor exits first, then container closes, then the database client closes. The graceful shutdown mechanism is now properly implemented.
backend/workers/run_k8s_worker.py (1)
19-35: LGTM: DI container initialization is consistent with other workers.The initialization sequence (container → logger → database/Beanie → schema registry → producer → worker) follows the established pattern and correctly wires dependencies.
backend/workers/run_saga_orchestrator.py (2)
53-55: Good: Uses publicis_runningproperty.Unlike run_coordinator and run_k8s_worker which access private
_running, this correctly uses the publicorchestrator.is_runningproperty. This is the preferred pattern - consider updating the other workers to match.
46-49: LGTM: Cleanup order and lifecycle management are correct.Previous review comment about cleanup order has been addressed. The container closes after producer and orchestrator exit, ensuring dependencies remain available during component shutdown.
backend/workers/dlq_processor.py (1)
122-122: Good modernization of asyncio API.Using
get_running_loop()instead of the deprecatedget_event_loop()pattern is correct and aligns with modern asyncio best practices.
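For context, a self-contained illustration of the non-deprecated API (a generic example, not the DLQ processor's code): inside a coroutine, `asyncio.get_running_loop()` always returns the active loop and pairs naturally with `loop.time()` for monotonic timing.

```python
import asyncio


async def main() -> None:
    loop = asyncio.get_running_loop()   # safe inside a coroutine; never creates a loop
    started = loop.time()
    await asyncio.sleep(0.01)
    print(f"elapsed: {loop.time() - started:.3f}s")


asyncio.run(main())
```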
|
@codex review |
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/core/providers.py (2)
521-546: Eliminate duplication: SagaOrchestrator provided by both BusinessServicesProvider and SagaOrchestratorProvider.The
get_saga_orchestratormethod appears in both:
BusinessServicesProvider(lines 521-546, scope=REQUEST)SagaOrchestratorProvider(lines 723-752, scope=APP)This duplication creates maintenance overhead and potential inconsistency. The two providers serve different purposes:
BusinessServicesProvider.get_saga_orchestrator: Used by the main app for per-request orchestrationSagaOrchestratorProvider.get_saga_orchestrator: Used by the saga orchestrator worker containerThe scope difference (REQUEST vs. APP) indicates they're used in different contexts, but the construction logic is identical.
💡 Recommended approach to eliminate duplication
Extract the orchestrator construction logic into a shared factory function at module level:
```python
def _create_saga_orchestrator_instance(
    saga_repository: SagaRepository,
    kafka_producer: UnifiedProducer,
    schema_registry: SchemaRegistryManager,
    settings: Settings,
    event_store: EventStore,
    idempotency_manager: IdempotencyManager,
    resource_allocation_repository: ResourceAllocationRepository,
    logger: logging.Logger,
) -> SagaOrchestrator:
    """Shared factory for SagaOrchestrator instances."""
    return create_saga_orchestrator(
        saga_repository=saga_repository,
        producer=kafka_producer,
        schema_registry_manager=schema_registry,
        settings=settings,
        event_store=event_store,
        idempotency_manager=idempotency_manager,
        resource_allocation_repository=resource_allocation_repository,
        config=_create_default_saga_config(),
        logger=logger,
    )
```

Then both providers can call this factory:

```python
# In BusinessServicesProvider
async def get_saga_orchestrator(...) -> AsyncIterator[SagaOrchestrator]:
    orchestrator = _create_saga_orchestrator_instance(...)
    try:
        yield orchestrator
    finally:
        await orchestrator.stop()


# In SagaOrchestratorProvider
async def get_saga_orchestrator(...) -> AsyncIterator[SagaOrchestrator]:
    orchestrator = _create_saga_orchestrator_instance(...)
    try:
        yield orchestrator
    finally:
        await orchestrator.stop()
```

Also applies to: 723-752
617-639: Eliminate duplication: ExecutionCoordinator provided by both BusinessServicesProvider and CoordinatorProvider.The
get_execution_coordinatormethod appears in both:
BusinessServicesProvider(lines 617-639, scope=REQUEST)CoordinatorProvider(lines 646-668, scope=APP)This is code duplication with identical construction logic. Similar to the SagaOrchestrator case, the scope difference (REQUEST vs. APP) indicates they're used in different contexts (main app vs. coordinator worker), but the construction code is identical.
💡 Recommended approach to eliminate duplication
Extract the coordinator construction logic into a shared factory function:
```python
def _create_execution_coordinator_instance(
    kafka_producer: UnifiedProducer,
    schema_registry: SchemaRegistryManager,
    settings: Settings,
    event_store: EventStore,
    execution_repository: ExecutionRepository,
    idempotency_manager: IdempotencyManager,
    logger: logging.Logger,
) -> ExecutionCoordinator:
    """Shared factory for ExecutionCoordinator instances."""
    return ExecutionCoordinator(
        producer=kafka_producer,
        schema_registry_manager=schema_registry,
        settings=settings,
        event_store=event_store,
        execution_repository=execution_repository,
        idempotency_manager=idempotency_manager,
        logger=logger,
    )
```

Then both providers can call this factory with the appropriate scope and lifecycle management.
Also applies to: 646-668
🧹 Nitpick comments (1)
backend/app/core/providers.py (1)
504-514: Consider making SagaConfig configurable via Settings.The
_create_default_saga_configfactory returns hardcoded values (timeout_seconds=300, max_retries=3, etc.). If saga behavior needs to vary between environments (test vs. production) or be tunable without code changes, consider reading these values from the Settings object.💡 Optional enhancement to make saga config environment-aware
If saga timeouts, retries, or compensation behavior should vary by environment, add fields to Settings (e.g.,
SAGA_TIMEOUT_SECONDS,SAGA_MAX_RETRIES) and update the factory:def _create_default_saga_config(settings: Settings) -> SagaConfig: """Factory for default SagaConfig used by orchestrators.""" return SagaConfig( name="main-orchestrator", timeout_seconds=settings.SAGA_TIMEOUT_SECONDS, max_retries=settings.SAGA_MAX_RETRIES, retry_delay_seconds=settings.SAGA_RETRY_DELAY_SECONDS, enable_compensation=True, store_events=True, publish_commands=True, )Then update the two call sites (lines 540, 746) to pass
settings.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/core/container.pybackend/app/core/providers.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/core/container.py (2)
backend/app/core/providers.py (4)
CoordinatorProvider(642-668)KafkaServicesProvider(420-433)SettingsProvider(80-83)SSEProvider(360-409)backend/app/settings.py (1)
Settings(11-160)
backend/app/core/providers.py (5)
backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-53)backend/app/settings.py (1)
Settings(11-160)backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)backend/app/events/core/producer.py (4)
UnifiedProducer(30-297)producer(64-65)stop(153-172)start(111-130)backend/app/domain/saga/models.py (1)
SagaConfig(92-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (14)
backend/app/core/container.py (3)
31-56: LGTM! Settings propagation pattern established.The updated
create_app_containersignature now acceptssettingsand correctly passes it viacontext={Settings: settings}to the DI container. The expanded provider list (MetricsProvider, RepositoryProvider, KafkaServicesProvider, SSEProvider) aligns with the refactoring to centralize metrics and repository provisioning.
59-77: LGTM! Result processor container properly configured.The
create_result_processor_containerincludes a minimal yet complete set of providers for worker operation: logging, database, Redis, core services, metrics, repositories, event handling, and messaging. The Settings context is correctly threaded through.
80-178: Provider compositions are correct and properly tailored to each worker's actual dependencies.All six worker container factories are correctly implemented:
create_coordinator_container: Includes Redis and Messaging for distributed coordination needscreate_k8s_worker_container: Includes Redis and Messaging for Kubernetes operationscreate_pod_monitor_container: Omits Redis intentionally (no Redis usage in pod monitor service); includes Messaging for Kafka event publishingcreate_saga_orchestrator_container: Includes Redis and Messaging for saga state managementcreate_event_replay_container: Omits Redis intentionally (no Redis usage); includes Messaging for Kafka producercreate_dlq_processor_container: Omits Messaging (uses DLQProcessorProvider instead, which provides its own DLQ manager); omits Redis (not needed)Each factory includes EventProvider for SchemaRegistryManager and CoreServicesProvider for base services. All factories are properly instantiated in their corresponding worker entry points.
backend/app/core/providers.py (11)
80-83: LGTM! Context-based Settings provisioning.The SettingsProvider now uses
from_context(provides=Settings, scope=Scope.APP)to expose Settings from the container context, replacing the previous method-based approach. This aligns with the DI pattern established in the container factories.
166-174: LGTM! DLQ manager lifecycle correctly managed.The
get_dlq_managermethod correctly callsawait manager.start()before yielding andawait manager.stop()in the finally block. The signature has been updated to acceptsettingsparameter which is passed tocreate_dlq_manager.
196-221: LGTM! Schema registry and event store consumer updated for Settings propagation.Both
get_schema_registry(lines 196-197) andget_event_store_consumer(lines 205-221) have been updated to accept and usesettings. Thecreate_schema_registry_managerandcreate_event_store_consumerfactory functions now receive Settings, enabling proper configuration propagation.
244-296: LGTM! Centralized metrics provisioning.The
MetricsProviderclass consolidates all metrics instances (EventMetrics, ConnectionMetrics, RateLimitMetrics, ExecutionMetrics, DatabaseMetrics, HealthMetrics, KubernetesMetrics, CoordinatorMetrics, DLQMetrics, NotificationMetrics, ReplayMetrics, SecurityMetrics) into a single provider. This improves organization and makes metrics dependencies explicit in the DI graph.
298-358: LGTM! Centralized repository provisioning.The
RepositoryProviderclass consolidates all repository instances into a single provider. Repositories are correctly described as "stateless facades over database operations" and all use APP scope, which is appropriate since repositories don't hold request-specific state.
360-409: LGTM! SSE provider properly organized.The
SSEProviderclass groups SSE-related services (SSERedisBus, SSEKafkaRedisBridge, SSEShutdownManager, SSEService) with appropriate scopes. The SSE shutdown manager and service use REQUEST scope (lines 387, 391), which is correct for per-request lifecycle management.
420-434: LGTM! Kafka services provider properly scoped.The
KafkaServicesProvidergroups Kafka-related event services and correctly uses APP scope for shared services accessed by both the main app and workers.
671-697: LGTM! K8sWorker provider properly configured.The
K8sWorkerProvidercreates aKubernetesWorkerwith all necessary dependencies (producer, schema registry, settings, event store, idempotency manager, logger) and properly manages its lifecycle with start/stop. TheK8sWorkerConfig()is instantiated inline, which is appropriate if it has no constructor parameters.
700-720: LGTM! PodMonitor provider properly configured.The
PodMonitorProvidercreates aPodMonitorwith the appropriate dependencies (kafka_event_service, k8s_clients, logger) and manages its lifecycle. ThePodMonitorConfig()is instantiated inline.
755-771: LGTM! EventReplay provider properly configured.The
EventReplayProvidercreates anEventReplayServicewith the necessary dependencies (repository, producer, event store, logger). Note that EventReplayService does not appear to require lifecycle management (no async context manager pattern), which is correct for this provider.
774-795: LGTM! DLQ processor lifecycle correctly managed.The
DLQProcessorProvider.get_dlq_managercorrectly callsawait manager.start()at line 791 before yielding, thenawait manager.stop()in the finally block. This matches the pattern inMessagingProvider.get_dlq_managerand addresses the previous review concern about double-stop.The provider also passes explicit parameters to
create_dlq_managerincludingdlq_topic=KafkaTopic.DEAD_LETTER_QUEUEandretry_topic_suffix="-retry", making the configuration explicit.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/tests/conftest.py (1)
189-209: Minor: Missing error handling for user creation.Unlike
test_userandtest_adminfixtures (lines 171-172, 182-183), theanother_userfixture doesn't check the response status of the registration call (line 193-201). If user creation fails, the subsequent login will fail with a less clear error.🔎 Proposed fix to add error handling
- await client.post( + r = await client.post( "/api/v1/auth/register", json={ "username": username, "email": email, "password": password, "role": "user", }, ) + if r.status_code not in (200, 201, 400): + pytest.fail(f"Cannot create another_user (status {r.status_code}): {r.text}") csrf = await _http_login(client, username, password)backend/tests/integration/services/sse/test_partitioned_event_router.py (1)
58-72: Use a non-mutating approach for settings modifications.Line 59 mutates
test_settings.SSE_CONSUMER_POOL_SIZE = 1, which affects the session-scoped fixture and can break test isolation if test execution order changes or new tests are added. Usemodel_copy()instead:Recommended fix
-async def test_router_start_and_stop(redis_client, test_settings: Settings) -> None: - test_settings.SSE_CONSUMER_POOL_SIZE = 1 +async def test_router_start_and_stop(redis_client, test_settings: Settings) -> None: + local_settings = test_settings.model_copy(update={"SSE_CONSUMER_POOL_SIZE": 1}) suffix = uuid4().hex[:6] router = SSEKafkaRedisBridge( - schema_registry=SchemaRegistryManager(settings=test_settings, logger=_test_logger), - settings=test_settings, + schema_registry=SchemaRegistryManager(settings=local_settings, logger=_test_logger), + settings=local_settings,
🧹 Nitpick comments (2)
backend/app/core/providers.py (2)
658-681: LGTM! Proper lifecycle management for KubernetesWorker.The Settings propagation and lifecycle pattern (yield, then stop in finally) are correctly implemented.
Consider whether
K8sWorkerConfig()(line 667) should acceptsettingsfor configuration. If the worker's behavior depends on environment-specific values (timeouts, resource limits, etc.), passing settings to the config would centralize configuration management.
687-703: LGTM! Proper lifecycle management for PodMonitor.The lifecycle pattern (yield, then stop in finally) is correctly implemented. Similar to
K8sWorkerProvider, consider whetherPodMonitorConfig()(line 693) should acceptsettingsfor centralized configuration if monitor behavior depends on environment-specific values.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
backend/.env.testbackend/app/core/providers.pybackend/app/db/repositories/notification_repository.pybackend/app/events/core/dispatcher.pybackend/app/infrastructure/kafka/events/metadata.pybackend/app/services/coordinator/coordinator.pybackend/app/services/idempotency/redis_repository.pybackend/app/services/sse/redis_bus.pybackend/tests/conftest.pybackend/tests/integration/events/test_schema_registry_real.pybackend/tests/integration/events/test_schema_registry_roundtrip.pybackend/tests/integration/services/sse/test_partitioned_event_router.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/tests/integration/events/test_schema_registry_roundtrip.py (3)
backend/app/settings.py (1)
Settings(11-160)backend/tests/conftest.py (1)
test_settings(72-74)backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)
backend/app/services/coordinator/coordinator.py (5)
backend/app/events/core/types.py (1)
ConsumerConfig(64-102)backend/app/events/core/dispatcher.py (2)
EventDispatcher(15-188)register(47-68)backend/app/events/core/consumer.py (2)
UnifiedConsumer(24-257)consumer(214-215)backend/app/settings.py (1)
Settings(11-160)backend/app/infrastructure/kafka/events/execution.py (4)
ExecutionRequestedEvent(13-49)ExecutionCompletedEvent(86-93)ExecutionFailedEvent(96-105)ExecutionCancelledEvent(118-136)
backend/app/core/providers.py (4)
backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-53)backend/app/settings.py (1)
Settings(11-160)backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(54-230)backend/app/domain/saga/models.py (1)
SagaConfig(92-104)
🪛 dotenv-linter (4.0.0)
backend/.env.test
[warning] 40-40: [UnorderedKey] The OTEL_LOGS_EXPORTER key should go before the OTEL_METRICS_EXPORTER key
(UnorderedKey)
🪛 GitHub Actions: MyPy Type Checking
backend/app/services/idempotency/redis_repository.py
[error] 141-141: mypy: Call to untyped function "execute_command" in typed context
backend/app/services/sse/redis_bus.py
[error] 41-41: mypy: Call to untyped function "aclose" in typed context
backend/app/infrastructure/kafka/events/metadata.py
[error] 4-4: mypy: Module "pydantic_avro" does not explicitly export attribute "AvroBase"
backend/app/core/providers.py
[error] 112-112: mypy: Call to untyped function "execute_command" in typed context
🪛 GitHub Actions: Ruff Linting
backend/app/services/coordinator/coordinator.py
[error] 19-19: F401: 'BaseEvent' imported but unused. Remove unused import. Found 1 error. Fixable with the '--fix' option (ruff check --fix).
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (26)
backend/app/db/repositories/notification_repository.py (2)
4-4: LGTM! Necessary import for type annotation.The
Anyimport is required for the type hint added tocount_notificationsand is correctly placed with the other typing imports.
94-96: LGTM! Type annotation improves code quality.The addition of
*additional_conditions: Anyproperly types the variadic arguments. UsingAnyis appropriate here since the method accepts flexible Beanie query operators and conditions that can vary in type.backend/app/events/core/dispatcher.py (1)
47-68: LGTM! Type-safe handler registration.The generic type parameter
Tallows callers to register handlers with specific event subclasses while preserving type information. Thetype: ignore[arg-type]on line 65 is appropriately documented—runtime type safety is guaranteed bydispatch()routing viaevent_type.backend/app/services/coordinator/coordinator.py (4)
55-55: LGTM! Settings dependency injection.Injecting
Settingsthrough the constructor instead of usingget_settings()inside methods improves testability and makes dependencies explicit. The usage throughout the coordinator is consistent.Also applies to: 66-66, 69-69, 118-118
128-134: LGTM! UnifiedConsumer construction updated correctly.The constructor call provides all required parameters including the newly injected
schema_registryandsettings, matching the updatedUnifiedConsumersignature.
137-151: LGTM! Properly typed event handlers.The handler registrations now use specific event types (
ExecutionRequestedEvent,ExecutionCompletedEvent, etc.) instead of the genericBaseEvent, leveraging the type-saferegister()decorator. This provides better type checking and IDE support.
201-223: LGTM! Type-safe event routing with proper narrowing.The routing methods use union types and narrow them via
event.event_typechecks before dispatching to specific handlers. This pattern works correctly with type checkers that supportLiteraltype narrowing, ensuring type safety both at compile-time and runtime.backend/app/core/providers.py (7)
80-84: LGTM! Clean context-based provisioning pattern.The switch to
from_contextfor Settings provisioning follows Dishka best practices and eliminates the need for a provider method, making Settings available at APP scope throughout the DI container.
166-174: LGTM! Settings propagation with proper lifecycle management.The addition of the
settingsparameter correctly threads Settings through to DLQManager creation, and the lifecycle pattern (start before yield, stop in finally) is properly implemented.
196-221: LGTM! Consistent Settings propagation through event providers.The addition of
settingsparameters to bothget_schema_registryandget_event_store_consumercorrectly threads Settings through the DI chain to their respective factory functions and constructors.
518-543: LGTM! Shared lifecycle factory eliminates duplication.The
_provide_saga_orchestratorfactory properly manages the orchestrator lifecycle and is correctly shared across multiple providers (BusinessServicesProvider, SagaOrchestratorProvider), eliminating duplication while ensuring consistent cleanup.
546-568: LGTM! Shared lifecycle factory with proper cleanup.The
_provide_execution_coordinatorfactory correctly manages the coordinator lifecycle and is shared across BusinessServicesProvider and CoordinatorProvider, ensuring consistent lifecycle management.
736-753: LGTM! Properly configured DLQ processor with lifecycle management.The explicit topic configuration (
dlq_topic=KafkaTopic.DEAD_LETTER_QUEUE,retry_topic_suffix="-retry") correctly specializes the DLQManager for processing, and the lifecycle pattern (start before yield, stop in finally) is properly implemented.
244-753: Excellent provider organization and separation of concerns.The introduction of dedicated provider classes (MetricsProvider, RepositoryProvider, SSEProvider, KafkaServicesProvider, etc.) with clear docstrings improves code organization and maintainability. The consistent use of lifecycle management patterns across providers ensures proper resource cleanup.
The only concern (REQUEST vs APP scope for BusinessServicesProvider) has been flagged separately.
backend/.env.test (2)
36-40: LGTM! Test OTEL configuration is appropriate.The OpenTelemetry configuration correctly disables exporters for tests by setting an empty endpoint and
noneexporters, which prevents unnecessary connection attempts during test runs.
25-25: No action required. The trailing dot inKAFKA_TOPIC_PREFIX=test.is intentional and correctly used as a namespace separator. KafkaTopic enum values (e.g.,execution_events,dead_letter_queue) have no leading dots, so concatenation with the prefix produces correct topic names liketest.execution_eventswithout double dots.backend/tests/integration/services/sse/test_partitioned_event_router.py (2)
4-14: LGTM! Import organization improved.The import reordering improves readability by grouping related imports together.
22-36: LGTM! Settings injection properly implemented.The test correctly uses dependency-injected
test_settingsto configure bothSchemaRegistryManagerandSSEKafkaRedisBridge, aligning with the broader DI refactor.backend/tests/conftest.py (5)
18-26: LGTM! TestSettings class properly configured.The
TestSettingssubclass correctly loads configuration from.env.test, enabling test-specific environment configuration without affecting production settings.
34-64: LGTM! Worker isolation setup is comprehensive.The
_setup_worker_envfunction properly isolates pytest-xdist workers by:
- Creating unique database names per worker
- Distributing Redis DBs across workers (0-15)
- Prefixing Kafka consumer groups and schema subjects
- Disabling OTEL exporters to prevent connection noise
This addresses the test isolation concerns mentioned in the PR objectives.
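A per-worker environment setup of this shape can be sketched as below; the exact variable names (`DATABASE_NAME`, `REDIS_DB`, `KAFKA_GROUP_SUFFIX`, the OTEL exporter switches) are illustrative assumptions rather than the fixture's literal contents:

```python
import os


def setup_worker_env_sketch() -> None:
    # pytest-xdist exposes the worker id as PYTEST_XDIST_WORKER ("gw0", "gw1", ...).
    worker = os.environ.get("PYTEST_XDIST_WORKER", "gw0")
    worker_idx = int(worker.lstrip("gw") or 0)

    os.environ["DATABASE_NAME"] = f"test_db_{worker}"   # unique database per worker
    os.environ["REDIS_DB"] = str(worker_idx % 16)       # spread across Redis DBs 0-15
    os.environ["KAFKA_GROUP_SUFFIX"] = f".{worker}"     # isolate consumer groups
    os.environ["OTEL_TRACES_EXPORTER"] = "none"         # silence exporters in tests
    os.environ["OTEL_METRICS_EXPORTER"] = "none"
```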
79-90: LGTM! App fixture properly uses TestSettings.The app fixture correctly:
- Creates the app with
TestSettings()- Properly cleans up the dishka container on teardown
- Uses session scope to avoid Pydantic schema validator memory issues
144-163: LGTM! Function-scoped credential fixtures prevent scope mismatch.The credential fixtures are now function-scoped with unique IDs per test, which fixes the scope mismatch issue mentioned in the PR objectives where session-scoped credentials combined with function-scoped users caused problems.
167-185: LGTM! User creation with explicit failure handling.The test user fixtures now:
- Create users per-test (function-scoped) for proper isolation
- Use
pytest.fail()instead of silent skips, improving CI visibility- Return complete user context including CSRF tokens and headers
This aligns with the PR objective to fix the scope mismatch issue.
backend/tests/integration/events/test_schema_registry_roundtrip.py (1)
28-31: LGTM! Settings injection consistent with DI pattern.The test correctly uses the
test_settingsfixture to instantiateSchemaRegistryManager, aligning with the broader dependency injection refactor across the test suite.backend/tests/integration/events/test_schema_registry_real.py (1)
14-26: LGTM! Settings injection properly applied.The test correctly uses the
test_settingsfixture forSchemaRegistryManagerinstantiation, consistent with the DI pattern applied throughout the test suite.backend/app/services/sse/redis_bus.py (1)
41-41: Fix the MyPy type-checking failure on line 41 by either restoring thetype: ignoredirective or installing type stubs.The call to
await self._pubsub.aclose()will fail MyPy type-checking under the current strict configuration (disallow_untyped_defs=true,disallow_incomplete_defs=true) becauseredis.asyncio.client.PubSublacks proper type annotations for theaclose()method.Apply one of these solutions:
- Add the
type: ignoredirective:await self._pubsub.aclose() # type: ignore[no-untyped-call]
- Install type stubs for redis: Add
types-redisto the dev dependencies inpyproject.toml, which may provide proper type annotations for the redis library methods.
Actionable comments posted: 5
♻️ Duplicate comments (2)
backend/app/core/providers.py (2)
504-514:publish_commands=Truemay cause duplicate command events.A previous review flagged that
ExecutionCoordinatoralready publishesCreatePodCommandEvent, so settingpublish_commands=Truehere causesSagaOrchestratorto emit duplicates. Consider changing topublish_commands=Falseper the prior feedback.
571-576: Scope inconsistency for lifecycle-managed orchestrators remains.A previous review noted that
BusinessServicesProviderusesScope.REQUESTwhileCoordinatorProvider(line 648) andSagaOrchestratorProvider(line 706) useScope.APPfor the same services. Per-request instantiation ofSagaOrchestratorandExecutionCoordinatorcreates and tears down expensive resources (Kafka consumers, background tasks) on every request.Consider whether REQUEST scope is intentional here for a different use case, or if it should be APP scope to match the dedicated providers.
🧹 Nitpick comments (2)
backend/app/services/saga/saga_orchestrator.py (1)
63-63: Consider standardizing on structured logging.The logging statements throughout this file use two different styles:
- F-string interpolation (e.g., line 63:
f"Registered saga: {saga_class.get_name()}")- Structured logging with
extra=(e.g., lines 436-443)For better observability and log querying, consider standardizing on structured logging with the
extraparameter, which allows log aggregation tools to index fields. This is especially valuable for operational monitoring.Example refactor for consistent structured logging
-self.logger.info(f"Registered saga: {saga_class.get_name()}") +self.logger.info("Registered saga", extra={"saga_name": saga_class.get_name()}) -self.logger.info(f"Starting saga orchestrator: {self.config.name}") +self.logger.info("Starting saga orchestrator", extra={"orchestrator_name": self.config.name}) -self.logger.info(f"Saga consumer started for topics: {topics}") +self.logger.info("Saga consumer started", extra={"topics": list(topics)})Also applies to: 67-67, 74-74, 84-84, 87-87, 103-103, 106-106, 112-112, 119-119, 122-122, 134-134, 156-156, 160-160, 166-166, 173-173, 176-176, 190-190, 196-196, 198-198, 203-203, 217-217, 259-259, 286-286, 299-299, 308-308, 318-318, 325-325, 328-328, 335-335, 349-349, 361-361, 375-375, 385-385, 418-418, 423-426, 436-443, 487-490, 492-492, 496-500, 532-532, 535-535
backend/app/services/coordinator/coordinator.py (1)
50-69: Excellent Settings injection pattern!The constructor now properly accepts Settings via dependency injection and uses it consistently throughout (lines 68, 117), improving testability and making configuration sources explicit.
💡 Optional: Remove redundant intermediate variable
Line 68 stores
self.kafka_serversbut it's only used once at line 116. Consider usingself._settings.KAFKA_BOOTSTRAP_SERVERSdirectly in the ConsumerConfig construction to reduce redundancy.- # Kafka configuration - self.kafka_servers = self._settings.KAFKA_BOOTSTRAP_SERVERS - self.consumer_group = consumer_groupThen at line 116:
consumer_config = ConsumerConfig( - bootstrap_servers=self.kafka_servers, + bootstrap_servers=self._settings.KAFKA_BOOTSTRAP_SERVERS,Also applies to: 117-117
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
.github/workflows/backend-ci.ymlbackend/app/core/providers.pybackend/app/infrastructure/kafka/events/metadata.pybackend/app/services/coordinator/coordinator.pybackend/app/services/idempotency/redis_repository.pybackend/app/services/saga/saga_orchestrator.pybackend/app/services/sse/redis_bus.pybackend/tests/e2e/test_k8s_worker_create_pod.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/services/idempotency/redis_repository.py
- backend/app/infrastructure/kafka/events/metadata.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/tests/e2e/test_k8s_worker_create_pod.py (2)
backend/app/settings.py (1)
Settings(11-160)backend/tests/conftest.py (2)
scope(119-121)test_settings(72-74)
backend/app/services/coordinator/coordinator.py (5)
backend/app/events/core/dispatcher.py (2)
EventDispatcher(15-188)register(47-68)backend/app/events/core/consumer.py (2)
UnifiedConsumer(24-257)consumer(214-215)backend/app/events/core/producer.py (1)
UnifiedProducer(30-297)backend/app/settings.py (1)
Settings(11-160)backend/app/infrastructure/kafka/events/execution.py (4)
ExecutionRequestedEvent(13-49)ExecutionCompletedEvent(86-93)ExecutionFailedEvent(96-105)ExecutionCancelledEvent(118-136)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (22)
backend/app/services/sse/redis_bus.py (1)
41-41: LGTM: Good documentation practice.Documenting the reason for the
type: ignoredirective improves code maintainability and helps future developers understand why type checking is suppressed here.backend/tests/e2e/test_k8s_worker_create_pod.py (3)
14-15: LGTM! Import additions support the DI pattern.The
SettingsandAsyncContainerimports are correctly used to add type annotations to the test function signature, improving type safety and aligning with the dependency injection approach described in the PR objectives.
24-26: LGTM! Type annotations and test isolation parameter added.The function signature improvements include:
- Type annotations for
scopeandmonkeypatchenhance type safety- New
test_settingsparameter enables per-test isolation usingTestSettings, addressing the scope mismatch issue mentioned in the PR objectivesAll parameters are properly utilized within the test body.
43-43: LGTM! Settings properly wired through DI.Passing
test_settingsto theKubernetesWorkerconstructor aligns with the PR's objective to thread Settings through dependency injection. This ensures the worker uses test-specific configuration, supporting per-test isolation and preventing configuration leakage between tests.backend/app/services/saga/saga_orchestrator.py (5)
17-17: LGTM! New imports support dependency injection.The imports for
SchemaRegistryManagerandSettingsare correctly added to support the new constructor dependencies.Also applies to: 24-24
126-127: No action required. BothKAFKA_BOOTSTRAP_SERVERS(line 48) andKAFKA_GROUP_SUFFIX(line 49) are properly defined in the Settings class with appropriate default values. The code at lines 126-127 is correct and will not raiseAttributeErrors.Likely an incorrect or invalid review comment.
538-575: Factory function signature correctly updated with all callers aligned.The factory function has been properly updated to accept all required dependencies (schema_registry_manager, settings, logger, and others), and the docstring accurately documents all parameters. The only call site in
backend/app/core/providers.py:529has been correctly updated to pass all 9 required arguments in the correct order.
34-45: Verify that all instantiation sites have been updated.The constructor signature has been expanded with three new required parameters (
schema_registry_manager,settings,logger), which is a breaking change. All instantiation sites have been properly updated:
- Test suite (test_saga_orchestrator_unit.py:71) provides all new parameters
- Factory function (saga_orchestrator.py:565) passes all parameters to constructor
- DI provider (core/providers.py:529) calls factory with all required parameters
131-131: All component constructors properly accept the parameters being passed. Verification confirms:
EventDispatcher.__init__acceptsloggerparameter ✓UnifiedConsumer.__init__acceptsschema_registry,settings, andloggerparameters ✓IdempotentConsumerWrapper.__init__acceptslogger,dispatcher, and all other passed parameters ✓No runtime errors expected from parameter mismatches.
backend/app/services/coordinator/coordinator.py (5)
14-15: LGTM! Clean import updates for DI refactor.The imports correctly reflect the new dependencies (Settings, EventStore) and the shift to concrete event types. The previously flagged unused BaseEvent import has been removed.
Also applies to: 32-32
127-133: LGTM! Consumer construction matches the expected signature.The UnifiedConsumer instantiation correctly passes all required parameters (config, event_dispatcher, schema_registry, settings, logger) with proper types, enabling richer integration as noted in the AI summary.
136-150: Strong type safety with concrete event classes!The handler registrations now use specific event types (ExecutionRequestedEvent, ExecutionCompletedEvent, etc.) instead of BaseEvent, providing:
- Compile-time type safety
- Better IDE autocomplete and static analysis
- Clear contracts for each handler
The dispatcher's generic register method preserves these type guarantees through the routing chain.
200-222: Well-structured event routing with proper type constraints.The routing methods correctly use union types and event_type checks to dispatch to specific handlers. The additional routing layer (after dispatcher already routes by type) provides:
- Centralized logging for debugging (lines 202-206)
- Logical grouping of request events vs. result events
- Future-proof hook points for middleware
The type narrowing based on event_type checks works correctly at runtime, and the design trade-off is reasonable for maintainability.
209-211: Correct handler dispatching with type narrowing.The event_type checks properly narrow the union types before calling specific handler methods, ensuring type safety at both compile time and runtime.
Also applies to: 218-220
backend/app/core/providers.py (8)
80-83: LGTM!Using
from_contextfor Settings is the correct dishka pattern for externally-provided configuration. APP scope ensures a single Settings instance across the application.
112-112: Good fix for the typing issue.Using
ping()instead ofexecute_command("PING")addresses the mypy error from the previous review. Thetype: ignore[misc]comment is appropriate given redis-py's dual sync/async signature.
195-221: LGTM!Settings threading through
get_schema_registryandget_event_store_consumeraligns with the broader refactor to context-based configuration injection.
298-358: LGTM!The
RepositoryProvidercorrectly consolidates repository provisioning with APP scope, appropriate for stateless database facades.
686-703: Pattern consistent with other lifecycle providers.Same yield-then-stop pattern. If
PodMonitor.start()is required, this would need the same fix as other providers. Otherwise, the lifecycle management looks correct.
732-753: Past issue resolved -start()is now called before yielding.The provider correctly calls
await manager.start()(line 749) before yielding, matching theMessagingProvider.get_dlq_managerpattern. This fixes the previously flagged double-stop bug.
166-174: LGTM!Settings threading and lifecycle management (start before yield, stop in finally) are correct.
365-369: No cleanup is required forSSERedisBus. The class is stateless and holds no resources of its own—it receivesredis_clientas a dependency (which is properly cleaned up by its own provider) and returns pub/sub subscriptions wrapped inSSERedisSubscriptionobjects, which callers must manage via theclose()method. The pattern is correct.
|
@codex review |
💡 Codex Review
Here are some automated review suggestions for this pull request.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/events/core/producer.py (1)
273-277: Potential double-counting ofmessages_sent.Line 276 increments
self._metrics.messages_sent += 1directly, but the_handle_deliverycallback (registered on line 269) also incrementsmessages_senton successful delivery (line 79).This could result in double-counting for DLQ messages. Consider removing the direct increment here since the callback will handle it:
🔎 Proposed fix
# Record metrics self._event_metrics.record_kafka_message_produced( f"{self._topic_prefix}{str(KafkaTopic.DEAD_LETTER_QUEUE)}" ) - self._metrics.messages_sent += 1backend/app/events/event_store_consumer.py (1)
159-180: Update test call site with missingsettingsparameter.The
create_event_store_consumerfactory requires asettings: Settingsparameter, but the call atbackend/tests/integration/events/test_event_store_consumer.py:38is missing it. The call site inbackend/app/core/providers.py:206has been correctly updated.
♻️ Duplicate comments (7)
.github/workflows/backend-ci.yml (2)
93-100: LGTM! Previous review concerns addressed.The dynamic prefix and timeout have been added as recommended. This prevents topic collisions between concurrent CI runs and ensures the job fails fast if topic creation hangs.
187-194: LGTM! Consistent implementation across test jobs.This e2e topic creation step mirrors the integration test implementation with the same dynamic prefix pattern and timeout, ensuring consistent isolation across all test suites.
backend/workers/run_result_processor.py (1)
63-66: Minor: Accessing private attributeprocessor._state.Line 63 accesses
processor._statedirectly. Consider usingprocessor.is_running(inherited fromLifecycleEnabled) or adding a publicstateproperty toResultProcessorfor better encapsulation.🔎 Proposed fix
- while processor._state == ProcessingState.PROCESSING and not shutdown_event.is_set(): + while processor.is_running and not shutdown_event.is_set():backend/app/services/k8s_worker/worker.py (1)
49-86: Breaking constructor change requires call site updates.The constructor now mandates a
settings: Settingsparameter. As noted in the past review, existing consumers (e.g.,backend/tests/e2e/test_k8s_worker_create_pod.py) that call the constructor without this parameter will fail with a TypeError.#!/bin/bash # Search for KubernetesWorker instantiation to verify settings parameter rg -n -C5 'KubernetesWorker\s*\(' --type=pybackend/app/services/pod_monitor/monitor.py (1)
499-515: DI injection path lacks test coverage.As noted in the past review,
create_pod_monitornow supports injectingk8s_clients, but the test at line 517 (in the test file) doesn't exercise this path. Consider adding a test case that passes a mocked or testK8sClientsinstance tocreate_pod_monitorto verify the injection works correctly.#!/bin/bash # Search for tests that use create_pod_monitor with k8s_clients parameter rg -n -C5 'create_pod_monitor.*k8s_clients' --type=py backend/tests/backend/app/core/providers.py (2)
498-508: Changepublish_commands=Falseto prevent duplicate pod creation commands.According to the
SagaConfigdocstring inbackend/app/domain/saga/models.py: "Keep False when another component (e.g., coordinator) publishes commands to avoid duplicate actions."The
ExecutionCoordinatoralready publishesCreatePodCommandEventvia_publish_execution_started()(coordinator.py line 398). Settingpublish_commands=Truewill cause theSagaOrchestratorto also publish the same command, resulting in duplicate pod creation attempts.🔎 Proposed fix
def _create_default_saga_config() -> SagaConfig: """Factory for default SagaConfig used by orchestrators.""" return SagaConfig( name="main-orchestrator", timeout_seconds=300, max_retries=3, retry_delay_seconds=5, enable_compensation=True, store_events=True, - publish_commands=True, + publish_commands=False, )
559-564: Resolve scope inconsistency for orchestration services.
BusinessServicesProvider(line 560) usesScope.REQUESTand provides bothget_saga_orchestratorandget_execution_coordinator. However,CoordinatorProvider(line 636) andSagaOrchestratorProvider(line 689) useScope.APPfor the same services.This creates an inconsistency where the same service can be either request-scoped or app-scoped depending on which provider is used. Since
SagaOrchestratorandExecutionCoordinatormaintain persistent Kafka consumers and background tasks, they should beScope.APPsingletons.Recommendation: Change
BusinessServicesProvider.scopefromScope.REQUESTtoScope.APPto match the dedicated providers and prevent per-request instantiation of expensive resources.🔎 Proposed fix
class BusinessServicesProvider(Provider): - scope = Scope.REQUEST + scope = Scope.APPAlso applies to: 635-639, 688-692
🧹 Nitpick comments (5)
.github/workflows/backend-ci.yml (1)
93-100: Consider adding topic cleanup for long-term Kafka hygiene.The dynamic prefix creates unique topics per CI run without subsequent cleanup. Over time, Kafka may accumulate unused topics from completed workflows. Consider adding a cleanup step or implementing a retention policy for CI-prefixed topics to maintain Kafka cluster hygiene.
Also applies to: 187-194
backend/tests/integration/events/test_producer_roundtrip.py (1)
32-44: Acceptable but fragile test approach.Using
object.__new__(UP)to bypass__init__and directly test_handle_statsis a pragmatic way to achieve coverage for the stats parsing logic. However, this approach is tightly coupled to internal implementation details.Consider adding a brief comment explaining why this bypass is necessary, or alternatively test via the public interface if feasible in the future.
backend/app/core/lifecycle.py (1)
33-42: Consider exception safety in `aclose()`.

The flag `_lifecycle_started` is set to `False` before calling `_on_stop()`. If `_on_stop()` raises an exception, the service will be marked as stopped even though cleanup may be incomplete.

Depending on your requirements, you may want to either:

- Keep current behavior (prevents retry loops if stop keeps failing)
- Set the flag after `_on_stop()` completes (allows retry on failure)

The current approach is reasonable for most use cases as it prevents infinite cleanup attempts.
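If the retry-on-failure variant is ever preferred, a minimal sketch is below. It reuses the attribute and hook names quoted in this comment and is not the project's actual implementation.

```python
async def aclose(self) -> None:
    """Stop the service; only mark it stopped once cleanup has succeeded."""
    if not self._lifecycle_started:
        return
    await self._on_stop()            # if this raises, the service still counts as started
    self._lifecycle_started = False  # flag flips only after cleanup completed
```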
backend/app/services/sse/kafka_redis_bridge.py (1)
44-52: Consider handling partial failures in `_on_start`.

If `_create_consumer(i)` fails for consumer N, consumers 0 to N-1 are already started but the startup fails. These consumers won't be stopped since `_on_stop` is only called if the context manager exits normally or via `aclose()`. Consider wrapping the loop in try/except and stopping already-created consumers on failure:

🔎 Proposed fix

```diff
 async def _on_start(self) -> None:
     """Start the SSE Kafka→Redis bridge."""
     self.logger.info(f"Starting SSE Kafka→Redis bridge with {self.num_consumers} consumers")
-    for i in range(self.num_consumers):
-        consumer = await self._create_consumer(i)
-        self.consumers.append(consumer)
+    try:
+        for i in range(self.num_consumers):
+            consumer = await self._create_consumer(i)
+            self.consumers.append(consumer)
+    except Exception:
+        # Cleanup partially created consumers
+        for consumer in self.consumers:
+            await consumer.stop()
+        self.consumers.clear()
+        raise
     self.logger.info("SSE Kafka→Redis bridge started successfully")
```

backend/app/services/result_processor/processor.py (1)
188-189: Existing TODO: Brittle memory limit parsing.

The TODO comment on line 189 flags that parsing `K8S_POD_MEMORY_LIMIT` by stripping "Mi" is fragile. This could fail if the format changes (e.g., "Gi", "Ki", or numeric bytes). Consider creating a utility function that handles various Kubernetes memory unit formats.
Would you like me to open an issue to track implementing a robust memory unit parser?
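As an illustration of such a utility (an assumption, not the project's code), a small parser covering the common binary and decimal suffixes could look like this:

```python
# Minimal sketch: convert a Kubernetes memory quantity string to bytes.
_UNITS = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,
    "K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,
}


def parse_k8s_memory(quantity: str) -> int:
    """Return the limit in bytes for values like '128Mi', '1Gi', '512M', or plain bytes."""
    quantity = quantity.strip()
    # Check two-character suffixes before single-character ones.
    for suffix, factor in sorted(_UNITS.items(), key=lambda kv: -len(kv[0])):
        if quantity.endswith(suffix):
            return int(float(quantity[: -len(suffix)]) * factor)
    return int(quantity)  # no suffix: already bytes


assert parse_k8s_memory("128Mi") == 128 * 1024**2
```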
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
- .github/workflows/backend-ci.yml
- backend/app/core/lifecycle.py
- backend/app/core/providers.py
- backend/app/dlq/manager.py
- backend/app/events/core/producer.py
- backend/app/events/event_store_consumer.py
- backend/app/services/coordinator/coordinator.py
- backend/app/services/event_bus.py
- backend/app/services/k8s_worker/worker.py
- backend/app/services/pod_monitor/monitor.py
- backend/app/services/result_processor/processor.py
- backend/app/services/saga/saga_orchestrator.py
- backend/app/services/sse/kafka_redis_bridge.py
- backend/tests/integration/events/test_producer_roundtrip.py
- backend/tests/integration/services/sse/test_partitioned_event_router.py
- backend/tests/unit/services/pod_monitor/test_monitor.py
- backend/workers/run_coordinator.py
- backend/workers/run_k8s_worker.py
- backend/workers/run_pod_monitor.py
- backend/workers/run_result_processor.py
- backend/workers/run_saga_orchestrator.py
🧰 Additional context used
🧬 Code graph analysis (12)
backend/app/dlq/manager.py (4)
- backend/app/settings.py (1): Settings (11-160)
- backend/app/core/lifecycle.py (4): LifecycleEnabled (7-62), _on_start (25-27), _on_stop (29-31), is_running (45-47)
- backend/app/events/core/producer.py (3): _on_start (111-126), _on_stop (149-164), is_running (52-53)
- backend/app/events/core/consumer.py (2): consumer (214-215), is_running (210-211)

backend/tests/integration/services/sse/test_partitioned_event_router.py (5)
- backend/app/core/metrics/events.py (1): EventMetrics (4-209)
- backend/app/events/schema/schema_registry.py (1): SchemaRegistryManager (54-230)
- backend/app/settings.py (1): Settings (11-160)
- backend/tests/conftest.py (1): test_settings (72-74)
- backend/app/core/lifecycle.py (1): aclose (33-42)

backend/app/events/event_store_consumer.py (3)
- backend/app/settings.py (1): Settings (11-160)
- backend/app/core/lifecycle.py (3): _on_start (25-27), _on_stop (29-31), is_running (45-47)
- backend/app/events/core/consumer.py (3): consumer (214-215), UnifiedConsumer (24-257), is_running (210-211)

backend/tests/unit/services/pod_monitor/test_monitor.py (3)
- backend/app/core/k8s_clients.py (1): K8sClients (9-13)
- backend/app/services/pod_monitor/monitor.py (3): PodMonitor (93-496), create_pod_monitor (500-515), state (137-139)
- backend/app/core/lifecycle.py (1): aclose (33-42)

backend/app/services/result_processor/processor.py (4)
- backend/app/events/schema/schema_registry.py (1): SchemaRegistryManager (54-230)
- backend/app/settings.py (1): Settings (11-160)
- backend/app/events/core/dispatcher.py (1): EventDispatcher (15-188)
- backend/app/events/core/consumer.py (2): stop (65-80), UnifiedConsumer (24-257)

backend/app/services/saga/saga_orchestrator.py (5)
- backend/app/events/schema/schema_registry.py (1): SchemaRegistryManager (54-230)
- backend/app/settings.py (1): Settings (11-160)
- backend/app/events/event_store.py (1): EventStore (29-330)
- backend/app/events/core/dispatcher.py (2): EventDispatcher (15-188), register_handler (70-79)
- backend/app/events/core/consumer.py (4): UnifiedConsumer (24-257), consumer (214-215), is_running (210-211), state (202-203)

backend/workers/run_result_processor.py (11)
- backend/app/core/container.py (1): create_result_processor_container (59-77)
- backend/app/core/logging.py (1): setup_logger (110-147)
- backend/app/core/tracing/config.py (1): init_tracing (177-197)
- backend/app/db/repositories/execution_repository.py (1): ExecutionRepository (17-91)
- backend/app/domain/enums/kafka.py (1): GroupId (56-68)
- backend/app/events/core/producer.py (3): UnifiedProducer (30-289), producer (64-65), get_status (128-147)
- backend/app/events/schema/schema_registry.py (1): SchemaRegistryManager (54-230)
- backend/app/services/idempotency/idempotency_manager.py (1): IdempotencyManager (69-315)
- backend/app/services/result_processor/processor.py (3): ProcessingState (34-39), ResultProcessor (60-308), get_status (303-308)
- backend/app/settings.py (2): Settings (11-160), get_settings (164-165)
- backend/app/events/core/consumer.py (1): get_status (217-234)

backend/app/services/k8s_worker/worker.py (4)
- backend/app/events/core/types.py (1): ConsumerConfig (64-102)
- backend/app/events/core/consumer.py (4): UnifiedConsumer (24-257), metrics (206-207), consumer (214-215), is_running (210-211)
- backend/app/settings.py (1): Settings (11-160)
- backend/app/core/lifecycle.py (3): _on_start (25-27), _on_stop (29-31), is_running (45-47)

backend/app/services/event_bus.py (3)
- backend/app/events/core/producer.py (5): metrics (60-61), producer (64-65), _on_start (111-126), _on_stop (149-164), is_running (52-53)
- backend/app/core/lifecycle.py (4): _on_start (25-27), _on_stop (29-31), is_running (45-47), aclose (33-42)
- backend/app/events/event_store_consumer.py (2): _on_start (49-89), _on_stop (91-105)

backend/app/services/pod_monitor/monitor.py (2)
- backend/app/core/k8s_clients.py (1): K8sClients (9-13)
- backend/app/core/lifecycle.py (2): _on_start (25-27), _on_stop (29-31)

backend/app/core/providers.py (4)
- backend/app/domain/enums/kafka.py (1): KafkaTopic (7-53)
- backend/app/services/k8s_worker/config.py (1): K8sWorkerConfig (8-36)
- backend/app/settings.py (1): Settings (11-160)
- backend/app/domain/saga/models.py (1): SagaConfig (92-104)

backend/workers/run_saga_orchestrator.py (3)
- backend/app/core/container.py (1): create_saga_orchestrator_container (133-147)
- backend/app/events/schema/schema_registry.py (2): SchemaRegistryManager (54-230), initialize_event_schemas (239-240)
- backend/app/services/saga/saga_orchestrator.py (1): SagaOrchestrator (31-530)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (63)
.github/workflows/backend-ci.yml (2)
111-111: LGTM! Consistent with the topic creation step.

The dynamic prefix matches the topic creation environment, ensuring integration tests use the correctly isolated topics.

203-203: LGTM! Consistent with the topic creation step.

The dynamic prefix matches the e2e topic creation environment, maintaining isolation between concurrent workflow runs.
backend/tests/integration/events/test_producer_roundtrip.py (1)
21-29: LGTM! Context manager usage aligns with the lifecycle refactor.

The switch from explicit `start()`/`stop()` calls to `async with prod:` properly leverages the new `LifecycleEnabled` base class pattern. The lifecycle is now managed via `__aenter__`/`__aexit__`, ensuring idempotent cleanup.

backend/workers/run_k8s_worker.py (2)
35-52: Well-structured shutdown handling.The signal handling now correctly uses
loop.add_signal_handler()with a shutdown event, and the worker state is checked via the publicworker.is_runningproperty. Both issues from previous reviews have been addressed.The lifecycle is properly managed with
container.close()in the finally block for cleanup.
17-34: LGTM! Clean DI container setup.The async entry point properly:
- Falls back to
get_settings()if none provided- Creates container and retrieves dependencies
- Initializes Beanie ODM and event schemas before starting the worker
This aligns with the broader DI refactor across the codebase.
backend/app/core/lifecycle.py (1)
49-54: LGTM! Idempotent startup handling.The early return when
_lifecycle_startedis True prevents duplicate initialization, and the flag is set after_on_start()succeeds, ensuring proper exception safety on startup.backend/app/services/sse/kafka_redis_bridge.py (2)
54-62: LGTM! Clean consumer shutdown.The
_on_stopimplementation properly iterates over all consumers, stops each one, and clears the list. This ensures all resources are released on shutdown.
87-93: LGTM! Consumer wiring with DI dependencies.The
UnifiedConsumeris now correctly instantiated withschema_registryandsettings, aligning with the broader DI refactor across the codebase.backend/app/services/result_processor/processor.py (2)
63-84: LGTM! Clean DI wiring in constructor.The constructor properly accepts and stores
schema_registryandsettingsas instance attributes, callssuper().__init__()for lifecycle support, and initializes all required state. The comment on line 108 correctly documents that the producer is DI-managed.
86-109: LGTM! Lifecycle hooks properly implemented.The
_on_startand_on_stophooks correctly:
- Initialize/close the idempotency manager
- Create/stop the consumer
- Manage processing state transitions
The producer is correctly left to the DI container as noted in the comment.
backend/workers/run_result_processor.py (2)
48-68: Graceful shutdown properly implemented.

The shutdown handling now uses `asyncio.Event` with `loop.add_signal_handler()` for `SIGINT`/`SIGTERM`, addressing the concern from the previous review. The `AsyncExitStack` properly manages cleanup of the database client, container, and processor.
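For readers skimming the thread, the shutdown pattern being praised here is roughly the following condensed sketch; the function name and the cleanup placeholder are illustrative, not the actual run_result_processor code.

```python
import asyncio
import signal


async def run_worker() -> None:
    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()
    # Signal handlers only set the event; the coroutine below owns the shutdown.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown_event.set)
    try:
        await shutdown_event.wait()  # worker tasks keep running until a signal arrives
    finally:
        # Close the container / DB client here, mirroring the AsyncExitStack teardown.
        pass
```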
25-28: Verify: MongoDB client created outside DI container.The MongoDB client is created directly here rather than obtained from the DI container. This is likely intentional since the container's
DatabaseProvidermay expect Beanie to already be initialized, but it creates a slight inconsistency with how other workers obtain their database connection.Ensure this pattern is consistent across all workers or consider refactoring to use container-provided database connections.
backend/app/events/core/producer.py (3)
51-53: Note:is_runningproperty override may conflict with base class.The base
LifecycleEnabled.is_runningreturnsself._lifecycle_started, but this override returnsself._state == ProducerState.RUNNING.Since
_on_startsets_state = RUNNINGbefore the base__aenter__sets_lifecycle_started = True, there's a brief window where these could be inconsistent. This is likely fine since the producer's state-based check is more semantically accurate, but be aware of the divergence from the base class contract.
111-126: LGTM! Clean lifecycle hook implementation.The
_on_startmethod properly:
- Sets state to STARTING
- Configures and creates the Producer with thread-safe initialization lock
- Starts the poll loop task
- Transitions to RUNNING state
166-173: LGTM! Poll loop uses correct state check.

The poll loop now uses `self.is_running` instead of the previous `self._running`, aligning with the lifecycle refactor. The loop properly terminates when the producer is stopped.

backend/workers/run_coordinator.py (1)
35-52: LGTM - Past critical issues resolved!

The signal handling has been correctly fixed to use `loop.add_signal_handler()` instead of the unsafe `signal.signal()` pattern. Additionally, the code now properly uses the public `is_running` property instead of accessing the private `_running` attribute. Both critical and minor issues from previous reviews have been addressed.

backend/workers/run_pod_monitor.py (1)
19-54: LGTM - Consistent DI-driven lifecycle pattern!

The implementation follows the same correct pattern as the coordinator worker, using `loop.add_signal_handler()` for signal handling and accessing the public `state` property. The periodic status logging with `RECONCILIATION_LOG_INTERVAL` is a nice operational touch.

backend/app/services/event_bus.py (2)
93-94: LGTM - Modernized asyncio API usage!

Correctly replaced the deprecated `asyncio.get_event_loop()` with `asyncio.get_running_loop()`, which is the recommended approach for retrieving the event loop within an async context.
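A generic illustration of the replacement (not the event_bus code itself):

```python
import asyncio


async def loop_time() -> float:
    # asyncio.get_event_loop() is deprecated inside coroutines and may fetch or
    # create the wrong loop; get_running_loop() always returns the loop that is
    # executing this coroutine.
    return asyncio.get_running_loop().time()
```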
62-122: LGTM - Clean lifecycle hook migration!

The migration from `start()`/`stop()` to `_on_start()`/`_on_stop()` lifecycle hooks is well-executed. The cleanup logic is properly consolidated in `_on_stop()`, and the use of the public `is_running` property throughout maintains proper encapsulation.

backend/tests/integration/services/sse/test_partitioned_event_router.py (2)
22-38: LGTM - Proper settings injection!The test correctly uses the
test_settingsfixture and passes it to components that now require it. TheSchemaRegistryManagerinstantiation properly includes thesettingsparameter.
58-83: LGTM - Lifecycle testing with settings!The test properly exercises lifecycle hooks using
__aenter__()andaclose(), and validates idempotent behavior. Mutatingtest_settings.SSE_CONSUMER_POOL_SIZEis safe sincetest_settingsis function-scoped per the conftest.backend/tests/unit/services/pod_monitor/test_monitor.py (3)
120-124: LGTM - Lifecycle pattern adoption!Test correctly updated to use
__aenter__()andaclose()for lifecycle management, aligning with the broader lifecycle hook refactor.
523-548: LGTM - Excellent DI path coverage!This new test validates the dependency injection path for K8sClients, ensuring that injected clients are properly wired through the factory. Good coverage of the DI-driven architecture.
551-594: LGTM - Idempotent lifecycle tests!Tests properly validate idempotent behavior of lifecycle hooks. Direct manipulation of
_lifecycle_startedis appropriate for unit testing internal state transitions.backend/app/dlq/manager.py (5)
28-40: LGTM! Proper dependency injection pattern.The constructor now accepts
Settingsvia dependency injection and correctly initializes the base class withsuper().__init__(). This aligns with the broader DI refactoring across the codebase.
147-156: LGTM! Lifecycle hook correctly implemented.The rename from
start()to_on_start()aligns with theLifecycleEnabledbase class contract. The method correctly usesself.settingsfor Kafka topic configuration.
185-185: LGTM! Modern asyncio timing API.Replaced deprecated
asyncio.get_event_loop().time()withasyncio.get_running_loop().time(), which is the correct approach for obtaining loop time in Python 3.10+.Also applies to: 244-244
176-176: LGTM! Proper lifecycle state checking.The processing loops correctly use
self.is_runningfrom theLifecycleEnabledbase class instead of a separate_runningflag, ensuring consistent lifecycle management.Also applies to: 390-390
463-502: All call sites correctly updated.All five usages of
create_dlq_managerpass thesettingsparameter: two in backend/app/core/providers.py and three in integration tests. No issues found.backend/app/events/event_store_consumer.py (3)
20-46: LGTM! Proper dependency injection and initialization.The constructor correctly accepts
Settings, initializes the base class, and sets_last_batch_timeto0.0. This value is properly updated to the current loop time in_on_start()at line 51.
49-89: LGTM! Lifecycle hook and consumer initialization.The
_on_start()hook correctly:
- Initializes
_last_batch_timewith the running loop's time- Creates
ConsumerConfigusingself.settings- Instantiates
UnifiedConsumerwith the requiredschema_registryandsettingsparameters- Registers DLQ error handling when a producer is available
123-131: LGTM! Consistent lifecycle state and modern asyncio.The batch processor correctly uses
self.is_runningfrom the base class andasyncio.get_running_loop().time()for timing calculations.backend/app/services/k8s_worker/worker.py (3)
87-144: LGTM! Lifecycle hook and DI-based consumer creation.The
_on_start()hook correctly:
- Uses
self._settings.KAFKA_GROUP_SUFFIXfor consumer group configuration- Instantiates
UnifiedConsumerwithschema_registryandsettingsparameters- Wraps the consumer with idempotency middleware
146-171: LGTM! Proper shutdown sequence with DI lifecycle note.The
_on_stop()hook implements a proper shutdown sequence, and the comment at line 169 correctly documents that the producer lifecycle is managed by the DI container.
423-433: LGTM! Status uses lifecycle state.The
get_status()method correctly usesself.is_runningfrom theLifecycleEnabledbase class.backend/app/services/pod_monitor/monitor.py (3)
102-134: LGTM! Optional DI parameter for Kubernetes clients.The constructor accepts an optional
k8s_clients: K8sClients | Noneparameter, enabling dependency injection while maintaining backward compatibility. WhenNone, the clients are initialized from kubeconfig in_initialize_kubernetes_client().
141-156: LGTM! Lifecycle hook with proper initialization.The
_on_start()hook correctly initializes the Kubernetes client, sets the monitor state toRUNNING, and starts the watch and reconciliation tasks.
158-181: LGTM! Proper cleanup in lifecycle hook.The
_on_stop()hook correctly cancels tasks, stops the watch, clears state, and transitions toSTOPPED.backend/workers/run_saga_orchestrator.py (4)
17-34: LGTM! DI-based initialization.The function correctly:
- Accepts an optional
Settingsparameter, defaulting toget_settings()- Uses
create_saga_orchestrator_container()for DI-based service resolution- Initializes Beanie with the container-provided
Database- Initializes event schemas through the container-provided
SchemaRegistryManager
35-46: LGTM! Graceful shutdown mechanism addresses past review.The signal handlers and shutdown loop correctly implement graceful shutdown as requested in the past review. The loop waits for either:
- The orchestrator to stop (
orchestrator.is_runningbecomesFalse)- A shutdown signal (
shutdown_event.is_set())
47-52: LGTM! Container cleanup in finally block.The container cleanup is properly placed in the
finallyblock, ensuring resources are released regardless of how the loop exits. The past review concern about cleanup order appears to have been addressed in commit 131c28d.
55-74: LGTM! Settings passed to orchestrator.The
main()function correctly retrieves settings and passes them torun_saga_orchestrator(), ensuring consistent configuration throughout the lifecycle.backend/app/services/saga/saga_orchestrator.py (5)
34-59: LGTM! Constructor properly implements DI pattern.The constructor correctly accepts and stores the new dependencies (
schema_registry_manager,settings,logger). The call tosuper().__init__()properly initializes theLifecycleEnabledbase class, and the instance attributes are correctly assigned.
69-98: LGTM! Lifecycle hooks correctly implemented.The transition from
start()/stop()to_on_start()/_on_stop()properly aligns with theLifecycleEnabledbase class pattern. The methods are correctly marked as async and include appropriate logging at lifecycle boundaries.
100-151: LGTM! Consumer setup correctly wired with new dependencies.The consumer configuration properly uses
self._settingsfor Kafka bootstrap servers and group suffix. TheUnifiedConsumeris correctly initialized withschema_registry,settings, andlogger, and theIdempotentConsumerWrapperproperly receives theloggerparameter.
431-438: Good use of structured logging for cancellation context.The structured logging approach using the
extraparameter (lines 431-438, 482-485, 491-495) provides better observability and makes it easier to filter and analyze logs in production monitoring systems.Also applies to: 482-485, 491-495
533-570: LGTM! Factory function correctly updated.The factory function signature properly includes the new dependencies (
schema_registry_manager,settings,logger), the docstring accurately documents all parameters, and theSagaOrchestratorinstantiation correctly passes all required arguments.backend/app/services/coordinator/coordinator.py (6)
50-101: LGTM! Constructor properly implements DI pattern.The constructor correctly accepts
settingsas a parameter, callssuper().__init__()to initialize theLifecycleEnabledbase class, and properly stores the settings and schema registry manager as instance attributes.
103-165: LGTM! Lifecycle hooks and consumer setup correctly implemented.The
_on_startmethod properly initializes the consumer withschema_registryandsettings, registers event handlers with concrete event types, and correctly configures theIdempotentConsumerWrapperwithenable_for_all_handlers=Truefor consistent idempotency protection.
167-189: LGTM! Cleanup logic properly implemented.The
_on_stopmethod correctly stops the consumer, queue manager, and idempotency manager, and properly cancels the scheduling task with appropriate error handling.
191-214: LGTM! Event routing correctly uses concrete event types.The routing methods properly use union types of concrete event classes (
ExecutionRequestedEvent | ExecutionCancelledEventandExecutionCompletedEvent | ExecutionFailedEvent) instead of genericBaseEvent, providing better type safety and clarity.
215-291: LGTM! Event handlers properly manage resources and state.The handler implementations correctly:
- Add executions to the queue with priority handling
- Publish appropriate events (accepted, queue full, cancelled)
- Track metrics consistently
- Release resources and update active execution tracking
- Handle errors with proper logging
292-309: LGTM! Scheduling loop and status correctly use lifecycle state.The scheduling loop properly checks
self.is_running(line 294) and theget_statusmethod correctly exposes this flag (line 476), ensuring consistent lifecycle management with theLifecycleEnabledpattern.Also applies to: 473-480
backend/app/core/providers.py (10)
80-83: LGTM! Settings provider correctly uses context-based injection.The
from_contextpattern properly injectsSettingswithScope.APP, making settings available as an application singleton throughout the DI container.
112-112: LGTM! Correctly uses typedping()method.The code properly uses
client.ping()instead of the untypedexecute_command("PING"). The type ignore comment appropriately acknowledges redis-py's dual sync/async return type annotation issue.
150-181: LGTM! Messaging provider correctly manages service lifecycles.All messaging services properly use async context managers or explicit lifecycle management:
- `UnifiedProducer` uses `async with` for automatic lifecycle
- `DLQManager` uses `async with` from the factory
- `IdempotencyManager` explicitly initializes and closes
184-223: LGTM! Event provider correctly wires schema registry and event store.The provider properly:
- Creates
SchemaRegistryManagerwithsettingsparameter- Uses
async withforEventStoreConsumerlifecycle management- Passes
settingsto the event store consumer factory- Wires all dependencies correctly
291-350: LGTM! Repository provider correctly creates stateless repository instances.The
RepositoryProviderwithScope.APPis appropriate for stateless repository facades. All repositories are correctly instantiated with their required dependencies (logger where needed).
353-403: LGTM! SSE provider correctly manages service lifecycles and scopes.The provider properly:
- Uses
async withforSSEKafkaRedisBridgelifecycle management- Uses
Scope.REQUESTfor request-scoped services (SSEShutdownManager,SSEService)- Wires all dependencies correctly
714-731: LGTM! DLQ processor provider correctly manages DLQ manager lifecycle.The provider properly uses
async withto manage theDLQManagerlifecycle and correctly configures it withsettings,schema_registry,logger, and DLQ-specific parameters.
668-685: No action required — PodMonitor correctly implements the async context manager protocol.

`PodMonitor` inherits from `LifecycleEnabled` (line 93 in monitor.py), which provides the `__aenter__` and `__aexit__` methods needed for the `async with` statement. The implementation is safe and tested.

642-665: KubernetesWorker supports async context manager protocol via LifecycleEnabled inheritance.

`KubernetesWorker` inherits from `LifecycleEnabled` (backend/app/core/lifecycle.py), which properly implements `__aenter__` and `__aexit__`. The async context manager usage on lines 656-664 is safe and will work correctly.

512-556: Async context manager protocol is properly implemented.

Both `SagaOrchestrator` and `ExecutionCoordinator` inherit from `LifecycleEnabled`, which provides `__aenter__` and `__aexit__` implementations. The factory function `create_saga_orchestrator` returns a `SagaOrchestrator` instance, making the pattern `async with create_saga_orchestrator(...)` valid. `ExecutionCoordinator` can be used directly with `async with` as well. The lifecycle management is correctly configured.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
backend/app/core/providers.py (2)
498-508: `publish_commands=True` may cause duplicate command events.

This issue was raised in a previous review. If `ExecutionCoordinator` already publishes `CreatePodCommandEvent` to the SAGA_COMMANDS topic, enabling `publish_commands=True` here causes `SagaOrchestrator` to also publish, resulting in duplicate pod creation commands.
559-566: Scope inconsistency and duplicate bindings for orchestrators.Two concerns with this setup:
Scope mismatch (from prior review):
BusinessServicesProviderusesScope.REQUEST, butCoordinatorProvider(line 637) andSagaOrchestratorProvider(line 691) useScope.APPfor the same factory functions. Per-request instantiation ofSagaOrchestratorandExecutionCoordinatoris expensive due to Kafka consumers and background tasks.Duplicate bindings: The same factory functions (
_provide_saga_orchestrator,_provide_execution_coordinator) are registered in multiple providers. Depending on container registration order, the resolved scope forSagaOrchestratorandExecutionCoordinatorwill differ, leading to unpredictable behavior.Consider either:
- Removing the factory registrations from
BusinessServicesProviderand relying on dedicated APP-scoped providers- Or changing
BusinessServicesProviderscope toScope.APPif all its services should be singletons
🧹 Nitpick comments (1)
backend/tests/unit/services/sse/test_shutdown_manager.py (1)
44-44: Consider updating test name to reflectacloseinstead ofstop.The test name still references
router_stop, but the implementation now uses theaclosemethod. Renaming to something liketest_shutdown_force_close_calls_router_aclose_and_rejects_newwould improve clarity.🔎 Suggested rename
-async def test_shutdown_force_close_calls_router_stop_and_rejects_new(): +async def test_shutdown_force_close_calls_router_aclose_and_rejects_new():
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- backend/app/core/providers.py
- backend/app/services/kafka_event_service.py
- backend/app/services/sse/sse_shutdown_manager.py
- backend/tests/integration/events/test_event_store_consumer.py
- backend/tests/unit/services/sse/test_shutdown_manager.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/services/kafka_event_service.py (1)
backend/tests/unit/services/sse/test_shutdown_manager.py (1)
aclose(15-16)
backend/app/services/sse/sse_shutdown_manager.py (1)
backend/tests/unit/services/sse/test_shutdown_manager.py (1)
aclose(15-16)
backend/tests/integration/events/test_event_store_consumer.py (3)
backend/app/domain/enums/auth.py (1)
LoginMethod(4-10)backend/app/infrastructure/kafka/events/metadata.py (1)
AvroEventMetadata(9-31)backend/app/settings.py (1)
Settings(11-160)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (10)
backend/tests/integration/events/test_event_store_consumer.py (1)
11-11: LGTM! Clean refactor to support Settings DI and type-safe enums.The changes correctly:
- Retrieve Settings via DI (line 31) following the same pattern as other dependencies
- Pass settings to
create_event_store_consumer(line 45) to support the updated signature- Replace the string literal with
LoginMethod.PASSWORD(line 36) for type safety and consistencyThese changes align well with the PR objectives of threading Settings through DI and strongly typing event fields.
Also applies to: 14-14, 31-31, 36-36, 45-45
backend/tests/unit/services/sse/test_shutdown_manager.py (1)
11-17: LGTM! Async lifecycle pattern modernization.The transition from
stop()toasync aclose()aligns with modern Python async patterns and the project-wide lifecycle refactoring mentioned in the AI summary. The type annotations improve code clarity.backend/app/services/sse/sse_shutdown_manager.py (1)
260-260: The change fromstop()toaclose()is correctly implemented.SSEKafkaRedisBridge inherits
aclose()from its base class LifecycleEnabled, which provides a proper async cleanup implementation following Python's standard pattern (like aiofiles and aiohttp). The method is idempotent and safe to call multiple times. No stalerouter.stop()calls remain in the codebase.backend/app/services/kafka_event_service.py (1)
287-289: LGTM: Lifecycle modernization correctly implements async best practices.The shift from
stop()toaclose()is properly implemented.UnifiedProducerinheritsaclose()fromLifecycleEnabled(backend/app/core/lifecycle.py), which follows Python's standard async resource management pattern (like aiofiles, aiohttp) and is idempotent for safe repeated calls. The method triggers proper cleanup via_on_stop(), providing equivalent shutdown behavior to the previous pattern.backend/app/core/providers.py (6)
80-83: LGTM!Using
from_contextforSettingsis the correct Dishka pattern for injecting external values that are passed tomake_async_container.
112-112: Good fix for the typedping()method.Using
ping()instead ofexecute_command("PING")resolves the mypy error. Thetype: ignore[misc]comment appropriately handles the dual sync/async return type ambiguity in redis-py.
158-166: Good lifecycle management via async context managers.The switch to
async withforUnifiedProducerandcreate_dlq_managerensures proper initialization and cleanup, addressing previous concerns about unstarted services.
291-351: Good consolidation of repository providers.Centralizing all repository instantiation in a single
RepositoryProviderwithScope.APPis appropriate since repositories are stateless database facades.
645-688: Good lifecycle management for K8s worker and pod monitor.Both
KubernetesWorkerandPodMonitornow use async context managers, ensuring proper initialization and cleanup. This addresses the previous concern about missingstart()calls.
718-735: Lifecycle properly managed; note parameter difference from MessagingProvider.The async context manager pattern correctly handles lifecycle. The explicit
dlq_topicandretry_topic_suffixparameters differ fromMessagingProvider.get_dlq_manager(lines 162-166) which relies on defaults — this appears intentional for the dedicated DLQ processor worker use case.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/tests/integration/dlq/test_dlq_retry_immediate.py (1)
49-49: Use `test_settings.KAFKA_BOOTSTRAP_SERVERS` instead of hardcoding the bootstrap server.

The Producer at line 49 is hardcoded to `"localhost:9092"`, but the DLQ manager (created at line 28) uses `test_settings.KAFKA_BOOTSTRAP_SERVERS`, which defaults to `"kafka:29092"`. These point to different Kafka instances, causing the producer's messages to be sent to a different broker than the manager consumes from. Change `Producer({"bootstrap.servers": "localhost:9092"})` to use `test_settings.KAFKA_BOOTSTRAP_SERVERS` to match all other integration tests in the codebase.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- backend/tests/integration/dlq/test_dlq_discard_policy.py
- backend/tests/integration/dlq/test_dlq_manager.py
- backend/tests/integration/dlq/test_dlq_retry_immediate.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/integration/dlq/test_dlq_retry_immediate.py (3)
backend/tests/conftest.py (2)
db(125-127)test_settings(72-74)backend/app/events/schema/schema_registry.py (1)
create_schema_registry_manager(233-236)backend/app/dlq/manager.py (1)
create_dlq_manager(463-502)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (3)
backend/tests/integration/dlq/test_dlq_retry_immediate.py (1)
12-12: LGTM: Proper settings injection via test_settings fixture.The changes correctly thread the
test_settingsfixture through component creation:
- Schema registry and DLQ manager now receive settings explicitly
- Topic prefix consistently derived from
test_settings.KAFKA_TOPIC_PREFIX- Addresses the scope mismatch mentioned in the PR objectives
This ensures test components use test-specific configuration rather than environment variables.
Also applies to: 26-30
backend/tests/integration/dlq/test_dlq_discard_policy.py (1)
12-12: LGTM: Consistent settings injection pattern.The changes mirror those in
test_dlq_retry_immediate.py, properly threadingtest_settingsthrough schema registry and DLQ manager creation, and deriving the topic prefix fromtest_settings.KAFKA_TOPIC_PREFIXfor consistency.Also applies to: 26-30
backend/tests/integration/dlq/test_dlq_manager.py (1)
3-3: LGTM: Settings injection plus unique execution IDs.The changes properly implement the same settings threading pattern as the other DLQ tests, with an additional improvement:
- The
uuidimport and unique execution ID (exec-dlq-persist-{uuid.uuid4().hex[:8]}) prevent conflicts when tests run in parallel under pytest-xdist- This aligns well with the
xdist_groupmarker ensuring serial execution within the DLQ test groupAlso applies to: 11-11, 25-27, 29-33
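As a hedged illustration (the test name and group name are invented, not taken from the test file), the combination of a unique ID and the xdist group marker looks like this:

```python
import uuid

import pytest


@pytest.mark.xdist_group(name="dlq")  # with --dist loadgroup, keeps this group on one worker
def test_unique_execution_id() -> None:
    execution_id = f"exec-dlq-persist-{uuid.uuid4().hex[:8]}"  # unique per run, avoids collisions
    assert execution_id.startswith("exec-dlq-persist-")
```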
|
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".



Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.