2 events system (domain, avro) -> single one #84
base: main
Conversation
📝 Walkthrough
Swaps legacy Kafka event types for an Avro-backed DomainEvent model, embeds DomainEvent in DLQ documents, converts DLQManager to an async consumer that emits DLQ lifecycle events, and updates many imports, routing mappings, repositories, and tests to the new domain event surface.
Sequence Diagram(s)
sequenceDiagram
participant Producer as UnifiedProducer
participant Schema as SchemaRegistry
participant Kafka as Kafka Broker
participant DLQMgr as DLQManager
participant Mongo as MongoDB
participant Consumer as DLQ Events Consumer
Producer->>Schema: serialize_event(DomainEvent)
Schema-->>Producer: bytes
Producer->>Kafka: publish to EVENT_TYPE_TO_TOPIC[event.event_type]
alt publish failure or explicit DLQ
Producer->>Kafka: publish DLQ message to dlq_events
end
Kafka->>DLQMgr: deliver dlq_events message
DLQMgr->>Mongo: persist DLQMessage (event: DomainEvent)
DLQMgr->>Kafka: emit DLQ_MESSAGE_RECEIVED event (DLQ_EVENTS)
Kafka->>Consumer: deliver DLQ_MESSAGE_RECEIVED
Consumer->>DLQMgr: request retry/discard via manager API
alt retry
DLQMgr->>Kafka: republish original DomainEvent
DLQMgr->>Kafka: emit DLQ_MESSAGE_RETRIED
else discard
DLQMgr->>Mongo: update status DISCARDED
DLQMgr->>Kafka: emit DLQ_MESSAGE_DISCARDED
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
1 issue found across 7 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/tests/unit/services/saga/test_execution_saga_steps.py">
<violation number="1" location="backend/tests/unit/services/saga/test_execution_saga_steps.py:6">
P3: Duplicate import source: `app.domain.events.typed` is imported twice. Consider consolidating with the existing import on line 3: `from app.domain.events.typed import BaseEvent, ExecutionRequestedEvent`</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
from app.events.core import UnifiedProducer
from app.infrastructure.kafka.events import BaseEvent
from app.infrastructure.kafka.events.execution import ExecutionRequestedEvent
from app.domain.events.typed import BaseEvent
P3: Duplicate import source: app.domain.events.typed is imported twice. Consider consolidating with the existing import on line 3: from app.domain.events.typed import BaseEvent, ExecutionRequestedEvent
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/unit/services/saga/test_execution_saga_steps.py, line 6:
<comment>Duplicate import source: `app.domain.events.typed` is imported twice. Consider consolidating with the existing import on line 3: `from app.domain.events.typed import BaseEvent, ExecutionRequestedEvent`</comment>
<file context>
@@ -3,7 +3,7 @@
from app.domain.saga import DomainResourceAllocation, DomainResourceAllocationCreate
from app.events.core import UnifiedProducer
-from app.infrastructure.kafka.events import BaseEvent
+from app.domain.events.typed import BaseEvent
from app.services.saga.execution_saga import (
AllocateResourcesStep,
</file context>
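A minimal sketch of the consolidated import the reviewer suggests, assuming both names live in `app.domain.events.typed` as the existing import on line 3 of the test indicates:

```python
# Consolidated import (illustrative): one statement instead of two from the same module
from app.domain.events.typed import BaseEvent, ExecutionRequestedEvent
from app.events.core import UnifiedProducer
```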
15 issues found across 92 files
Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="docs/architecture/model-conversion.md">
<violation number="1" location="docs/architecture/model-conversion.md:28">
P3: Path inconsistency in architecture diagram: `app/domain/events/typed.py` is placed in the Infrastructure Layer box, but `app/domain/` is shown as the Domain Layer. Consider either moving the events to an infrastructure path (e.g., `app/infrastructure/events/typed.py`) to match the layer designation, or clarify in the documentation why Pydantic event models live within the domain directory structure.</violation>
</file>
<file name="backend/tests/integration/dlq/test_dlq_manager.py">
<violation number="1" location="backend/tests/integration/dlq/test_dlq_manager.py:60">
P2: Broad exception catching with debug-level logging may mask test failures. If deserialization consistently fails, the test will timeout after 15 seconds with no actionable error message. Consider logging at warning/error level, or re-raising after a few failures to make issues visible.</violation>
</file>
<file name="backend/tests/integration/idempotency/test_consumer_idempotent.py">
<violation number="1" location="backend/tests/integration/idempotency/test_consumer_idempotent.py:84">
P1: Test no longer verifies idempotency correctly. The Future resolves after the first event, but the second duplicate event may not have been processed yet. The assertion `seen["n"] >= 1` doesn't prove duplicates were blocked—it would pass even if idempotency is broken. Consider waiting for both events to be attempted (e.g., brief delay after Future resolves) and asserting `seen["n"] == 1` to verify exactly one event was processed.</violation>
</file>
<file name="backend/app/dlq/models.py">
<violation number="1" location="backend/app/dlq/models.py:52">
P0: Breaking change: `age_seconds` property was removed but is still used by `dlq_processor.py` for filtering old messages and by `DLQMessageResponse` schema. This will cause `AttributeError` at runtime.</violation>
</file>
<file name="backend/app/events/core/producer.py">
<violation number="1" location="backend/app/events/core/producer.py:121">
P2: Using direct dictionary access `EVENT_TYPE_TO_TOPIC[...]` may raise `KeyError` if an event type is not in the mapping. Consider using the safer `get_topic_for_event()` function from the same module, which provides a default fallback to `KafkaTopic.SYSTEM_EVENTS`.</violation>
</file>
<file name="backend/app/services/notification_service.py">
<violation number="1" location="backend/app/services/notification_service.py:330">
P2: Changing from `asyncio.create_task()` to `await` blocks the caller until notification delivery completes. Since `_deliver_notification` can make external HTTP calls (webhooks/Slack with 30s timeouts), this could significantly increase latency for callers of `create_notification`. If this blocking behavior is intentional, consider whether the performance impact on API response times is acceptable. Otherwise, restore the background task pattern.</violation>
</file>
<file name="docs/components/schema-manager.md">
<violation number="1" location="docs/components/schema-manager.md:21">
P3: Inconsistent path format: should be `app/domain/events/typed.py` to match the convention used elsewhere in this document (e.g., `app/db/schema/schema_manager.py`).</violation>
</file>
<file name="backend/app/dlq/manager.py">
<violation number="1" location="backend/app/dlq/manager.py:75">
P1: Missing null safety in header decoding. If any Kafka header value is `None`, this will crash with `AttributeError`. The old code handled this with `v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")`.</violation>
<violation number="2" location="backend/app/dlq/manager.py:137">
P2: Missing error backoff sleep after exception. If persistent errors occur (e.g., database unavailable), the loop will spin rapidly without pause, causing excessive logging and CPU usage. The old code had `await asyncio.sleep(5)` after errors.</violation>
</file>
<file name="backend/app/services/sse/sse_shutdown_manager.py">
<violation number="1" location="backend/app/services/sse/sse_shutdown_manager.py:68">
P1: The `initiated_event` is created but never set. External code awaiting this event will hang indefinitely. Add `self.initiated_event.set()` in `initiate_shutdown()` after setting `_shutdown_initiated = True`.</violation>
<violation number="2" location="backend/app/services/sse/sse_shutdown_manager.py:69">
P1: The `notifying_event` is created but never set. External code awaiting this event will hang indefinitely. Add `self.notifying_event.set()` in `_notify_connections()` after setting the phase to NOTIFYING.</violation>
</file>
<file name="backend/app/events/core/dispatcher.py">
<violation number="1" location="backend/app/events/core/dispatcher.py:29">
P2: Dead code: `_topic_event_types` is initialized but never populated or used. The `_build_topic_mapping` method that populated this attribute was removed, but the attribute declaration was left behind. Consider removing this unused attribute.</violation>
</file>
<file name="backend/app/services/kafka_event_service.py">
<violation number="1" location="backend/app/services/kafka_event_service.py:201">
P2: Logging was reduced to only `event_id`, removing `event_type` and `aggregate_id` that were previously logged. This makes debugging and tracing events harder. Consider adding these fields back for consistency with `publish_event` and better observability.</violation>
</file>
<file name="backend/app/services/coordinator/coordinator.py">
<violation number="1" location="backend/app/services/coordinator/coordinator.py:246">
P1: Off-by-one error: queue positions are 1-indexed, so the front of queue is `position == 1`, not `position == 0`. The current check will never trigger for items at the front of the queue - it only triggers when position was `None` (error case) and defaulted to 0.</violation>
</file>
<file name="backend/tests/unit/services/idempotency/test_middleware.py">
<violation number="1" location="backend/tests/unit/services/idempotency/test_middleware.py:31">
P2: `MagicMock(spec=DomainEvent)` is problematic because `DomainEvent` is an `Annotated` union type alias, not a class. The `spec` parameter is designed for concrete classes. Consider using `spec=BaseEvent` instead, which is the base class that defines `event_type` and `event_id`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
@property
def age_seconds(self) -> float:
    return (datetime.now(timezone.utc) - self.failed_at).total_seconds()
headers: dict[str, str] = Field(default_factory=dict)
P0: Breaking change: age_seconds property was removed but is still used by dlq_processor.py for filtering old messages and by DLQMessageResponse schema. This will cause AttributeError at runtime.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/dlq/models.py, line 52:
<comment>Breaking change: `age_seconds` property was removed but is still used by `dlq_processor.py` for filtering old messages and by `DLQMessageResponse` schema. This will cause `AttributeError` at runtime.</comment>
<file context>
@@ -48,11 +49,7 @@ class DLQMessage:
- @property
- def age_seconds(self) -> float:
- return (datetime.now(timezone.utc) - self.failed_at).total_seconds()
+ headers: dict[str, str] = Field(default_factory=dict)
</file context>
Suggested change:
headers: dict[str, str] = Field(default_factory=dict)

@property
def age_seconds(self) -> float:
    return (datetime.now(timezone.utc) - self.failed_at).total_seconds()
await eventually(_one, timeout=10.0, interval=0.2)
# Await the future directly - true async, no polling
await asyncio.wait_for(handled_future, timeout=10.0)
assert seen["n"] >= 1
P1: Test no longer verifies idempotency correctly. The Future resolves after the first event, but the second duplicate event may not have been processed yet. The assertion seen["n"] >= 1 doesn't prove duplicates were blocked—it would pass even if idempotency is broken. Consider waiting for both events to be attempted (e.g., brief delay after Future resolves) and asserting seen["n"] == 1 to verify exactly one event was processed.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/integration/idempotency/test_consumer_idempotent.py, line 84:
<comment>Test no longer verifies idempotency correctly. The Future resolves after the first event, but the second duplicate event may not have been processed yet. The assertion `seen["n"] >= 1` doesn't prove duplicates were blocked—it would pass even if idempotency is broken. Consider waiting for both events to be attempted (e.g., brief delay after Future resolves) and asserting `seen["n"] == 1` to verify exactly one event was processed.</comment>
<file context>
@@ -68,18 +78,9 @@ async def handle(_ev: BaseEvent) -> None:
- await eventually(_one, timeout=10.0, interval=0.2)
+ # Await the future directly - true async, no polling
+ await asyncio.wait_for(handled_future, timeout=10.0)
+ assert seen["n"] >= 1
finally:
await wrapper.stop()
</file context>
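A sketch of the tighter check the reviewer is asking for, assuming the test's existing `handled_future` future and `seen` counter; the short sleep is a grace period so a broken idempotency layer would have time to process the duplicate before the assertion runs:

```python
import asyncio


async def _assert_exactly_one(handled_future: asyncio.Future, seen: dict[str, int]) -> None:
    # Wait for the first event, give the duplicate a chance to arrive,
    # then require exactly one handler invocation.
    await asyncio.wait_for(handled_future, timeout=10.0)
    await asyncio.sleep(1.0)  # grace period for the duplicate event to be consumed
    assert seen["n"] == 1, f"expected duplicate to be blocked, handler ran {seen['n']} times"
```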
headers=headers,
)
data = json.loads(msg.value)
headers = {k: v.decode() for k, v in (msg.headers or [])}
P1: Missing null safety in header decoding. If any Kafka header value is None, this will crash with AttributeError. The old code handled this with v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "").
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/dlq/manager.py, line 75:
<comment>Missing null safety in header decoding. If any Kafka header value is `None`, this will crash with `AttributeError`. The old code handled this with `v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")`.</comment>
<file context>
@@ -61,91 +66,14 @@ def __init__(
- headers=headers,
- )
+ data = json.loads(msg.value)
+ headers = {k: v.decode() for k, v in (msg.headers or [])}
+ return DLQMessage(**data, dlq_offset=msg.offset, dlq_partition=msg.partition, headers=headers)
</file context>
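A small illustration of the null-safe decoding the review describes; the helper name is hypothetical, and it assumes confluent-kafka-style headers (a list of `(key, value)` tuples whose values may be bytes or `None`):

```python
def decode_headers(raw_headers: list[tuple[str, bytes | None]] | None) -> dict[str, str]:
    """Decode Kafka message headers defensively: tolerate missing headers and None values."""
    decoded: dict[str, str] = {}
    for key, value in raw_headers or []:
        if isinstance(value, (bytes, bytearray)):
            decoded[key] = value.decode("utf-8", errors="replace")
        else:
            decoded[key] = value or ""
    return decoded
```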
# Phase transition events for external coordination
self.initiated_event = asyncio.Event()  # Set when shutdown initiated
self.notifying_event = asyncio.Event()  # Set when entering notifying phase
P1: The notifying_event is created but never set. External code awaiting this event will hang indefinitely. Add self.notifying_event.set() in _notify_connections() after setting the phase to NOTIFYING.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/sse/sse_shutdown_manager.py, line 69:
<comment>The `notifying_event` is created but never set. External code awaiting this event will hang indefinitely. Add `self.notifying_event.set()` in `_notify_connections()` after setting the phase to NOTIFYING.</comment>
<file context>
@@ -64,6 +64,10 @@ def __init__(
+ # Phase transition events for external coordination
+ self.initiated_event = asyncio.Event() # Set when shutdown initiated
+ self.notifying_event = asyncio.Event() # Set when entering notifying phase
+
self.logger.info(
</file context>
self._drain_complete_event = asyncio.Event()

# Phase transition events for external coordination
self.initiated_event = asyncio.Event()  # Set when shutdown initiated
P1: The initiated_event is created but never set. External code awaiting this event will hang indefinitely. Add self.initiated_event.set() in initiate_shutdown() after setting _shutdown_initiated = True.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/sse/sse_shutdown_manager.py, line 68:
<comment>The `initiated_event` is created but never set. External code awaiting this event will hang indefinitely. Add `self.initiated_event.set()` in `initiate_shutdown()` after setting `_shutdown_initiated = True`.</comment>
<file context>
@@ -64,6 +64,10 @@ def __init__(
self._drain_complete_event = asyncio.Event()
+ # Phase transition events for external coordination
+ self.initiated_event = asyncio.Event() # Set when shutdown initiated
+ self.notifying_event = asyncio.Event() # Set when entering notifying phase
+
</file context>
# Map topics to event types that can appear on them
self._topic_event_types: dict[str, set[type[BaseEvent]]] = defaultdict(set)
self._topic_event_types: dict[str, set[type[DomainEvent]]] = defaultdict(set)
P2: Dead code: _topic_event_types is initialized but never populated or used. The _build_topic_mapping method that populated this attribute was removed, but the attribute declaration was left behind. Consider removing this unused attribute.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/events/core/dispatcher.py, line 29:
<comment>Dead code: `_topic_event_types` is initialized but never populated or used. The `_build_topic_mapping` method that populated this attribute was removed, but the attribute declaration was left behind. Consider removing this unused attribute.</comment>
<file context>
@@ -23,34 +23,23 @@ class EventDispatcher:
# Map topics to event types that can appear on them
- self._topic_event_types: dict[str, set[type[BaseEvent]]] = defaultdict(set)
+ self._topic_event_types: dict[str, set[type[DomainEvent]]] = defaultdict(set)
# Metrics per event type
</file context>
)

self.metrics.record_event_processing_duration(time.time() - start_time, event.event_type)
self.logger.info("Domain event published", extra={"event_id": event.event_id})
P2: Logging was reduced to only event_id, removing event_type and aggregate_id that were previously logged. This makes debugging and tracing events harder. Consider adding these fields back for consistency with publish_event and better observability.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/kafka_event_service.py, line 201:
<comment>Logging was reduced to only `event_id`, removing `event_type` and `aggregate_id` that were previously logged. This makes debugging and tracing events harder. Consider adding these fields back for consistency with `publish_event` and better observability.</comment>
<file context>
@@ -214,83 +174,31 @@ async def publish_pod_event(
- )
-
+ self.metrics.record_event_processing_duration(time.time() - start_time, event.event_type)
+ self.logger.info("Domain event published", extra={"event_id": event.event_id})
return event.event_id
</file context>
Suggested change:
self.logger.info("Domain event published", extra={"event_id": event.event_id})
self.logger.info("Domain event published", extra={"event_type": event.event_type, "event_id": event.event_id, "aggregate_id": event.aggregate_id})
@pytest.fixture
def event(self) -> MagicMock:
    event = MagicMock(spec=BaseEvent)
    event = MagicMock(spec=DomainEvent)
P2: MagicMock(spec=DomainEvent) is problematic because DomainEvent is an Annotated union type alias, not a class. The spec parameter is designed for concrete classes. Consider using spec=BaseEvent instead, which is the base class that defines event_type and event_id.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/unit/services/idempotency/test_middleware.py, line 31:
<comment>`MagicMock(spec=DomainEvent)` is problematic because `DomainEvent` is an `Annotated` union type alias, not a class. The `spec` parameter is designed for concrete classes. Consider using `spec=BaseEvent` instead, which is the base class that defines `event_type` and `event_id`.</comment>
<file context>
@@ -28,7 +28,7 @@ def mock_handler(self) -> AsyncMock:
@pytest.fixture
def event(self) -> MagicMock:
- event = MagicMock(spec=BaseEvent)
+ event = MagicMock(spec=DomainEvent)
event.event_type = "test.event"
event.event_id = "event-123"
</file context>
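A sketch of the fixture with the suggested `spec=BaseEvent`, assuming `BaseEvent` is importable from `app.domain.events.typed` as shown in the file context above:

```python
from unittest.mock import MagicMock

import pytest

from app.domain.events.typed import BaseEvent  # concrete base class, not the union alias


@pytest.fixture
def event() -> MagicMock:
    # spec against the concrete base class so attribute access is still validated
    mock_event = MagicMock(spec=BaseEvent)
    mock_event.event_type = "test.event"
    mock_event.event_id = "event-123"
    return mock_event
```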
subgraph "Infrastructure Layer"
INF["Pydantic/ODM<br/><code>app/db/docs/</code><br/><code>app/infrastructure/kafka/events/</code>"]
INF["Pydantic/ODM<br/><code>app/db/docs/</code><br/><code>app/domain/events/typed.py</code>"]
P3: Path inconsistency in architecture diagram: app/domain/events/typed.py is placed in the Infrastructure Layer box, but app/domain/ is shown as the Domain Layer. Consider either moving the events to an infrastructure path (e.g., app/infrastructure/events/typed.py) to match the layer designation, or clarify in the documentation why Pydantic event models live within the domain directory structure.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/architecture/model-conversion.md, line 28:
<comment>Path inconsistency in architecture diagram: `app/domain/events/typed.py` is placed in the Infrastructure Layer box, but `app/domain/` is shown as the Domain Layer. Consider either moving the events to an infrastructure path (e.g., `app/infrastructure/events/typed.py`) to match the layer designation, or clarify in the documentation why Pydantic event models live within the domain directory structure.</comment>
<file context>
@@ -25,7 +25,7 @@ graph TB
subgraph "Infrastructure Layer"
- INF["Pydantic/ODM<br/><code>app/db/docs/</code><br/><code>app/infrastructure/kafka/events/</code>"]
+ INF["Pydantic/ODM<br/><code>app/db/docs/</code><br/><code>app/domain/events/typed.py</code>"]
end
</file context>
The `SchemaRegistryManager` class in `app/events/schema/schema_registry.py` handles Avro serialization for Kafka events. All registry operations are async and must be awaited. The manager connects to a Confluent Schema Registry and registers schemas for all event types at startup via `await initialize_schemas()`.

Each event class (subclass of `BaseEvent`) generates its own Avro schema from Pydantic model definitions. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
All event classes in `domain/events/typed.py` extend `AvroBase` (from `pydantic-avro`), enabling automatic Avro schema generation. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
P3: Inconsistent path format: should be app/domain/events/typed.py to match the convention used elsewhere in this document (e.g., app/db/schema/schema_manager.py).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/components/schema-manager.md, line 21:
<comment>Inconsistent path format: should be `app/domain/events/typed.py` to match the convention used elsewhere in this document (e.g., `app/db/schema/schema_manager.py`).</comment>
<file context>
@@ -18,7 +18,7 @@ Repositories don't create their own indexes — they only read and write. This s
The `SchemaRegistryManager` class in `app/events/schema/schema_registry.py` handles Avro serialization for Kafka events. All registry operations are async and must be awaited. The manager connects to a Confluent Schema Registry and registers schemas for all event types at startup via `await initialize_schemas()`.
-Each event class (subclass of `BaseEvent`) generates its own Avro schema from Pydantic model definitions. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
+All event classes in `domain/events/typed.py` extend `AvroBase` (from `pydantic-avro`), enabling automatic Avro schema generation. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
Serialization and deserialization are async — `await serialize_event(event)` and `await deserialize_event(data, topic)` must be awaited. The wire format follows Confluent conventions: a magic byte, four-byte schema id, then the Avro binary payload. The underlying `python-schema-registry-client` library handles schema registration caching internally. The manager maintains a bidirectional cache between schema ids and Python event classes for deserialization. When deserializing, it reads the schema id from the message header, looks up the corresponding event class, deserializes the Avro payload to a dict, and hydrates the Pydantic model.
</file context>
Suggested change:
All event classes in `domain/events/typed.py` extend `AvroBase` (from `pydantic-avro`), enabling automatic Avro schema generation. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
All event classes in `app/domain/events/typed.py` extend `AvroBase` (from `pydantic-avro`), enabling automatic Avro schema generation. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (11)
backend/app/services/notification_service.py (1)
631-633: Fix null-pointer dereference onresource_usage.The pipeline failure indicates
resource_usagecan beNone(typed asResourceUsageDomain | None), but line 632 accesses.execution_time_wall_secondswithout a null check. This will raiseAttributeErrorat runtime.🐛 Proposed fix
title = f"Execution Completed: {event.execution_id}" - body = ( - f"Your execution completed successfully. Duration: {event.resource_usage.execution_time_wall_seconds:.2f}s." - ) + if event.resource_usage: + body = f"Your execution completed successfully. Duration: {event.resource_usage.execution_time_wall_seconds:.2f}s." + else: + body = "Your execution completed successfully."backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
30-43: Pipeline failure: MyPy reports BaseEvent has type "Any".The metadata field update to
EventMetadatais correct. However, MyPy reports thatBaseEvent(imported fromapp.infrastructure.kafka.eventson line 17) has typeAny, causing the "Class cannot subclass" error.This suggests
BaseEventmay lack proper type annotations or exports. Consider:
- Ensuring
BaseEventis properly typed inapp.infrastructure.kafka.events- Or updating
_FakeEventto extend fromDomainEvent(fromapp.domain.events.typed) to align with the migration#!/bin/bash # Check BaseEvent definition and exports in infrastructure module ast-grep --pattern 'class BaseEvent($_) { $$$ }' # Also check if there's a __all__ that might be missing BaseEvent rg -n "BaseEvent" --type py -C 3 backend/app/infrastructure/kafka/events/backend/tests/unit/services/saga/test_saga_step_and_base.py (1)
75-76: Type error:SagaStep[BaseEvent]generic constraint violation.
SagaStepis generic overDomainEvent, which is a union of concrete event types. UsingBaseEventas the type argument violates this constraint. Replace with a concrete event type or useDomainEventdirectly if the step should handle any domain event.🐛 Proposed fix
def get_steps(self) -> list[SagaStep[BaseEvent]]: + def get_steps(self) -> list[SagaStep[DomainEvent]]: return []- class S(SagaStep[BaseEvent]): - async def execute(self, context: SagaContext, event: BaseEvent) -> bool: + class S(SagaStep[DomainEvent]): + async def execute(self, context: SagaContext, event: DomainEvent) -> bool: return TrueAlso add the import at line 6:
-from app.domain.events.typed import BaseEvent, EventMetadata +from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadataAlso applies to: 82-84
backend/tests/integration/events/test_schema_registry_real.py (1)
17-24:PodCreatedEventhas notopicattribute.The pipeline fails because
PodCreatedEventdoesn't expose atopicattribute. Based on the PR's migration to domain events, topic resolution is now handled via centralized mappings (likely inmappings.py), not as an event attribute.🐛 Proposed fix using topic mapping or constant
+from app.domain.enums.kafka import KafkaTopicThen update the deserialization call:
- obj = await m.deserialize_event(data, topic=str(ev.topic)) + obj = await m.deserialize_event(data, topic=KafkaTopic.POD_EVENTS.value)Alternatively, if there's a topic resolver utility:
+from app.infrastructure.kafka.mappings import get_topic_for_event - obj = await m.deserialize_event(data, topic=str(ev.topic)) + obj = await m.deserialize_event(data, topic=get_topic_for_event(ev))backend/workers/dlq_processor.py (1)
67-71:DLQMessagedoes not have anage_secondsattribute—usecreated_atto compute message age.The filter function accesses
message.age_seconds, which does not exist on theDLQMessageclass. The class has acreated_atdatetime field that should be used instead.Fix
+from datetime import datetime, timezone def filter_old_messages(message: DLQMessage) -> bool: max_age_days = 7 - return message.age_seconds < (max_age_days * 24 * 3600) + max_age_seconds = max_age_days * 24 * 3600 + age = (datetime.now(timezone.utc) - message.created_at).total_seconds() + return age < max_age_secondsbackend/tests/integration/services/sse/test_redis_bus.py (1)
21-28: Change BaseEvent import to correct module path.Line 12 imports
BaseEventfrom a non-existent moduleapp.infrastructure.kafka.events.BaseEventis defined inapp.domain.events.typedand exported fromapp.domain.events. Change the import to:from app.domain.events import BaseEventThis non-existent module causes MyPy to treat
BaseEventas typeAny, which triggers the subclass error for_DummyEvent.backend/tests/unit/services/sse/test_kafka_redis_bridge.py (1)
26-29: Type mismatch:_FakeBus.publish_eventsignature differs fromSSERedisBus.The real
SSERedisBus.publish_eventnow acceptsDomainEvent(line 65 inredis_bus.py), but this fake usesBaseEvent. This inconsistency could mask integration issues and cause type errors.🔧 Proposed fix
+from app.domain.events.typed import DomainEvent + class _FakeBus(SSERedisBus): """Fake SSERedisBus for testing.""" def __init__(self) -> None: - self.published: list[tuple[str, BaseEvent]] = [] + self.published: list[tuple[str, DomainEvent]] = [] - async def publish_event(self, execution_id: str, event: BaseEvent) -> None: + async def publish_event(self, execution_id: str, event: DomainEvent) -> None: self.published.append((execution_id, event))backend/app/services/result_processor/processor.py (2)
177-183: Fix null dereference onresource_usage- pipeline failure.
ExecutionCompletedEvent.resource_usageis typed asResourceUsageDomain | None. Accessing.execution_time_wall_secondsand.peak_memory_kbwithout a null check will raiseAttributeErrorwhenresource_usageisNone.Proposed fix
- runtime_seconds = event.resource_usage.execution_time_wall_seconds - self._metrics.record_execution_duration(runtime_seconds, lang_and_version) - - # Record memory utilization - memory_mib = event.resource_usage.peak_memory_kb / 1024 + if event.resource_usage is not None: + runtime_seconds = event.resource_usage.execution_time_wall_seconds + self._metrics.record_execution_duration(runtime_seconds, lang_and_version) + + # Record memory utilization + memory_mib = event.resource_usage.peak_memory_kb / 1024 + else: + self.logger.warning(f"No resource_usage for completed execution {event.execution_id}") + runtime_seconds = 0.0 + memory_mib = 0.0Also guard the memory utilization percentage calculation (lines 186-191) since it depends on
memory_mib.
218-218: Fix type mismatch inrecord_errorcall - pipeline failure.
event.error_typeisExecutionErrorType | None, butrecord_errorexpects astr. Additionally, iferror_typeisNone, this will fail.Proposed fix
- self._metrics.record_error(event.error_type) + if event.error_type is not None: + self._metrics.record_error(event.error_type.value if hasattr(event.error_type, 'value') else str(event.error_type))Or if
record_errorshould accept the enum directly, update its signature instead.backend/tests/integration/dlq/test_dlq_manager.py (1)
63-70: Fix:DomainEventusesmodel_dump(), notto_dict().The pipeline failure indicates
ExecutionRequestedEventhas no attributeto_dict. Pydantic v2 models usemodel_dump()for serialization.🐛 Proposed fix
payload = { - "event": ev.to_dict(), + "event": ev.model_dump(mode="json"), "original_topic": f"{prefix}{str(KafkaTopic.EXECUTION_EVENTS)}", "error": "handler failed", "retry_count": 0, "failed_at": datetime.now(timezone.utc).isoformat(), "producer_id": "tests", }backend/app/events/core/producer.py (1)
119-148: PotentialUnboundLocalErrorifevent_typenot in topic mapping.If
EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]raises aKeyErrorat line 121, thetopicvariable will be undefined when theexcept KafkaErrorblock executes at line 146, causing anUnboundLocalError.🐛 Proposed fix: define topic before the try block or handle KeyError separately
async def produce( self, event_to_produce: DomainEvent, key: str | None = None, headers: dict[str, str] | None = None ) -> None: """Produce a message to Kafka.""" if not self._producer: self.logger.error("Producer not running") return + topic = f"{self._topic_prefix}{EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]}" try: serialized_value = await self._schema_registry.serialize_event(event_to_produce) - topic = f"{self._topic_prefix}{EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]}" # Convert headers to list of tuples format header_list = [(k, v.encode()) for k, v in headers.items()] if headers else None
🤖 Fix all issues with AI agents
In @backend/app/services/sse/sse_shutdown_manager.py:
- Around line 67-70: The new asyncio.Events initiated_event and notifying_event
are never set; in initiate_shutdown() call self.initiated_event.set()
immediately after transitioning state to the initiated/shutdown-started phase
(so external coordinators see shutdown begin), and in _notify_connections() call
self.notifying_event.set() right before or as you begin the notifying/broadcast
phase so external observers know notifying has started; ensure these calls are
safe to call idempotently (no blocking) and placed before any awaits that yield
control.
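A standalone sketch of the pattern this instruction describes: set each phase event as soon as that phase is entered and before any await, so external waiters never hang. The class below is illustrative only, not the real `SSEShutdownManager`:

```python
import asyncio


class ShutdownPhaseSignals:
    """Illustrative only: mirrors the initiated/notifying events from the review."""

    def __init__(self) -> None:
        self._shutdown_initiated = False
        self.initiated_event = asyncio.Event()   # set when shutdown is initiated
        self.notifying_event = asyncio.Event()   # set when the notifying phase begins

    async def initiate_shutdown(self) -> None:
        if self._shutdown_initiated:
            return  # idempotent: events stay set on repeat calls
        self._shutdown_initiated = True
        self.initiated_event.set()               # signal before yielding control
        await self._notify_connections()

    async def _notify_connections(self) -> None:
        self.notifying_event.set()               # observers see the notifying phase start
        await asyncio.sleep(0)                   # stand-in for broadcasting to connections
```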
In @backend/tests/integration/services/sse/test_partitioned_event_router.py:
- Around line 49-51: The test is flaky because using
asyncio.wait_for(subscription.get(RedisSSEMessage), timeout=2.0) only protects
against hangs but subscription.get has an internal 0.5s timeout and can return
None; replace the single-call approach with a retry/poll loop (or reuse the
existing eventual() helper) that repeatedly calls
subscription.get(RedisSSEMessage) until a non-None message is returned or a
total timeout elapses, and then assert msg is not None; target the test code
around subscription.get, RedisSSEMessage, and asyncio.wait_for to implement the
retry semantics.
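An illustrative retry loop for that test, assuming `subscription.get(RedisSSEMessage)` returns the next message or `None` after its internal ~0.5 s timeout as stated above; the helper name and total timeout are placeholders:

```python
import time
from typing import Any


async def get_with_retry(subscription: Any, message_type: type, total_timeout: float = 5.0) -> Any:
    """Poll subscription.get(...) until a non-None message arrives or the total timeout elapses."""
    deadline = time.monotonic() + total_timeout
    while time.monotonic() < deadline:
        msg = await subscription.get(message_type)  # may return None after its internal timeout
        if msg is not None:
            return msg
    return None

# Usage in the test (sketch):
#   msg = await get_with_retry(subscription, RedisSSEMessage, total_timeout=5.0)
#   assert msg is not None
```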
In @backend/tests/unit/services/saga/test_execution_saga_steps.py:
- Around line 3-6: The _FakeProducer test double currently uses BaseEvent types
which mismatches UnifiedProducer.produce's contract; update the imports to
include DomainEvent from app.domain.events.typed and change _FakeProducer so its
events list is typed as list[DomainEvent] and its async produce method
signature/parameter uses DomainEvent (not BaseEvent), matching
UnifiedProducer.produce; ensure you remove or stop using BaseEvent in
_FakeProducer and keep behavior of appending the event to self.events.
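A sketch of the adjusted test double, assuming `DomainEvent` comes from `app.domain.events.typed` and that `UnifiedProducer.produce` has the signature shown earlier in this review:

```python
from app.domain.events.typed import DomainEvent


class _FakeProducer:
    """Test double whose produce() matches UnifiedProducer.produce's DomainEvent contract."""

    def __init__(self) -> None:
        self.events: list[DomainEvent] = []

    async def produce(
        self, event_to_produce: DomainEvent, key: str | None = None, headers: dict[str, str] | None = None
    ) -> None:
        self.events.append(event_to_produce)
```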
In @backend/tests/unit/services/saga/test_saga_step_and_base.py:
- Around line 46-50: The test constructs BaseEvent (an abstract base) and passes
it to SagaContext.add_event, but add_event expects a concrete DomainEvent
subtype; replace the BaseEvent instantiation with a concrete event class (e.g.,
a SystemErrorEvent or other concrete subclass that corresponds to
EventType.SYSTEM_ERROR) and pass that to ctx.add_event, or if such a concrete
class does not exist, add a concrete subclass of BaseEvent (and include it in
the DomainEvent union) with appropriate metadata and use that instance in the
test.
🧹 Nitpick comments (9)
backend/tests/integration/services/events/test_event_bus.py (1)
51-53: Consider extending assertions for robustness.The Future-based await with timeout is cleaner than polling. Optionally, consider also asserting on the payload to ensure full event integrity:
assert received.event_type == "test.created" assert received.payload == {"x": 1}This would catch potential serialization/deserialization issues in the Kafka path.
backend/tests/integration/test_replay_routes.py (1)
388-392: Validate start response and align with file's error-handling pattern.Two concerns:
Line 388: The start request response is discarded. If the session fails to start, the test proceeds anyway and may produce misleading failures on line 392.
Line 392: Uses strict
== 200assertion, but other tests in this file (e.g., lines 102, 135, 168) usein [200, 404]with early return. This inconsistency could cause flaky test failures if the session becomes unavailable.Suggested fix to align with file patterns
- await test_admin.post(f"/api/v1/replay/sessions/{session_id}/start") - - # Check progress immediately - session state available right after start - detail_response = await test_admin.get(f"/api/v1/replay/sessions/{session_id}") - assert detail_response.status_code == 200 + start_response = await test_admin.post(f"/api/v1/replay/sessions/{session_id}/start") + assert start_response.status_code in [200, 404] + if start_response.status_code != 200: + return + + # Check progress immediately - session state available right after start + detail_response = await test_admin.get(f"/api/v1/replay/sessions/{session_id}") + assert detail_response.status_code in [200, 404] + if detail_response.status_code != 200: + returnbackend/app/events/schema/schema_registry.py (1)
16-21: Consider adding defensive handling for union extraction.The logic assumes
DomainEventis alwaysAnnotated[Union[...], Discriminator]. If the type structure changes, this could break silently. The current implementation is correct for the existing design, but consider adding validation.💡 Optional: Add defensive check
@lru_cache(maxsize=1) def _get_all_event_classes() -> list[type[DomainEvent]]: """Get all concrete event classes from DomainEvent union.""" union_type = get_args(DomainEvent)[0] # Annotated[Union[...], Discriminator] -> Union + if not get_args(union_type): + raise RuntimeError("DomainEvent union has no concrete types - check type definition") return list(get_args(union_type)) if get_origin(union_type) else [union_type]backend/app/services/k8s_worker/worker.py (1)
215-225: Consider replacingassertwith explicit type guards for production robustness.Using
assert isinstance(...)for type narrowing works but can be silently disabled when Python runs with the-O(optimize) flag. In production, this could lead to unexpectedAttributeErrorexceptions instead of clear type errors.♻️ Suggested defensive pattern
async def _handle_create_pod_command_wrapper(self, event: DomainEvent) -> None: """Wrapper for handling CreatePodCommandEvent with type safety.""" - assert isinstance(event, CreatePodCommandEvent) + if not isinstance(event, CreatePodCommandEvent): + self.logger.error(f"Expected CreatePodCommandEvent, got {type(event).__name__}") + return self.logger.info(f"Processing create_pod_command for execution {event.execution_id} from saga {event.saga_id}") await self._handle_create_pod_command(event) async def _handle_delete_pod_command_wrapper(self, event: DomainEvent) -> None: """Wrapper for handling DeletePodCommandEvent.""" - assert isinstance(event, DeletePodCommandEvent) + if not isinstance(event, DeletePodCommandEvent): + self.logger.error(f"Expected DeletePodCommandEvent, got {type(event).__name__}") + return self.logger.info(f"Processing delete_pod_command for execution {event.execution_id} from saga {event.saga_id}") await self._handle_delete_pod_command(event)backend/tests/integration/idempotency/test_consumer_idempotent.py (1)
82-84: Consider tightening the assertion for idempotency verification.The assertion
seen["n"] >= 1verifies the handler ran at least once, but since two identical events are produced and the test is verifying idempotent behavior, assertingseen["n"] == 1would more precisely confirm that the duplicate was blocked.Suggested fix
- assert seen["n"] >= 1 + assert seen["n"] == 1, f"Expected exactly 1 invocation (duplicate blocked), got {seen['n']}"backend/tests/unit/domain/events/test_event_schema_coverage.py (1)
6-6: Consider updating docstring terminology for consistency.The docstrings still reference "Kafka event class" terminology, but after this PR's migration, all events are unified under
DomainEvent. The distinction between "domain" and "Kafka" event classes is now less meaningful since they share the sameBaseEventhierarchy.This is a minor documentation nit that could be addressed in a follow-up.
Also applies to: 52-54
backend/app/infrastructure/kafka/mappings.py (1)
102-104: Consider caching forget_event_types_for_topic.Unlike
get_event_class_for_typeandget_topic_for_event, this function iterates through the entire mapping on each call. If called frequently, consider adding@lru_cache.♻️ Optional: Add caching
+@lru_cache(maxsize=32) def get_event_types_for_topic(topic: KafkaTopic) -> list[EventType]: """Get all event types that publish to a given topic.""" return [et for et, t in EVENT_TYPE_TO_TOPIC.items() if t == topic]backend/app/services/kafka_event_service.py (1)
67-73: Correlation ID precedence logic is correct but could be simplified.The logic ensures explicit
correlation_idparameter takes precedence over metadata's value. Consider simplifying by always setting correlation_id on metadata creation if provided.♻️ Optional simplification
- event_metadata = metadata or EventMetadata( - service_name=self.settings.SERVICE_NAME, - service_version=self.settings.SERVICE_VERSION, - correlation_id=correlation_id or str(uuid4()), - ) - if correlation_id and event_metadata.correlation_id != correlation_id: - event_metadata = event_metadata.model_copy(update={"correlation_id": correlation_id}) + effective_correlation_id = correlation_id or (metadata.correlation_id if metadata else None) or str(uuid4()) + event_metadata = metadata.model_copy(update={"correlation_id": effective_correlation_id}) if metadata else EventMetadata( + service_name=self.settings.SERVICE_NAME, + service_version=self.settings.SERVICE_VERSION, + correlation_id=effective_correlation_id, + )backend/app/services/idempotency/middleware.py (1)
206-250: Remove unusedasync_handlerfunction.The
async_handlerfunction is defined but never used. The code path either registersdispatch_handlerwith the dispatcher (line 258) or logs an error (line 261) - neither path usesasync_handler. This appears to be dead code left over from a previous implementation.♻️ Proposed fix
- # Create an async handler that processes the message - async def async_handler(message: Any) -> Any: - self.logger.info(f"IDEMPOTENT HANDLER CALLED for {event_type}") - - # Extract event from confluent-kafka Message - if not hasattr(message, "value"): - self.logger.error(f"Received non-Message object for {event_type}: {type(message)}") - return None - - # Debug log to check message details - self.logger.info( - f"Handler for {event_type} - Message type: {type(message)}, " - f"has key: {hasattr(message, 'key')}, " - f"has topic: {hasattr(message, 'topic')}" - ) - - raw_value = message.value - - # Debug the raw value - self.logger.info(f"Raw value extracted: {raw_value[:100] if raw_value else 'None or empty'}") - - # Handle tombstone messages (null value for log compaction) - if raw_value is None: - self.logger.warning(f"Received empty message for {event_type} - tombstone or consumed value") - return None - - # Handle empty messages - if not raw_value: - self.logger.warning(f"Received empty message for {event_type} - empty bytes") - return None - - try: - # Deserialize using schema registry if available - event = await self.consumer._schema_registry.deserialize_event(raw_value, message.topic) - if not event: - self.logger.error(f"Failed to deserialize event for {event_type}") - return None - - # Call the idempotent wrapper directly in async context - await idempotent_wrapper(event) - - self.logger.debug(f"Successfully processed {event_type} event: {event.event_id}") - return None - except Exception as e: - self.logger.error(f"Failed to process message for {event_type}: {e}", exc_info=True) - raise - # Register with the dispatcher if available
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (92)
- backend/app/api/routes/dlq.py
- backend/app/api/routes/events.py
- backend/app/api/routes/execution.py
- backend/app/db/docs/dlq.py
- backend/app/dlq/manager.py
- backend/app/dlq/models.py
- backend/app/domain/enums/events.py
- backend/app/domain/enums/kafka.py
- backend/app/domain/events/typed.py
- backend/app/events/core/consumer.py
- backend/app/events/core/dispatcher.py
- backend/app/events/core/dlq_handler.py
- backend/app/events/core/producer.py
- backend/app/events/event_store.py
- backend/app/events/event_store_consumer.py
- backend/app/events/schema/schema_registry.py
- backend/app/infrastructure/kafka/__init__.py
- backend/app/infrastructure/kafka/events/__init__.py
- backend/app/infrastructure/kafka/events/base.py
- backend/app/infrastructure/kafka/events/execution.py
- backend/app/infrastructure/kafka/events/metadata.py
- backend/app/infrastructure/kafka/events/notification.py
- backend/app/infrastructure/kafka/events/pod.py
- backend/app/infrastructure/kafka/events/saga.py
- backend/app/infrastructure/kafka/events/system.py
- backend/app/infrastructure/kafka/events/user.py
- backend/app/infrastructure/kafka/mappings.py
- backend/app/services/coordinator/coordinator.py
- backend/app/services/event_replay/replay_service.py
- backend/app/services/execution_service.py
- backend/app/services/idempotency/idempotency_manager.py
- backend/app/services/idempotency/middleware.py
- backend/app/services/k8s_worker/pod_builder.py
- backend/app/services/k8s_worker/worker.py
- backend/app/services/kafka_event_service.py
- backend/app/services/notification_service.py
- backend/app/services/pod_monitor/event_mapper.py
- backend/app/services/pod_monitor/monitor.py
- backend/app/services/result_processor/processor.py
- backend/app/services/saga/execution_saga.py
- backend/app/services/saga/saga_orchestrator.py
- backend/app/services/saga/saga_step.py
- backend/app/services/sse/kafka_redis_bridge.py
- backend/app/services/sse/redis_bus.py
- backend/app/services/sse/sse_shutdown_manager.py
- backend/tests/e2e/test_k8s_worker_create_pod.py
- backend/tests/e2e/test_resource_cleaner_orphan.py
- backend/tests/helpers/events.py
- backend/tests/helpers/eventually.py
- backend/tests/helpers/kafka.py
- backend/tests/integration/dlq/test_dlq_manager.py
- backend/tests/integration/events/test_consume_roundtrip.py
- backend/tests/integration/events/test_dlq_handler.py
- backend/tests/integration/events/test_event_dispatcher.py
- backend/tests/integration/events/test_event_store.py
- backend/tests/integration/events/test_schema_registry_real.py
- backend/tests/integration/idempotency/test_consumer_idempotent.py
- backend/tests/integration/idempotency/test_decorator_idempotent.py
- backend/tests/integration/idempotency/test_idempotency.py
- backend/tests/integration/idempotency/test_idempotent_handler.py
- backend/tests/integration/notifications/test_notification_sse.py
- backend/tests/integration/result_processor/test_result_processor.py
- backend/tests/integration/services/coordinator/test_execution_coordinator.py
- backend/tests/integration/services/events/test_event_bus.py
- backend/tests/integration/services/sse/test_partitioned_event_router.py
- backend/tests/integration/services/sse/test_redis_bus.py
- backend/tests/integration/test_notifications_routes.py
- backend/tests/integration/test_replay_routes.py
- backend/tests/integration/test_sse_routes.py
- backend/tests/integration/test_user_settings_routes.py
- backend/tests/unit/domain/events/test_event_schema_coverage.py
- backend/tests/unit/events/test_event_dispatcher.py
- backend/tests/unit/events/test_metadata_model.py
- backend/tests/unit/events/test_schema_registry_manager.py
- backend/tests/unit/services/coordinator/test_queue_manager.py
- backend/tests/unit/services/idempotency/test_idempotency_manager.py
- backend/tests/unit/services/idempotency/test_middleware.py
- backend/tests/unit/services/pod_monitor/test_event_mapper.py
- backend/tests/unit/services/pod_monitor/test_monitor.py
- backend/tests/unit/services/saga/test_execution_saga_steps.py
- backend/tests/unit/services/saga/test_saga_comprehensive.py
- backend/tests/unit/services/saga/test_saga_orchestrator_unit.py
- backend/tests/unit/services/saga/test_saga_step_and_base.py
- backend/tests/unit/services/sse/test_kafka_redis_bridge.py
- backend/tests/unit/services/sse/test_sse_shutdown_manager.py
- backend/tests/unit/services/test_pod_builder.py
- backend/workers/dlq_processor.py
- docs/architecture/event-system-design.md
- docs/architecture/kafka-topic-architecture.md
- docs/architecture/model-conversion.md
- docs/architecture/user-settings-events.md
- docs/components/schema-manager.md
💤 Files with no reviewable changes (10)
- backend/tests/helpers/eventually.py
- backend/app/infrastructure/kafka/events/base.py
- backend/app/infrastructure/kafka/events/execution.py
- backend/app/infrastructure/kafka/events/notification.py
- backend/app/infrastructure/kafka/events/metadata.py
- backend/app/infrastructure/kafka/events/system.py
- backend/app/infrastructure/kafka/events/user.py
- backend/app/infrastructure/kafka/events/saga.py
- backend/app/infrastructure/kafka/events/__init__.py
- backend/app/infrastructure/kafka/events/pod.py
🧰 Additional context used
🧬 Code graph analysis (56)
backend/app/events/core/dlq_handler.py (1)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
backend/tests/e2e/test_resource_cleaner_orphan.py (1)
backend/app/services/result_processor/resource_cleaner.py (1)
cleanup_orphaned_resources(149-173)
backend/app/services/idempotency/idempotency_manager.py (1)
backend/app/domain/events/typed.py (1)
BaseEvent(30-42)
backend/tests/integration/services/coordinator/test_execution_coordinator.py (2)
backend/app/services/coordinator/coordinator.py (1)
_handle_execution_requested(215-251)backend/tests/unit/services/coordinator/test_queue_manager.py (1)
ev(14-15)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
backend/app/domain/events/typed.py (1)
EventMetadata(16-27)
backend/tests/unit/events/test_schema_registry_manager.py (1)
backend/app/domain/events/typed.py (1)
ExecutionRequestedEvent(48-62)
backend/app/domain/events/typed.py (1)
backend/app/domain/enums/events.py (1)
EventType(4-87)
backend/app/services/k8s_worker/pod_builder.py (1)
backend/app/domain/events/typed.py (1)
CreatePodCommandEvent(382-397)
backend/app/services/saga/saga_step.py (1)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
backend/app/services/pod_monitor/event_mapper.py (2)
backend/app/domain/events/typed.py (7)
EventMetadata(16-27)ExecutionCompletedEvent(95-101)ExecutionFailedEvent(104-112)ExecutionTimeoutEvent(115-121)PodRunningEvent(149-153)PodScheduledEvent(142-146)PodTerminatedEvent(176-182)backend/app/domain/execution/models.py (1)
ResourceUsageDomain(13-19)
backend/tests/unit/services/sse/test_sse_shutdown_manager.py (2)
backend/app/services/sse/sse_shutdown_manager.py (6)
initiate_shutdown(127-154)unregister_connection(109-125)get_shutdown_status(273-286)SSEShutdownManager(22-307)register_connection(80-107)is_shutting_down(269-271)backend/tests/unit/services/sse/test_sse_service.py (4)
unregister_connection(91-92)get_shutdown_status(97-104)register_connection(87-89)is_shutting_down(94-95)
backend/app/services/pod_monitor/monitor.py (1)
backend/app/services/kafka_event_service.py (1)
publish_domain_event(177-202)
backend/tests/unit/services/coordinator/test_queue_manager.py (1)
backend/app/domain/events/typed.py (1)
ExecutionRequestedEvent(48-62)
backend/tests/integration/services/sse/test_partitioned_event_router.py (3)
backend/app/services/sse/redis_bus.py (1)
get(23-35)backend/tests/unit/services/sse/test_sse_service.py (1)
get(32-41)backend/app/schemas_pydantic/sse.py (1)
RedisSSEMessage(58-63)
backend/tests/integration/events/test_dlq_handler.py (1)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)SagaStartedEvent(326-331)
backend/tests/helpers/kafka.py (1)
backend/app/events/core/producer.py (2)
producer(56-57)UnifiedProducer(23-218)
backend/app/api/routes/execution.py (1)
backend/app/domain/events/typed.py (2)
BaseEvent(30-42)EventMetadata(16-27)
backend/tests/integration/notifications/test_notification_sse.py (3)
backend/app/services/sse/redis_bus.py (1)
get(23-35)backend/tests/unit/services/sse/test_sse_service.py (1)
get(32-41)backend/app/schemas_pydantic/sse.py (1)
RedisNotificationMessage(91-101)
backend/tests/integration/idempotency/test_idempotency.py (2)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)backend/app/services/idempotency/middleware.py (1)
idempotent_handler(93-119)
backend/tests/unit/services/idempotency/test_idempotency_manager.py (1)
backend/app/domain/events/typed.py (1)
BaseEvent(30-42)
backend/tests/unit/events/test_event_dispatcher.py (2)
backend/app/events/core/dispatcher.py (1)
EventDispatcher(15-177)backend/tests/helpers/events.py (1)
make_execution_requested_event(7-50)
backend/app/services/k8s_worker/worker.py (1)
backend/app/domain/events/typed.py (5)
CreatePodCommandEvent(382-397)
DeletePodCommandEvent(400-406)
ExecutionFailedEvent(104-112)
ExecutionStartedEvent(80-85)
PodCreatedEvent(135-139)
backend/app/events/event_store.py (3)
backend/tests/unit/services/pod_monitor/test_monitor.py (1)
store_event(53-55)
backend/app/db/repositories/event_repository.py (2)
store_event(39-52)
get_event(68-72)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
backend/tests/integration/events/test_event_store.py (3)
backend/tests/conftest.py (1)
app(79-91)backend/tests/unit/conftest.py (1)
app(62-63)backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
backend/app/services/saga/saga_orchestrator.py (2)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)SagaCancelledEvent(351-360)backend/app/services/saga/base_saga.py (1)
BaseSaga(8-52)
backend/tests/integration/test_sse_routes.py (1)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)PodCreatedEvent(135-139)
backend/app/services/notification_service.py (1)
backend/app/domain/events/typed.py (3)
ExecutionCompletedEvent(95-101)ExecutionFailedEvent(104-112)ExecutionTimeoutEvent(115-121)
backend/tests/unit/services/saga/test_execution_saga_steps.py (1)
backend/app/domain/events/typed.py (1)
ExecutionRequestedEvent(48-62)
backend/tests/unit/events/test_metadata_model.py (2)
backend/app/domain/events/typed.py (1)
EventMetadata(16-27)backend/app/services/coordinator/queue_manager.py (1)
user_id(34-35)
backend/app/api/routes/dlq.py (3)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)backend/tests/integration/services/sse/test_redis_bus.py (1)
model_dump(27-28)backend/tests/unit/services/sse/test_kafka_redis_bridge.py (1)
model_dump(41-42)
backend/tests/integration/idempotency/test_consumer_idempotent.py (4)
backend/app/events/core/dispatcher.py (2)
EventDispatcher(15-177)
register(36-57)
backend/app/domain/enums/events.py (1)
EventType(4-87)
backend/tests/helpers/events.py (1)
make_execution_requested_event(7-50)
backend/app/events/core/producer.py (2)
producer(56-57)
produce(111-148)
backend/app/services/coordinator/coordinator.py (6)
backend/app/domain/events/typed.py (4)
CreatePodCommandEvent(382-397)
EventMetadata(16-27)
ExecutionAcceptedEvent(65-70)
ExecutionRequestedEvent(48-62)
backend/app/events/core/dispatcher.py (1)
EventDispatcher(15-177)
backend/app/events/core/consumer.py (1)
UnifiedConsumer(23-256)
backend/app/events/core/producer.py (1)
UnifiedProducer(23-218)
backend/app/events/event_store.py (1)
EventStore(19-314)
backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(35-119)
backend/tests/unit/services/saga/test_saga_step_and_base.py (2)
backend/app/domain/events/typed.py (2)
BaseEvent(30-42)EventMetadata(16-27)backend/app/domain/enums/events.py (1)
EventType(4-87)
backend/app/services/execution_service.py (1)
backend/app/domain/events/typed.py (3)
EventMetadata(16-27)ExecutionCancelledEvent(124-129)ExecutionRequestedEvent(48-62)
backend/tests/e2e/test_k8s_worker_create_pod.py (1)
backend/app/domain/events/typed.py (2)
CreatePodCommandEvent(382-397)EventMetadata(16-27)
backend/app/services/idempotency/middleware.py (3)
backend/tests/unit/events/test_event_dispatcher.py (2)
handler(37-38)
handler(48-49)
backend/app/services/idempotency/idempotency_manager.py (1)
IdempotencyManager(69-315)
backend/tests/integration/idempotency/test_idempotency.py (1)
on_duplicate(351-352)
backend/app/events/event_store_consumer.py (2)
backend/app/services/saga/saga_orchestrator.py (1)
_handle_event(151-170)backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
backend/app/services/result_processor/processor.py (4)
backend/app/domain/events/typed.py (4)
EventMetadata(16-27)
ExecutionCompletedEvent(95-101)
ExecutionFailedEvent(104-112)
ExecutionTimeoutEvent(115-121)
backend/app/domain/execution/exceptions.py (1)
ExecutionNotFoundError(4-8)
backend/app/domain/execution/models.py (1)
ExecutionResultDomain(40-51)
backend/app/events/core/dispatcher.py (1)
EventDispatcher(15-177)
backend/tests/helpers/events.py (1)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)ExecutionRequestedEvent(48-62)
backend/app/events/core/producer.py (1)
backend/app/events/schema/schema_registry.py (1)
serialize_event(70-78)
backend/tests/integration/result_processor/test_result_processor.py (2)
backend/app/domain/events/typed.py (3)
EventMetadata(16-27)ExecutionCompletedEvent(95-101)ResultStoredEvent(195-200)backend/app/domain/execution/models.py (1)
ResourceUsageDomain(13-19)
backend/app/dlq/models.py (2)
backend/app/core/utils.py (1)
StringEnum(6-31)backend/app/domain/enums/events.py (1)
EventType(4-87)
backend/tests/integration/services/events/test_event_bus.py (1)
backend/app/services/event_bus.py (2)
EventBusEvent(21-29)subscribe(162-188)
backend/app/db/docs/dlq.py (2)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)backend/app/dlq/models.py (1)
DLQMessageStatus(12-18)
backend/tests/unit/services/test_pod_builder.py (2)
backend/app/domain/events/typed.py (2)
CreatePodCommandEvent(382-397)EventMetadata(16-27)backend/app/services/coordinator/queue_manager.py (1)
user_id(34-35)
backend/app/api/routes/events.py (1)
backend/app/domain/events/typed.py (2)
BaseEvent(30-42)EventMetadata(16-27)
backend/tests/integration/services/sse/test_redis_bus.py (2)
backend/app/domain/events/typed.py (1)
EventMetadata(16-27)backend/tests/unit/services/sse/test_kafka_redis_bridge.py (1)
_make_metadata(32-33)
backend/tests/integration/dlq/test_dlq_manager.py (3)
backend/app/dlq/manager.py (1)
create_dlq_manager(443-480)backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-54)backend/app/domain/events/typed.py (1)
DLQMessageReceivedEvent(522-532)
backend/tests/unit/services/saga/test_saga_comprehensive.py (2)
backend/app/domain/events/typed.py (1)
ExecutionRequestedEvent(48-62)
backend/app/services/saga/saga_step.py (3)
SagaStep(73-100)
execute(80-88)
SagaContext(14-70)
backend/app/services/kafka_event_service.py (4)
backend/app/domain/events/typed.py (1)
EventMetadata(16-27)
backend/app/db/repositories/event_repository.py (1)
store_event(39-52)
backend/app/events/core/producer.py (2)
produce(111-148)
metrics(52-53)
backend/app/core/tracing/utils.py (1)
inject_trace_context(88-100)
backend/app/events/schema/schema_registry.py (1)
backend/app/domain/enums/events.py (1)
EventType(4-87)
backend/tests/unit/services/pod_monitor/test_monitor.py (2)
backend/app/domain/events/typed.py (3)
EventMetadata(16-27)
ExecutionCompletedEvent(95-101)
ExecutionStartedEvent(80-85)
backend/app/events/core/producer.py (1)
produce(111-148)
backend/tests/integration/idempotency/test_decorator_idempotent.py (1)
backend/app/services/idempotency/middleware.py (1)
idempotent_handler(93-119)
backend/tests/unit/domain/events/test_event_schema_coverage.py (1)
backend/app/domain/events/typed.py (1)
BaseEvent(30-42)
backend/tests/integration/events/test_schema_registry_real.py (1)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)PodCreatedEvent(135-139)
backend/app/events/core/dispatcher.py (2)
backend/app/infrastructure/kafka/mappings.py (1)
get_event_class_for_type(91-93)backend/app/domain/enums/events.py (1)
EventType(4-87)
🪛 GitHub Actions: docs
backend/app/dlq/manager.py
[error] 26-26: ModuleNotFoundError: No module named 'app.infrastructure.kafka.events' while importing DLQManager. Step './deploy.sh openapi' failed with exit code 1.
🪛 GitHub Actions: MyPy Type Checking
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py
[error] 30-30: Class cannot subclass "BaseEvent" (has type "Any")
backend/workers/dlq_processor.py
[error] 69-69: Returning Any from function declared to return "bool"
[error] 69-69: "DLQMessage" has no attribute "age_seconds"
backend/tests/unit/services/sse/test_kafka_redis_bridge.py
[error] 36-36: Class cannot subclass "BaseEvent" (has type "Any")
backend/app/services/notification_service.py
[error] 632-632: Item "None" of "ResourceUsageDomain | None" has no attribute "execution_time_wall_seconds"
backend/tests/unit/services/saga/test_saga_step_and_base.py
[error] 50-50: Argument 1 to "add_event" of "SagaContext" has incompatible type "BaseEvent"; expected one of ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>
[error] 75-75: Type argument "BaseEvent" of "SagaStep" must be a subtype of "ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>"
[error] 82-82: Type argument "BaseEvent" of "SagaStep" must be a subtype of "ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>"
backend/app/services/result_processor/processor.py
[error] 178-178: Item "None" of "ResourceUsageDomain | None" has no attribute "execution_time_wall_seconds"
[error] 182-182: Item "None" of "ResourceUsageDomain | None" has no attribute "peak_memory_kb"
[error] 218-218: Argument 1 to "record_error" of "ExecutionMetrics" has incompatible type "ExecutionErrorType | None"; expected "str"
backend/tests/integration/services/sse/test_redis_bus.py
[error] 21-21: Class cannot subclass "BaseEvent" (has type "Any")
backend/tests/integration/dlq/test_dlq_manager.py
[error] 64-64: "ExecutionRequestedEvent" has no attribute "to_dict"
backend/tests/integration/events/test_schema_registry_real.py
[error] 24-24: "PodCreatedEvent" has no attribute "topic"
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Scan Backend
- GitHub Check: cubic · AI code reviewer
- GitHub Check: E2E Tests
# Phase transition events for external coordination
self.initiated_event = asyncio.Event()  # Set when shutdown initiated
self.notifying_event = asyncio.Event()  # Set when entering notifying phase
Phase transition events are never set within this class.
The new initiated_event and notifying_event are created but never called with .set() in the shutdown flow:
- initiate_shutdown() (line 127) doesn't set initiated_event
- _notify_connections() (line 185) doesn't set notifying_event
If these are intended for external coordination, they should be set at the appropriate phase transitions.
Proposed fix
async def initiate_shutdown(self) -> None:
"""Initiate graceful shutdown of all SSE connections"""
async with self._lock:
if self._shutdown_initiated:
self.logger.warning("SSE shutdown already initiated")
return
self._shutdown_initiated = True
self._shutdown_start_time = time.time()
self._phase = ShutdownPhase.DRAINING
total_connections = sum(len(conns) for conns in self._active_connections.values())
self.logger.info(f"Initiating SSE shutdown with {total_connections} active connections")
self.metrics.update_sse_draining_connections(total_connections)
# Start shutdown process
self._shutdown_event.set()
+ self.initiated_event.set()

async def _notify_connections(self) -> None:
"""Notify all active connections about shutdown"""
self._phase = ShutdownPhase.NOTIFYING
+ self.notifying_event.set()

🤖 Prompt for AI Agents
In @backend/app/services/sse/sse_shutdown_manager.py around lines 67 - 70, The
new asyncio.Events initiated_event and notifying_event are never set; in
initiate_shutdown() call self.initiated_event.set() immediately after
transitioning state to the initiated/shutdown-started phase (so external
coordinators see shutdown begin), and in _notify_connections() call
self.notifying_event.set() right before or as you begin the notifying/broadcast
phase so external observers know notifying has started; ensure these calls are
safe to call idempotently (no blocking) and placed before any awaits that yield
control.
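For reference, the coordination pattern the comment asks for looks roughly like this as a minimal standalone sketch (class and method names are assumed, not the project's actual shutdown manager):

import asyncio


class PhaseSignals:
    """Toy stand-in: set events at phase transitions so external tasks can await them."""

    def __init__(self) -> None:
        self.initiated_event = asyncio.Event()
        self.notifying_event = asyncio.Event()

    async def initiate_shutdown(self) -> None:
        # ... transition internal state first, then signal external waiters ...
        self.initiated_event.set()  # Event.set() is idempotent and never blocks

    async def notify_connections(self) -> None:
        self.notifying_event.set()  # signal that the notifying phase has begun
        # ... broadcast shutdown notices to connections ...


async def main() -> None:
    signals = PhaseSignals()

    async def external_watcher() -> None:
        await signals.initiated_event.wait()  # resumes as soon as the phase is reached
        print("shutdown initiated")

    watcher = asyncio.create_task(external_watcher())
    await signals.initiate_shutdown()
    await watcher


asyncio.run(main())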
# Await the subscription directly - true async, no polling
msg = await asyncio.wait_for(subscription.get(RedisSSEMessage), timeout=2.0)
assert msg is not None
Potential flaky test: asyncio.wait_for doesn't provide retry semantics here.
The subscription.get() method has an internal 0.5-second timeout (from redis_bus.py line 23). If no message arrives within that window, get() returns None immediately, and asyncio.wait_for completes successfully—it only guards against the coroutine hanging, not against None returns.
The 2.0-second timeout is misleading: the test effectively only waits 0.5 seconds for the message to arrive before msg becomes None and the assertion fails.
If the previous eventual() helper polled with retries, this change reduces test resilience to timing variations.
🔧 Consider adding a retry loop if timing issues arise
# Await the subscription directly - true async, no polling
- msg = await asyncio.wait_for(subscription.get(RedisSSEMessage), timeout=2.0)
- assert msg is not None
+ msg: RedisSSEMessage | None = None
+ deadline = asyncio.get_event_loop().time() + 2.0
+ while msg is None and asyncio.get_event_loop().time() < deadline:
+ msg = await subscription.get(RedisSSEMessage)
+ assert msg is not None, "Expected RedisSSEMessage within 2 seconds"

Please confirm whether this single-call approach is reliable in your CI environment, or if the previous polling behavior was intentionally providing more robustness against timing issues.
🤖 Prompt for AI Agents
In @backend/tests/integration/services/sse/test_partitioned_event_router.py
around lines 49 - 51, The test is flaky because using
asyncio.wait_for(subscription.get(RedisSSEMessage), timeout=2.0) only protects
against hangs but subscription.get has an internal 0.5s timeout and can return
None; replace the single-call approach with a retry/poll loop (or reuse the
existing eventual() helper) that repeatedly calls
subscription.get(RedisSSEMessage) until a non-None message is returned or a
total timeout elapses, and then assert msg is not None; target the test code
around subscription.get, RedisSSEMessage, and asyncio.wait_for to implement the
retry semantics.
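A generic version of the retry/poll loop described above could look like the following sketch (helper name and timeout values are illustrative; this is not the existing eventual() helper):

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def poll_until(fetch: Callable[[], Awaitable[T | None]], total_timeout: float = 2.0) -> T:
    """Call fetch() repeatedly until it returns a non-None value or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + total_timeout
    while True:
        value = await fetch()
        if value is not None:
            return value
        if loop.time() >= deadline:
            raise TimeoutError("no value received before the deadline")


# In the test this would be used roughly as:
#   msg = await poll_until(lambda: subscription.get(RedisSSEMessage), total_timeout=2.0)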
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
backend/tests/integration/services/sse/test_redis_bus.py (1)
21-28: Fix type incompatibility causing pipeline failure.The MyPy error indicates
_DummyEventis not part of theDomainEventunion expected bypublish_event. After the unification, only specific domain event types are accepted.Two options to fix:
Use a real event type (preferred): Import and use an actual event class like
ExecutionCompletedEventinstead of_DummyEvent.Cast for test purposes: If the dummy event structure is intentional for isolation, cast at the call site.
Option 1: Use a real event type
-from app.domain.events.typed import EventMetadata -from app.domain.events.typed import BaseEvent +from app.domain.events.typed import EventMetadata, ExecutionCompletedEventThen replace the
_DummyEventusage in the test:-class _DummyEvent(BaseEvent): - """Dummy event for testing.""" - execution_id: str = "" - status: str | None = None - topic: ClassVar[KafkaTopic] = KafkaTopic.EXECUTION_EVENTS - - def model_dump(self, **kwargs: object) -> dict[str, Any]: - return {"execution_id": self.execution_id, "status": self.status}# Publish event - evt = _DummyEvent( - event_type=EventType.EXECUTION_COMPLETED, - metadata=_make_metadata(), - execution_id="exec-1", - status="completed" - ) + evt = ExecutionCompletedEvent( + metadata=_make_metadata(), + execution_id="exec-1", + )Option 2: Cast for test isolation
+from typing import cast +from app.domain.events.typed import DomainEvent- await bus.publish_event("exec-1", evt) + await bus.publish_event("exec-1", cast(DomainEvent, evt))Also applies to: 90-96
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
30-44: Fix MyPy type errors:_FakeEventis not compatible with expectedDomainEventunion.The pipeline reports 4 MyPy errors because
_FakeEventis not part of theDomainEventtype union expected bySagaStepandSagaOrchestratormethods. The type system now enforces that only registered domain event types can be used.Options to resolve:
- Use an existing event type (e.g.,
ExecutionRequestedEvent) instead of a custom fake event- Add
# type: ignorecomments on the affected lines if test flexibility is more important than strict typing- Cast the fake event where needed:
cast(DomainEvent, _make_event(...))Option 2: Add type ignore comments
-class _StepOK(SagaStep[_FakeEvent]): +class _StepOK(SagaStep[_FakeEvent]): # type: ignore[type-arg]- def get_steps(self) -> list[SagaStep[_FakeEvent]]: + def get_steps(self) -> list[SagaStep[_FakeEvent]]: # type: ignore[type-arg]- await orch._handle_event(_make_event(EventType.EXECUTION_REQUESTED, "e")) + await orch._handle_event(_make_event(EventType.EXECUTION_REQUESTED, "e")) # type: ignore[arg-type]- assert orch._should_trigger_saga(_Saga, _make_event(EventType.EXECUTION_REQUESTED, "e")) is True + assert orch._should_trigger_saga(_Saga, _make_event(EventType.EXECUTION_REQUESTED, "e")) is True # type: ignore[arg-type]backend/tests/integration/dlq/test_dlq_manager.py (1)
63-70: Fix:to_dict()method does not exist on domain events.The pipeline failure indicates
ExecutionRequestedEventhas no attributeto_dict. Domain events now use Pydantic'smodel_dump()method.Proposed fix
payload = { - "event": ev.to_dict(), + "event": ev.model_dump(mode="json"), "original_topic": f"{prefix}{str(KafkaTopic.EXECUTION_EVENTS)}", "error": "handler failed", "retry_count": 0, "failed_at": datetime.now(timezone.utc).isoformat(), "producer_id": "tests", }backend/app/dlq/manager.py (2)
1-32: Fix: Import block is un-sorted.The Ruff linter reports unsorted imports. The
app.domain.events.typedimport should be grouped with otherapp.*imports in alphabetical order.Run
ruff check --fixto auto-fix, or manually reorder:Suggested import ordering
from app.core.lifecycle import LifecycleEnabled from app.core.metrics.context import get_dlq_metrics from app.core.tracing import EventAttributes from app.core.tracing.utils import extract_trace_context, get_tracer, inject_trace_context from app.db.docs import DLQMessageDocument from app.dlq.models import ( DLQBatchRetryResult, DLQMessage, DLQMessageStatus, DLQMessageUpdate, DLQRetryResult, RetryPolicy, RetryStrategy, ) from app.domain.enums.kafka import GroupId, KafkaTopic -from app.events.schema.schema_registry import SchemaRegistryManager from app.domain.events.typed import ( DLQMessageDiscardedEvent, DLQMessageReceivedEvent, DLQMessageRetriedEvent, EventMetadata, ) +from app.events.schema.schema_registry import SchemaRegistryManager from app.settings import Settings
206-255: Non-atomic dual-topic send could leave inconsistent state.The method sends to
retry_topicthenoriginal_topicsequentially. If the secondsend_and_waitfails after the first succeeds, the message appears in the retry topic but not the original, and the status update won't run.Consider wrapping in a try/except to handle partial failure, or at minimum updating status to reflect the partial state.
Suggested improvement
+ # Send to both topics - if second fails, we still want to track partial retry + sent_to_retry = False + sent_to_original = False # Send to retry topic - await self.producer.send_and_wait( - topic=retry_topic, - value=json.dumps(event.model_dump(mode="json")).encode(), - key=message.event.event_id.encode(), - headers=kafka_headers, - ) - - # Send to original topic - await self.producer.send_and_wait( - topic=message.original_topic, - value=json.dumps(event.model_dump(mode="json")).encode(), - key=message.event.event_id.encode(), - headers=kafka_headers, - ) + try: + await self.producer.send_and_wait( + topic=retry_topic, + value=json.dumps(event.model_dump(mode="json")).encode(), + key=message.event.event_id.encode(), + headers=kafka_headers, + ) + sent_to_retry = True + + await self.producer.send_and_wait( + topic=message.original_topic, + value=json.dumps(event.model_dump(mode="json")).encode(), + key=message.event.event_id.encode(), + headers=kafka_headers, + ) + sent_to_original = True + except Exception as e: + self.logger.error( + f"Partial retry failure: retry_topic={sent_to_retry}, original={sent_to_original}, error={e}", + extra={"event_id": message.event.event_id}, + ) + raise
🧹 Nitpick comments (4)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
12-17: Consider consolidating imports from the same module.Both
EventMetadata(line 12) andBaseEvent(line 17) are imported fromapp.domain.events.typed. These can be combined into a single import statement.Suggested fix
-from app.domain.events.typed import EventMetadata from app.domain.saga.models import Saga, SagaConfig from app.events.core import UnifiedProducer from app.events.event_store import EventStore from app.events.schema.schema_registry import SchemaRegistryManager -from app.domain.events.typed import BaseEvent +from app.domain.events.typed import BaseEvent, EventMetadatabackend/tests/integration/dlq/test_dlq_manager.py (1)
48-61: Consider logging more context on deserialization failures.The exception handler only logs at debug level, which could hide issues during test debugging. For test code, consider logging at warning level or including the raw message value.
Suggested improvement
except Exception as e: - _test_logger.debug(f"Error deserializing DLQ event: {e}") + _test_logger.warning(f"Error deserializing DLQ event: {e}, raw: {msg.value[:100] if msg.value else None}")backend/app/dlq/manager.py (2)
72-76: Consider defensive handling for header decoding.If a header value contains non-UTF-8 bytes, the
.decode()call will raiseUnicodeDecodeError. Consider usingerrors="replace"orerrors="ignore"for resilience.Suggested fix
def _kafka_msg_to_message(self, msg: Any) -> DLQMessage: """Parse Kafka ConsumerRecord into DLQMessage.""" data = json.loads(msg.value) - headers = {k: v.decode() for k, v in (msg.headers or [])} + headers = {k: v.decode(errors="replace") for k, v in (msg.headers or [])} return DLQMessage(**data, dlq_offset=msg.offset, dlq_partition=msg.partition, headers=headers)
107-138: Span attribute uses enum value instead of full topic name.Line 125 sets
EventAttributes.KAFKA_TOPIC: self.dlq_topicwhich is theKafkaTopicenum value, not the prefixed topic name. For consistency with how topics are referenced elsewhere, consider using the full topic name.Suggested fix
with get_tracer().start_as_current_span( name="dlq.consume", context=ctx, kind=SpanKind.CONSUMER, attributes={ - EventAttributes.KAFKA_TOPIC: self.dlq_topic, + EventAttributes.KAFKA_TOPIC: f"{self.settings.KAFKA_TOPIC_PREFIX}{self.dlq_topic}", EventAttributes.EVENT_TYPE: dlq_msg.event.event_type, EventAttributes.EVENT_ID: dlq_msg.event.event_id, },
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
backend/app/dlq/manager.pybackend/app/services/coordinator/queue_manager.pybackend/tests/integration/dlq/test_dlq_manager.pybackend/tests/integration/services/sse/test_redis_bus.pybackend/tests/unit/services/saga/test_execution_saga_steps.pybackend/tests/unit/services/saga/test_saga_orchestrator_unit.pybackend/tests/unit/services/sse/test_kafka_redis_bridge.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/services/coordinator/queue_manager.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/tests/unit/services/sse/test_kafka_redis_bridge.py
- backend/tests/unit/services/saga/test_execution_saga_steps.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/tests/integration/services/sse/test_redis_bus.py (2)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)BaseEvent(30-42)backend/tests/unit/services/sse/test_kafka_redis_bridge.py (1)
_make_metadata(32-33)
backend/tests/integration/dlq/test_dlq_manager.py (5)
backend/app/dlq/manager.py (1)
create_dlq_manager(443-480)backend/app/domain/enums/events.py (1)
EventType(4-87)backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-54)backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(35-119)deserialize_event(80-95)backend/app/domain/events/typed.py (1)
DLQMessageReceivedEvent(522-532)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)BaseEvent(30-42)
🪛 GitHub Actions: MyPy Type Checking
backend/tests/integration/services/sse/test_redis_bus.py
[error] 96-96: Argument 2 to "publish_event" of "SSERedisBus" has incompatible type "_DummyEvent"; expected a union of ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>
backend/tests/integration/dlq/test_dlq_manager.py
[error] 64-64: "ExecutionRequestedEvent" has no attribute "to_dict".
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py
[error] 97-97: Type argument "_FakeEvent" of "SagaStep" must be a subtype of ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>.
[error] 117-117: Type argument "_FakeEvent" of "SagaStep" must be a subtype of ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>.
[error] 145-145: Argument 1 to "_handle_event" of "SagaOrchestrator" has incompatible type "_FakeEvent"; expected one of ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>.
[error] 165-165: Argument 2 to "_should_trigger_saga" of "SagaOrchestrator" has incompatible type "_FakeEvent"; expected one of ExecutionRequestedEvent | ExecutionAcceptedEvent | ExecutionQueuedEvent | ExecutionStartedEvent | ExecutionRunningEvent | <51 more items>.
🪛 GitHub Actions: Ruff Linting
backend/app/dlq/manager.py
[error] 1-32: Ruff check failed: Import block is un-sorted or un-formatted. Found 1 error fixable with the --fix option. Tool: Ruff (Python linter). Run 'ruff --fix' to auto-fix imports.
🔇 Additional comments (10)
backend/tests/integration/services/sse/test_redis_bus.py (2)
11-12: LGTM on import migration.The imports are correctly updated to use the unified domain event model (
EventMetadataandBaseEventfromapp.domain.events.typed), consistent with the PR's objective to consolidate the event systems.
75-76: LGTM!The metadata factory correctly uses
EventMetadatawith the required fields, matching the pattern used in other test files (e.g.,test_kafka_redis_bridge.py).backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
40-42: LGTM!The
metadatafield correctly uses the newEventMetadatatype with appropriate default values for testing. The migration fromAvroEventMetadatatoEventMetadataaligns with the PR objectives.backend/tests/integration/dlq/test_dlq_manager.py (2)
1-14: Imports look correct for the new unified event model.The imports properly reference the new domain event types (
EventType,DLQMessageReceivedEvent) from the unifiedapp.domainpackages, aligning with the PR's migration objectives.
88-103: LGTM! Solid async test structure with proper cleanup.The test correctly:
- Starts the consumer before the manager to ensure message capture
- Uses
asyncio.wait_forwith timeout to prevent hangs- Properly cancels and cleans up the consumer task in the
finallyblock- Validates key fields of the emitted
DLQMessageReceivedEventbackend/app/dlq/manager.py (5)
170-183: Event emission failure is silently logged, not propagated.The
_emit_message_received_eventcall at line 183 will swallow exceptions (as seen in_produce_dlq_event). This is reasonable for telemetry events, but consider whether failed emission should trigger any compensating action or metric.The design choice to log-and-continue for emission failures is appropriate—DLQ storage is the primary concern, and emission is secondary telemetry.
322-372: LGTM! Clean event emission pattern.The three emission methods follow a consistent pattern:
- Create typed event with appropriate fields
- Delegate to
_produce_dlq_eventfor serialization and send- Silent failure with logging is appropriate for telemetry events
The use of
schema_registry.serialize_eventensures proper Avro serialization aligned with the unified event model.
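Abstracted, that emit pattern reads roughly like this (signatures are assumed for illustration, not copied from the manager):

import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("dlq")


async def produce_lifecycle_event(
    send: Callable[[str, bytes], Awaitable[None]],
    serialize: Callable[[Any], bytes],
    topic: str,
    event: Any,
) -> None:
    """Sketch of the emit pattern: serialize a typed event, publish it, and never raise."""
    try:
        await send(topic, serialize(event))  # e.g. Avro bytes from the schema registry
    except Exception:
        # Telemetry only: log and keep going, the DLQ write itself has already succeeded
        logger.exception("failed to emit DLQ lifecycle event")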
275-304: LGTM! Monitor loop handles scheduled retries appropriately.The monitor:
- Queries with sensible limit (100 messages)
- Processes individually, allowing partial progress
- Has different sleep intervals for normal (10s) vs error (60s) states
- Failed retries remain SCHEDULED for next iteration
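In outline, that polling loop corresponds to something like the following sketch (the callables are assumptions, not the actual implementation):

import asyncio
import logging
from typing import Any, Awaitable, Callable


async def monitor_scheduled_retries(
    fetch_due: Callable[[int], Awaitable[list[Any]]],
    retry_one: Callable[[Any], Awaitable[None]],
    logger: logging.Logger,
) -> None:
    """Outline of the retry monitor: fetch due messages, retry each, back off after errors."""
    while True:
        try:
            due = await fetch_due(100)        # up to 100 messages whose retry time has passed
            for message in due:
                await retry_one(message)      # a failed retry stays SCHEDULED for the next pass
            await asyncio.sleep(10)           # normal cadence
        except Exception:
            logger.exception("retry monitor iteration failed")
            await asyncio.sleep(60)           # slower cadence after an error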
374-440: LGTM! Manual operations have proper state guards.Both
retry_message_manuallyanddiscard_message_manually:
- Correctly query using nested
{"event.event_id": event_id}path- Guard against terminal states (DISCARDED, RETRIED)
- Return boolean success indicator
The batch retry properly aggregates results without short-circuiting on individual failures.
443-480: LGTM! Factory creates well-configured Kafka clients.Producer configuration includes:
- acks="all" for durability
- enable_idempotence=True for exactly-once semantics
- Compression and batching for efficiency
Consumer configuration properly uses manual commit with earliest offset reset.
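For illustration, a client setup along those lines might look as follows with aiokafka (the specific settings and topic name are assumptions, not a copy of create_dlq_manager):

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


def build_dlq_clients(bootstrap: str = "localhost:9092") -> tuple[AIOKafkaProducer, AIOKafkaConsumer]:
    # Durable, idempotent producer with light batching
    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap,
        acks="all",                # wait for all in-sync replicas
        enable_idempotence=True,   # no duplicates on producer retries within a partition
        compression_type="gzip",
        linger_ms=5,
    )
    # Manual-commit consumer that starts from the earliest available offset
    consumer = AIOKafkaConsumer(
        "dlq_events",              # topic name assumed for illustration
        bootstrap_servers=bootstrap,
        group_id="dlq-manager",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    return producer, consumer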
1 issue found across 14 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/tests/integration/db/repositories/test_dlq_repository.py">
<violation number="1" location="backend/tests/integration/db/repositories/test_dlq_repository.py:89">
P1: Test will fail: `msg.event.event_id` will be a random UUID, not `"id1"`. The test data setup passes `event_id="id1"` as a top-level field (which the new model ignores), but the `event` dict doesn't include `event_id`. The test data needs to include `"event_id": "id1"` inside each `event` dict.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/tests/integration/db/repositories/test_dlq_repository.py (1)
24-33: Test assertion on line 89 is broken and will fail.The assertion
msg.event.event_id == "id1"has two critical issues:
Incorrect dict access syntax: The
eventfield is adict[str, Any], so accessing it with attribute notation (msg.event.event_id) raisesAttributeError. Use dict key access instead:msg.event["event_id"]ormsg.event.get("event_id").Missing
event_idin fixture data: Even with correct access syntax, theeventdicts in test fixtures (lines 28-33, 44-49, 60-65) do not contain theevent_idkey.Add
"event_id"to each event dict in the fixtures, and fix the assertion to use proper dict key access.backend/workers/dlq_processor.py (1)
76-107: DLQ manager is never started - processing will not occur.The
main()function initializes the manager and configures policies/filters, but it never callsmanager.start()(or usesasync with manager). The function just waits onstop_eventindefinitely without starting any processing. TheDLQManagerinherits fromLifecycleEnabledand requires explicit startup.🐛 Proposed fix
async with AsyncExitStack() as stack: stack.push_async_callback(container.close) + await manager.start() + stack.push_async_callback(manager.stop) await stop_event.wait()Alternatively, use the context manager pattern:
async with AsyncExitStack() as stack: stack.push_async_callback(container.close) + await stack.enter_async_context(manager) await stop_event.wait()
🧹 Nitpick comments (8)
backend/app/services/result_processor/processor.py (1)
155-165: Consider using runtime checks instead of assert for type narrowing.The
assert isinstance(...)pattern works for type narrowing, butassertstatements are removed when Python runs with optimization flags (-O). If the dispatcher ever routes an incorrect event type in production with optimizations enabled, these assertions would be silently skipped, leading to attribute errors downstream.♻️ Suggested alternative using explicit checks
async def _handle_completed_wrapper(self, event: DomainEvent) -> None: - assert isinstance(event, ExecutionCompletedEvent) + if not isinstance(event, ExecutionCompletedEvent): + raise TypeError(f"Expected ExecutionCompletedEvent, got {type(event).__name__}") await self._handle_completed(event) async def _handle_failed_wrapper(self, event: DomainEvent) -> None: - assert isinstance(event, ExecutionFailedEvent) + if not isinstance(event, ExecutionFailedEvent): + raise TypeError(f"Expected ExecutionFailedEvent, got {type(event).__name__}") await self._handle_failed(event) async def _handle_timeout_wrapper(self, event: DomainEvent) -> None: - assert isinstance(event, ExecutionTimeoutEvent) + if not isinstance(event, ExecutionTimeoutEvent): + raise TypeError(f"Expected ExecutionTimeoutEvent, got {type(event).__name__}") await self._handle_timeout(event)backend/tests/integration/db/repositories/test_dlq_repository.py (1)
90-91: Assertions always pass regardless of return value.
assert await repo.mark_message_retried("id1") in (True, False)will always pass since any boolean is in(True, False). If the intent is to verify these methods don't raise exceptions, consider making that explicit. If you expect specific outcomes (e.g., success on first call), assert those values directly.💡 Suggested improvement
- assert await repo.mark_message_retried("id1") in (True, False) - assert await repo.mark_message_discarded("id1", "r") in (True, False) + # Verify methods execute without error; first call should succeed + result_retried = await repo.mark_message_retried("id1") + assert result_retried is True, "Expected successful retry mark" + result_discarded = await repo.mark_message_discarded("id1", "r") + # After retry, discard may or may not succeed depending on business logic + assert isinstance(result_discarded, bool)backend/tests/integration/dlq/test_dlq_manager.py (1)
48-61: Silent exception handling may hide deserialization issues.The
consume_dlq_eventsfunction catches all exceptions and logs atdebuglevel, which could mask real problems during test execution. Consider logging at a higher level or re-raising after a certain number of failures.♻️ Suggested improvement
except Exception as e: - _test_logger.debug(f"Error deserializing DLQ event: {e}") + _test_logger.warning(f"Error deserializing DLQ event: {e}")backend/app/dlq/manager.py (5)
107-138: Message commit occurs even if storage or event emission fails.In
_process_messages, the commit happens after_process_dlq_messagereturns, but_process_dlq_messagecould partially succeed (e.g., store the message but fail during event emission). If the emit fails silently (caught in_produce_dlq_event), the message is committed but the received event was never emitted. Consider whether this is the desired behavior.Also, if an exception occurs in
_process_dlq_message, the message is not committed (good), but the loop continues to the next message without any backoff, which could cause tight-loop failures.♻️ Consider adding backoff on repeated failures
except Exception as e: self.logger.error(f"Error processing DLQ message: {e}") + await asyncio.sleep(1) # Brief backoff to prevent tight-loop failures
170-183: Upsert pattern may have race condition.The
_store_messagemethod performs a find-then-save pattern which is not atomic. If two instances process the same message concurrently, both could find no existing document and attempt to insert, potentially causing duplicate key errors (though the unique index onevent.event_idwould prevent actual duplicates).Consider using
upsert=Truewithfind_one_and_updateor catching the duplicate key exception.♻️ Consider using Beanie's upsert capability
- existing = await DLQMessageDocument.find_one({"event.event_id": message.event.event_id}) - if existing: - doc.id = existing.id - await doc.save() + await DLQMessageDocument.find_one( + {"event.event_id": message.event.event_id} + ).upsert( + Set(message.model_dump()), + on_insert=doc + )Alternatively, catch
DuplicateKeyErrorand update:try: await doc.insert() except DuplicateKeyError: existing = await DLQMessageDocument.find_one({"event.event_id": message.event.event_id}) if existing: doc.id = existing.id await doc.save()
360-372: DLQ event emission failures are silently logged - consider retry or metrics.The
_produce_dlq_eventmethod catches all exceptions and only logs the error. For observability events, this is acceptable, but you lose visibility into emission failures. Consider:
- Recording a metric for emission failures
- Adding structured logging with event details
♻️ Add metrics for emission failures
except Exception as e: self.logger.error(f"Failed to emit DLQ event {event.event_type}: {e}") + self.metrics.record_dlq_event_emission_failure(str(event.event_type))
389-416: Batch retry processes sequentially - consider concurrency for large batches.The
retry_messages_batchmethod processes each event ID sequentially. For large batches, this could be slow. Consider usingasyncio.gatherwith a concurrency limit for better throughput.♻️ Concurrent batch processing with semaphore
async def retry_messages_batch(self, event_ids: list[str]) -> DLQBatchRetryResult: semaphore = asyncio.Semaphore(10) # Limit concurrency async def retry_with_limit(event_id: str) -> DLQRetryResult: async with semaphore: try: success = await self.retry_message_manually(event_id) return DLQRetryResult( event_id=event_id, status="success" if success else "failed", error=None if success else "Retry failed" ) except Exception as e: self.logger.error(f"Error retrying message {event_id}: {e}") return DLQRetryResult(event_id=event_id, status="failed", error=str(e)) results = await asyncio.gather(*[retry_with_limit(eid) for eid in event_ids]) successful = sum(1 for r in results if r.status == "success") return DLQBatchRetryResult( total=len(event_ids), successful=successful, failed=len(event_ids) - successful, details=list(results) )
69-70: Hardcoded service metadata - consider making configurable.The
_event_metadatauses hardcoded values forservice_nameandservice_version. Consider deriving these from settings or environment variables for better maintainability across deployments.♻️ Use settings for service metadata
- self._event_metadata = EventMetadata(service_name="dlq-manager", service_version="1.0.0") + self._event_metadata = EventMetadata( + service_name=getattr(settings, "SERVICE_NAME", "dlq-manager"), + service_version=getattr(settings, "SERVICE_VERSION", "1.0.0") + )
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
backend/app/dlq/manager.pybackend/app/services/notification_service.pybackend/app/services/result_processor/processor.pybackend/tests/integration/db/repositories/test_dlq_repository.pybackend/tests/integration/dlq/test_dlq_manager.pybackend/tests/integration/events/test_producer_roundtrip.pybackend/tests/integration/events/test_schema_registry_real.pybackend/tests/integration/events/test_schema_registry_roundtrip.pybackend/tests/integration/services/sse/test_redis_bus.pybackend/tests/unit/services/saga/test_execution_saga_steps.pybackend/tests/unit/services/saga/test_saga_orchestrator_unit.pybackend/tests/unit/services/saga/test_saga_step_and_base.pybackend/tests/unit/services/sse/test_kafka_redis_bridge.pybackend/workers/dlq_processor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/tests/integration/services/sse/test_redis_bus.py
🧰 Additional context used
🧬 Code graph analysis (12)
backend/tests/integration/db/repositories/test_dlq_repository.py (1)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
backend/app/services/result_processor/processor.py (5)
backend/app/domain/events/typed.py (4)
EventMetadata(16-27)ExecutionCompletedEvent(95-101)ExecutionFailedEvent(104-112)ExecutionTimeoutEvent(115-121)backend/app/domain/execution/models.py (1)
ExecutionResultDomain(40-51)backend/app/events/core/dispatcher.py (1)
EventDispatcher(15-177)backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(35-119)backend/app/core/metrics/execution.py (3)
record_execution_duration(68-69)record_memory_usage(77-78)record_error(80-81)
backend/tests/unit/services/saga/test_saga_step_and_base.py (2)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)SystemErrorEvent(497-502)backend/app/services/saga/saga_step.py (4)
SagaStep(73-100)execute(80-88)SagaContext(14-70)can_execute(95-97)
backend/app/services/notification_service.py (1)
backend/app/domain/events/typed.py (3)
ExecutionCompletedEvent(95-101)ExecutionFailedEvent(104-112)ExecutionTimeoutEvent(115-121)
backend/workers/dlq_processor.py (3)
backend/app/services/coordinator/queue_manager.py (1)
age_seconds(38-39)backend/app/dlq/manager.py (1)
add_filter(319-320)backend/app/db/docs/dlq.py (1)
Settings(34-43)
backend/tests/integration/events/test_producer_roundtrip.py (1)
backend/app/infrastructure/kafka/mappings.py (1)
get_topic_for_event(97-99)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (3)
backend/app/domain/events/typed.py (1)
ExecutionRequestedEvent(48-62)backend/app/services/saga/saga_step.py (4)
CompensationStep(103-121)SagaContext(14-70)SagaStep(73-100)execute(80-88)backend/tests/helpers/events.py (1)
make_execution_requested_event(7-50)
backend/tests/integration/dlq/test_dlq_manager.py (6)
backend/app/dlq/manager.py (1)
create_dlq_manager(443-480)backend/app/domain/enums/events.py (1)
EventType(4-87)backend/app/domain/enums/kafka.py (1)
KafkaTopic(7-54)backend/app/domain/events/typed.py (1)
DLQMessageReceivedEvent(522-532)backend/app/settings.py (1)
Settings(11-169)backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(35-119)deserialize_event(80-95)
backend/tests/unit/services/sse/test_kafka_redis_bridge.py (2)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)ExecutionStartedEvent(80-85)backend/app/services/sse/redis_bus.py (1)
publish_event(65-71)
backend/tests/integration/events/test_schema_registry_real.py (2)
backend/app/domain/events/typed.py (2)
EventMetadata(16-27)PodCreatedEvent(135-139)backend/app/infrastructure/kafka/mappings.py (1)
get_topic_for_event(97-99)
backend/tests/integration/events/test_schema_registry_roundtrip.py (2)
backend/app/infrastructure/kafka/mappings.py (1)
get_topic_for_event(97-99)backend/app/events/schema/schema_registry.py (1)
deserialize_event(80-95)
backend/tests/unit/services/saga/test_execution_saga_steps.py (1)
backend/app/domain/events/typed.py (1)
ExecutionRequestedEvent(48-62)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: cubic · AI code reviewer
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (32)
backend/tests/integration/events/test_schema_registry_roundtrip.py (3)
5-5: LGTM!Import correctly added to support centralized topic resolution via the mappings module.
23-24: LGTM!Topic derivation correctly updated to use the centralized
get_topic_for_eventmapping. Thestr()conversion is appropriate sinceget_topic_for_eventreturns aKafkaTopicenum whiledeserialize_eventexpects a string parameter.
27-28: LGTM!Good addition to exercise the idempotent initialization path of
initialize_schemas(). This helps ensure coverage of the lazy initialization behavior mentioned in the comment at line 19.backend/tests/integration/events/test_schema_registry_real.py (2)
4-6: LGTM!Imports correctly updated to use the unified domain event model. The switch from
AvroEventMetadatatoEventMetadataand sourcingPodCreatedEventfromapp.domain.events.typedaligns with the PR objective of consolidating to a single event system.
18-26: LGTM!Test correctly updated to use the unified event model:
EventMetadatareplacesAvroEventMetadatawith the same required fields- Topic derivation via
get_topic_for_event(ev.event_type)follows the new centralized mapping pattern- The
str()conversion appropriately handles theKafkaTopicenum for thedeserialize_eventstring parameterbackend/tests/unit/services/saga/test_execution_saga_steps.py (2)
3-3: LGTM!Import correctly updated from the infrastructure layer to the domain layer, aligning with the unified event model migration.
112-120: LGTM!The
_FakeProducercorrectly updated to useDomainEventtype hints instead ofBaseEvent, consistent with the unified event model. The minimal implementation (skipping parent__init__) is appropriate for test isolation.backend/tests/integration/events/test_producer_roundtrip.py (2)
7-7: LGTM!Import correctly reflects the migration to the centralized topic mapping helper.
35-36: LGTM!The topic derivation now correctly uses the centralized mapping via
get_topic_for_event, and the explicitstr()cast properly converts theKafkaTopicenum to the string expected bysend_to_dlq.backend/tests/unit/services/sse/test_kafka_redis_bridge.py (4)
7-7: LGTM!Import correctly updated to use the unified domain event types from
app.domain.events.typed, consistent with the PR's migration to the single event model.
19-26: LGTM!The
_FakeBustype hints correctly updated toDomainEvent, matching the productionSSERedisBus.publish_eventsignature.
29-30: LGTM!Helper correctly updated to use
EventMetadatawith the required fields. The rename fromAvroEventMetadataaligns with the PR migration.
54-61: LGTM!Good improvement: using the actual
ExecutionStartedEventtype instead of a custom dummy class provides more realistic test coverage and validates the real event structure works correctly with the routing handlers.backend/app/services/notification_service.py (4)
22-27: LGTM!The imports correctly reflect the migration to the unified
DomainEventmodel. All imported types are used appropriately within the file.
330-330: Verify that blocking on delivery is intentional.This change from fire-and-forget to awaiting
_deliver_notificationmeanscreate_notificationwill block until delivery completes. For webhook and Slack channels, this could add up to 30 seconds of latency (the configured timeout) if external services are slow.If immediate API response is desired, consider restoring async task dispatch or applying this only to the
IN_APPchannel.
631-632: LGTM!The defensive null-check on
event.resource_usagepreventsAttributeErrorwhen the field isNone, with a sensible fallback to0.0.
644-656: LGTM!The unified handler correctly uses
isinstancechecks to narrow the genericDomainEventto specific typed handlers, with appropriate fallback logging for unhandled types and exception handling.backend/tests/unit/services/saga/test_saga_step_and_base.py (3)
6-6: LGTM!Import correctly updated to use the new domain-typed event system.
46-51: LGTM!Test event construction correctly uses
SystemErrorEventwith proper required fields (error_type,message,service_name) and embeddedEventMetadata. This aligns with the unified event model.
77-78: LGTM!Type annotations consistently updated to
SystemErrorEventacross the saga step definition, execute signature, and mock spec. The changes align with the broader migration to domain-typed events.Also applies to: 84-86, 94-94
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (4)
9-9: LGTM!Imports correctly updated to use
DomainEventandExecutionRequestedEventfrom the domain layer, with the test helpermake_execution_requested_eventproperly separated as a local import.Also applies to: 19-20
48-51: LGTM!The fake producer signature correctly accepts
DomainEvent, matching the updatedUnifiedProducerinterface.
78-86: LGTM!
_StepOKand_Sagacorrectly parameterized withExecutionRequestedEvent. Theexecutemethod signature matches theSagaStepabstract interface.Also applies to: 98-99
122-122: LGTM!Consistent use of
make_execution_requested_eventhelper withexecution_idoverride. This is cleaner than the previous approach and leverages the test helper's sensible defaults.Also applies to: 142-142, 146-146
backend/app/services/result_processor/processor.py (3)
15-26: LGTM!Imports correctly updated to include
DomainEventandEventMetadatafrom the unified domain events module.
178-192: LGTM on conditional metrics; acknowledge the brittle memory limit parsing.The conditional check for
resource_usageis a good defensive change that preventsAttributeErrorwhen resource data is unavailable.However, line 188's parsing of
K8S_POD_MEMORY_LIMITis fragile as noted in the TODO. It assumes the format ends with"Mi", but will fail for:
"256Gi"→int("256G")→ValueError"256M"→int("256")but semantically wrong (megabytes vs mebibytes)"268435456"(raw bytes) →int("268435456")→ incorrect valueConsider extracting a utility function to parse Kubernetes resource quantities properly.
219-219: LGTM!Reasonable fallback to
"unknown"whenerror_typeis not present, ensuring metrics recording doesn't fail.backend/workers/dlq_processor.py (1)
68-71: Age calculation approach is correct.The switch to explicit datetime calculation using
datetime.now(timezone.utc) - message.failed_atis appropriate sinceDLQMessagedoesn't have anage_secondsproperty (that exists inQueuedItemfrom the queue_manager, per the relevant snippets).backend/tests/integration/dlq/test_dlq_manager.py (2)
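With timezone-aware datetimes the age computation is a one-liner, for example:

from datetime import datetime, timedelta, timezone

failed_at = datetime.now(timezone.utc) - timedelta(minutes=5)
age_seconds = (datetime.now(timezone.utc) - failed_at).total_seconds()
assert age_seconds >= 300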
84-103: Good async-first test pattern using Future instead of polling.The approach of using
asyncio.Futurewithasyncio.wait_foris cleaner than polling-based verification. The cleanup in thefinallyblock properly cancels the consumer task and stops the consumer.
27-28: No changes needed. Beanie is properly initialized through the app's lifespan context, which runs when theappfixture is created. Although the test doesn't explicitly request thedbfixture, theautouse_cleanupfixture inbackend/tests/integration/conftest.pyrequires it, which establishes the dependency chain:_cleanup→db→scope→app_container→app(with lifespan). This ensuresinit_beanieis called at app startup before the test runs.Likely an incorrect or invalid review comment.
backend/app/dlq/manager.py (2)
206-255: Retry sends to both retry and original topics - potential duplicate processing.The
_retry_messagemethod sends the event to both a retry topic (line 222-227) and the original topic (line 230-235). This design choice means:
- The retry topic serves as an audit/monitoring trail
- The original topic gets the actual retry
This is valid if intentional, but consumers on the original topic should be idempotent since they'll receive the same event again.
Confirm this dual-publish behavior is intentional for the retry flow. The original consumers should handle idempotency to prevent duplicate processing.
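A minimal sketch of consumer-side deduplication keyed by event_id (in-memory here purely for illustration; the project's idempotency layer would persist keys):

from typing import Any, Awaitable, Callable

_seen_event_ids: set[str] = set()


async def handle_once(
    event_id: str,
    handler: Callable[[dict[str, Any]], Awaitable[None]],
    payload: dict[str, Any],
) -> bool:
    """Run the handler only the first time an event_id is seen; drop redeliveries."""
    if event_id in _seen_event_ids:
        return False
    _seen_event_ids.add(event_id)
    await handler(payload)
    return True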
322-358: DLQ lifecycle event emission methods are well-structured.The three emit methods (
_emit_message_received_event,_emit_message_retried_event,_emit_message_discarded_event) follow a consistent pattern and correctly construct the domain events with proper metadata. The common_produce_dlq_eventmethod handles serialization and publishing with error resilience.
def _kafka_msg_to_message(self, msg: Any) -> DLQMessage:
    """Parse Kafka ConsumerRecord into DLQMessage."""
    raw_bytes = msg.value
    raw: str = raw_bytes.decode("utf-8") if isinstance(raw_bytes, (bytes, bytearray)) else str(raw_bytes or "")
    data: dict[str, Any] = json.loads(raw) if raw else {}

    headers_list = msg.headers or []
    headers: dict[str, str] = {}
    for k, v in headers_list:
        headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")

    event = self.schema_registry.deserialize_json(data.get("event", data))

    return DLQMessage(
        event_id=data.get("event_id", event.event_id),
        event=event,
        event_type=event.event_type,
        original_topic=data.get("original_topic", headers.get("original_topic", "")),
        error=data.get("error", headers.get("error", "Unknown error")),
        retry_count=data.get("retry_count", int(headers.get("retry_count", 0))),
        failed_at=datetime.fromisoformat(data["failed_at"])
        if data.get("failed_at")
        else datetime.now(timezone.utc),
        status=DLQMessageStatus(data.get("status", DLQMessageStatus.PENDING)),
        producer_id=data.get("producer_id", headers.get("producer_id", "unknown")),
        dlq_offset=msg.offset,
        dlq_partition=msg.partition,
        headers=headers,
    )

    data = json.loads(msg.value)
    headers = {k: v.decode() for k, v in (msg.headers or [])}
    return DLQMessage(**data, dlq_offset=msg.offset, dlq_partition=msg.partition, headers=headers)
JSON parsing lacks error handling for malformed messages.
The _kafka_msg_to_message method directly calls json.loads and DLQMessage(**data, ...) without any exception handling. Malformed JSON or missing required fields will raise exceptions that bubble up and could disrupt message processing.
🐛 Proposed fix with validation
def _kafka_msg_to_message(self, msg: Any) -> DLQMessage:
"""Parse Kafka ConsumerRecord into DLQMessage."""
- data = json.loads(msg.value)
- headers = {k: v.decode() for k, v in (msg.headers or [])}
- return DLQMessage(**data, dlq_offset=msg.offset, dlq_partition=msg.partition, headers=headers)
+ try:
+ data = json.loads(msg.value)
+ headers = {k: v.decode() for k, v in (msg.headers or [])}
+ return DLQMessage(**data, dlq_offset=msg.offset, dlq_partition=msg.partition, headers=headers)
+ except (json.JSONDecodeError, TypeError, KeyError) as e:
+ raise ValueError(f"Failed to parse DLQ message: {e}") from e
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @backend/app/db/repositories/dlq_repository.py:
- Around line 26-32: Update usages that assume a top-level event_type to
reference the nested event object: in the aggregation pipeline change the group
key from "$event_type" to "$event.event_type" so grouping uses the nested
DomainEvent field, and in query conditions replace any
DLQMessageDocument.event_type references with
DLQMessageDocument.event.event_type (e.g., in the code that builds filters or
queries) so they access the nested event.event_type property.
🧹 Nitpick comments (1)
backend/app/domain/events/typed.py (1)
16-27: Consider consistency: `use_enum_values=True` present here but missing in `BaseEvent`.

`EventMetadata` has `use_enum_values=True` in its `ConfigDict`, but `BaseEvent` (line 33) only has `from_attributes=True`. If enums should serialize consistently across all event types, consider adding `use_enum_values=True` to `BaseEvent` as well to ensure uniform serialization behavior.

♻️ Optional: Add use_enum_values for consistency

```diff
 class BaseEvent(AvroBase):
     """Base fields for all domain events."""
-    model_config = ConfigDict(from_attributes=True)
+    model_config = ConfigDict(from_attributes=True, use_enum_values=True)
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- `backend/app/db/repositories/dlq_repository.py`
- `backend/app/domain/events/typed.py`
- `backend/app/services/idempotency/idempotency_manager.py`
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/db/repositories/dlq_repository.py (2)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event (30-34)
backend/app/infrastructure/kafka/mappings.py (1)
get_event_class_for_type(91-93)
backend/app/domain/events/typed.py (1)
backend/app/domain/enums/events.py (1)
EventType(4-87)
backend/app/services/idempotency/idempotency_manager.py (1)
backend/app/domain/events/typed.py (1)
BaseEvent(30-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: cubic · AI code reviewer
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (7)
backend/app/domain/events/typed.py (5)
1-6: LGTM - Clean migration to AvroBase.

The import restructuring correctly brings in `AvroBase` from `pydantic_avro` to enable Avro schema generation across all domain events.

30-40: LGTM - BaseEvent properly extends AvroBase.

The `event_type: EventType` field is correctly added as a required discriminator field, enabling polymorphic deserialization via the `DomainEvent` union.

517-553: LGTM - Well-structured DLQ event classes.

The three new DLQ event types (`DLQMessageReceivedEvent`, `DLQMessageRetriedEvent`, `DLQMessageDiscardedEvent`) follow the established pattern with appropriate fields and correct `Literal` discriminators matching the `EventType` enum values from `app.domain.enums.events`.

558-574: Intentional design: ArchivedEvent duplicates base fields rather than inheriting.

`ArchivedEvent` extends `AvroBase` directly and duplicates fields from `BaseEvent`. This appears intentional since it's a wrapper for archived event data with additional archive-specific metadata (`deleted_at`, `deleted_by`, `deletion_reason`). The separate hierarchy avoids `ArchivedEvent` being part of the `DomainEvent` union.

643-649: LGTM - DomainEvent union correctly updated.

The discriminated union now includes all three new DLQ event types, maintaining the `event_type` discriminator for polymorphic deserialization.
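For readers less familiar with the pattern, here is a minimal, self-contained sketch of how a `Literal` discriminator plus an annotated union gives polymorphic deserialization in Pydantic v2. The class and field names below are illustrative stand-ins, not the project's actual event classes.

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class DLQMessageReceived(BaseModel):
    # The Literal value doubles as the discriminator tag.
    event_type: Literal["dlq_message_received"]
    event_id: str


class DLQMessageRetried(BaseModel):
    event_type: Literal["dlq_message_retried"]
    event_id: str
    retry_count: int


# The annotated union dispatches on event_type during validation.
Event = Annotated[
    Union[DLQMessageReceived, DLQMessageRetried],
    Field(discriminator="event_type"),
]

adapter = TypeAdapter(Event)
msg = adapter.validate_python(
    {"event_type": "dlq_message_retried", "event_id": "abc", "retry_count": 2}
)
assert isinstance(msg, DLQMessageRetried)
```

Adding a new event type then only requires appending another member to the union, which is why the three DLQ events slot in cleanly.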
backend/app/services/idempotency/idempotency_manager.py (2)

12-12: LGTM - Import aligned with unified event model.

The import path change from the infrastructure layer to `app.domain.events.typed` aligns with the PR's goal of consolidating to a single domain event system.

43-53: LGTM - `mode="json"` ensures consistent content hashing.

Using `model_dump(mode="json")` correctly serializes complex types (datetime, enums, UUIDs) to their JSON string representations before hashing. This prevents hash inconsistencies that could occur from serializing Python objects directly.
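A small illustration of why `mode="json"` matters for hashing; this is a generic sketch with made-up model fields, not the repository's idempotency code.

```python
import hashlib
import json
from datetime import datetime, timezone
from enum import Enum

from pydantic import BaseModel


class Status(str, Enum):
    ACTIVE = "active"


class Payload(BaseModel):
    status: Status
    created_at: datetime


def content_hash(model: BaseModel) -> str:
    # mode="json" renders datetimes/enums/UUIDs as stable JSON strings,
    # so the same logical payload always produces the same digest.
    dumped = model.model_dump(mode="json")
    canonical = json.dumps(dumped, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


p = Payload(status=Status.ACTIVE, created_at=datetime(2024, 1, 1, tzinfo=timezone.utc))
print(content_hash(p))  # deterministic across runs and processes
```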
```diff
 def _doc_to_message(self, doc: DLQMessageDocument) -> DLQMessage:
-    event_class = get_event_class_for_type(doc.event_type)
+    event_type = doc.event.event_type
+    event_class = get_event_class_for_type(event_type)
     if not event_class:
-        raise ValueError(f"Unknown event type: {doc.event_type}")
+        raise ValueError(f"Unknown event type: {event_type}")
     data = doc.model_dump(exclude={"id", "revision_id"})
     return DLQMessage(**{**data, "event": event_class(**data["event"])})
```
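As context for the `get_event_class_for_type` call above, a type-to-class registry is typically just a dict keyed by the event type string. The sketch below is a generic illustration under that assumption; it is not the project's `app.infrastructure.kafka.mappings` module.

```python
from typing import Type

from pydantic import BaseModel


class ExecutionRequested(BaseModel):
    event_type: str = "execution_requested"
    execution_id: str


# Registry mapping the stored event_type string to its model class.
EVENT_CLASSES: dict[str, Type[BaseModel]] = {
    "execution_requested": ExecutionRequested,
}


def get_event_class_for_type(event_type: str) -> Type[BaseModel] | None:
    return EVENT_CLASSES.get(event_type)


raw = {"event_type": "execution_requested", "execution_id": "exec-1"}
cls = get_event_class_for_type(raw["event_type"])
event = cls(**raw) if cls else None
print(type(event).__name__)  # ExecutionRequested
```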
🧩 Analysis chain
🏁 Script executed:
# First, find the DLQMessageDocument class definition
rg -n "class DLQMessageDocument" --type py -A 15

Repository: HardMax71/Integr8sCode
Length of output: 1433
🏁 Script executed:
# Check the dlq_repository.py file to see the full context, especially around lines 62 and 110
cat -n backend/app/db/repositories/dlq_repository.py | sed -n '1,120p'

Repository: HardMax71/Integr8sCode
Length of output: 5482
🏁 Script executed:
# Search for any other references to event_type or event field in the file
rg -n "event_type\|\.event\." backend/app/db/repositories/dlq_repository.py --type py

Repository: HardMax71/Integr8sCode
Length of output: 48
Update aggregation pipeline and query conditions to use the nested event structure.
The event field is a nested DomainEvent object. Line 62's aggregation pipeline uses "$event_type" (flat path) but should use "$event.event_type" to access the nested field. Similarly, line 110's query uses DLQMessageDocument.event_type, which doesn't exist at the top level—it should be DLQMessageDocument.event.event_type.
- Line 62: Change `"$event_type"` to `"$event.event_type"` in the `$group` operator
- Line 110: Change `DLQMessageDocument.event_type` to `DLQMessageDocument.event.event_type`
🤖 Prompt for AI Agents
In @backend/app/db/repositories/dlq_repository.py around lines 26 - 32, Update
usages that assume a top-level event_type to reference the nested event object:
in the aggregation pipeline change the group key from "$event_type" to
"$event.event_type" so grouping uses the nested DomainEvent field, and in query
conditions replace any DLQMessageDocument.event_type references with
DLQMessageDocument.event.event_type (e.g., in the code that builds filters or
queries) so they access the nested event.event_type property.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/db/repositories/dlq_repository.py (2)
61-69: Aggregation pipeline uses incorrect field path for event_type.

The pipeline groups by `"$event_type"` (top-level), but according to the `DLQMessageDocument` schema, `event_type` is now nested under the `event` field. This should be `"$event.event_type"` to match the new document structure.

Proposed fix

```diff
 # Get counts by event type
 event_type_pipeline: list[Mapping[str, object]] = [
-    {"$group": {"_id": "$event_type", "count": {"$sum": 1}}},
+    {"$group": {"_id": "$event.event_type", "count": {"$sum": 1}}},
     {"$sort": {"count": -1}},
     {"$limit": 10},
 ]
```

107-111: Filter uses non-existent top-level event_type attribute.

`DLQMessageDocument.event_type` references a top-level field that no longer exists. According to the document schema, `event_type` is nested under `event`. This condition will either raise an `AttributeError` or silently fail to match documents.

Proposed fix

```diff
 conditions: list[Any] = [
     DLQMessageDocument.status == status if status else None,
     DLQMessageDocument.original_topic == topic if topic else None,
-    DLQMessageDocument.event_type == event_type if event_type else None,
+    {"event.event_type": str(event_type)} if event_type else None,
 ]
```

Note: Beanie's `find()` accepts raw query dicts mixed with ODM expressions. Alternatively, verify if Beanie 2.0.1 supports nested attribute access like `DLQMessageDocument.event.event_type`.
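To make that note concrete, here is a rough sketch of mixing a raw dict filter with typed field expressions in a single Beanie query. The document class is hypothetical, and the exact Beanie surface should be checked against the pinned version.

```python
from beanie import Document


class DLQDoc(Document):
    status: str
    original_topic: str
    event: dict


async def find_pending(topic: str, event_type: str) -> list[DLQDoc]:
    return await DLQDoc.find(
        DLQDoc.status == "pending",        # typed field expression
        DLQDoc.original_topic == topic,    # typed field expression
        {"event.event_type": event_type},  # raw dict for the nested path
    ).to_list()
```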
🧹 Nitpick comments (1)
backend/tests/integration/dlq/test_dlq_discard.py (1)
19-44: Consider extracting the shared helper to reduce duplication.

The `_create_dlq_document` helper is identical to the one in `test_dlq_retry.py`. Consider extracting it to a shared test utility (e.g., in `tests/helpers/`) to avoid maintaining duplicate code.

```python
# In tests/helpers/dlq.py or similar
async def create_test_dlq_document(
    event_id: str | None = None,
    status: DLQMessageStatus = DLQMessageStatus.PENDING,
) -> DLQMessageDocument:
    ...
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- `backend/app/db/repositories/dlq_repository.py`
- `backend/tests/integration/db/repositories/test_dlq_repository.py`
- `backend/tests/integration/dlq/test_dlq_discard.py`
- `backend/tests/integration/dlq/test_dlq_retry.py`
🧰 Additional context used
🧬 Code graph analysis (4)
backend/tests/integration/dlq/test_dlq_retry.py (2)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event (30-34)
backend/app/db/docs/dlq.py (1)
DLQMessageDocument(11-43)
backend/app/db/repositories/dlq_repository.py (3)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event (30-34)
backend/app/infrastructure/kafka/mappings.py (1)
get_event_class_for_type (91-93)
backend/app/db/docs/dlq.py (1)
DLQMessageDocument(11-43)
backend/tests/integration/dlq/test_dlq_discard.py (2)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event (30-34)
backend/app/db/docs/dlq.py (1)
DLQMessageDocument(11-43)
backend/tests/integration/db/repositories/test_dlq_repository.py (1)
backend/tests/unit/services/idempotency/test_middleware.py (1)
event(30-34)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: cubic · AI code reviewer
- GitHub Check: E2E Tests
- GitHub Check: Integration Tests
🔇 Additional comments (7)
backend/app/db/repositories/dlq_repository.py (2)
26-32: LGTM!

The `_doc_to_message` method correctly extracts `event_type` from the nested `doc.event.event_type` and reconstructs the event using the appropriate event class. The error message now properly references the local `event_type` variable.

125-127: LGTM!

The query pattern `{"event.event_id": event_id}` is consistently applied across `get_message_by_id`, `mark_message_retried`, and `mark_message_discarded`. This aligns with the new nested event structure and the index defined on `event.event_id`.

Also applies to: 165-174, 176-186

backend/tests/integration/db/repositories/test_dlq_repository.py (2)

24-69: LGTM!

Test documents are correctly constructed with the nested `event` dictionary structure containing `event_id`, `event_type`, `metadata`, and event-specific fields. This matches the new `DLQMessageDocument` schema.

85-86: LGTM!

The assertion correctly validates the nested event structure by accessing `msg.event.event_id`.

backend/tests/integration/dlq/test_dlq_retry.py (2)

27-44: LGTM!

The helper correctly constructs a DLQ document by:

- Creating a valid event via `make_execution_requested_event`
- Dumping to dict and overriding `event_id` for test predictability
- Passing the dict to `DLQMessageDocument(event=event_dict, ...)`

This approach maintains type consistency while allowing controlled test data.
62-65: LGTM!

All `find_one` queries consistently use `{"event.event_id": event_id}` to match the new document structure. The test assertions properly validate status transitions, timestamps, and reason fields.

Also applies to: 94-97, 115-117, 158-160, 181-183, 201-204, 222-225
backend/tests/integration/dlq/test_dlq_discard.py (1)
63-67: LGTM!

All `find_one` queries consistently use `{"event.event_id": event_id}`. The tests comprehensively cover discard scenarios including status transitions, timestamps, and custom reasons.

Also applies to: 99-102, 119-121, 139-141
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
backend/app/db/repositories/user_settings_repository.py (1)
73-78: Delete orphanedUserSettingsDocumentrecords indelete_user_settings.The
delete_user_settingsmethod deletes fromUserSettingsSnapshotDocumentandEventDocument, but not fromUserSettingsDocument—the primary settings collection wherecreate_snapshotandget_snapshotoperate. This leaves orphanedUserSettingsDocumentrecords when a user is deleted.Add deletion of
UserSettingsDocumentalongside the snapshot and event deletions:Suggested fix
async def delete_user_settings(self, user_id: str) -> None: doc = await UserSettingsDocument.find_one(UserSettingsDocument.user_id == user_id) if doc: await doc.delete() await UserSettingsSnapshotDocument.find(UserSettingsSnapshotDocument.user_id == user_id).delete() await EventDocument.find(EventDocument.aggregate_id == f"user_settings_{user_id}").delete() self.logger.info(f"Deleted all settings data for user {user_id}")backend/app/db/repositories/resource_allocation_repository.py (1)
23-28: The TOCTOU race is real, but the suggested fix is incomplete; useupdate_one()for true atomicity.Two concurrent callers can both read the same doc in an "active" state and both update it to "released"—the check-then-act pattern is inherently racy. Adding a status filter to the query is a good step, but the suggested code still uses
doc.set(), which is a separate operation after theifcheck and does not close the race window.For atomic conditional updates with Beanie 2.0.1, use
find_one(...filters...).update_one()instead:Atomic fix with Beanie 2.0.1 API
from beanie.operators import Set async def release_allocation(self, allocation_id: str) -> bool: now = datetime.now(timezone.utc) result = await ResourceAllocationDocument.find_one( (ResourceAllocationDocument.allocation_id == allocation_id) & (ResourceAllocationDocument.status == "active") ).update_one( Set({ ResourceAllocationDocument.status: "released", ResourceAllocationDocument.released_at: now }) ) return result.modified_count > 0This ensures the status filter and update happen in a single server-side operation, eliminating the race.
backend/app/db/repositories/user_repository.py (1)
29-29: Type annotation mismatch with field expressions in conditions list.At line 29,
conditionsis typed aslist[BaseFindOperator], but at line 41 it appendsUserDocument.role == role, which is a field expression, not aBaseFindOperator. Whilefind()accepts both types at runtime, this type annotation is incomplete. Line 34–37 appendsOr(...)which does conform toBaseFindOperator, creating a heterogeneous list that violates the declared type.Suggested fix
- conditions: list[BaseFindOperator] = [] + conditions: list[BaseFindOperator | Any] = []Alternatively, use
list[Any]for simplicity, or investigate whether Beanie exports a more specific union type that encompasses both field expressions and operators.
🧹 Nitpick comments (4)
backend/app/db/repositories/notification_repository.py (1)
202-213: Consider batching the subscription queries to avoid N+1 pattern.Currently makes one database query per channel. While the number of channels is likely small, fetching all subscriptions in a single query would be more efficient.
♻️ Suggested optimization
async def get_all_subscriptions(self, user_id: str) -> dict[NotificationChannel, DomainNotificationSubscription]: subs: dict[NotificationChannel, DomainNotificationSubscription] = {} - for channel in NotificationChannel: - doc = await NotificationSubscriptionDocument.find_one( - NotificationSubscriptionDocument.user_id == user_id, - NotificationSubscriptionDocument.channel == channel, - ) - if doc: - subs[channel] = DomainNotificationSubscription.model_validate(doc, from_attributes=True) - else: - subs[channel] = DomainNotificationSubscription(user_id=user_id, channel=channel, enabled=True) + docs = await NotificationSubscriptionDocument.find( + NotificationSubscriptionDocument.user_id == user_id, + ).to_list() + existing = {doc.channel: doc for doc in docs} + for channel in NotificationChannel: + if channel in existing: + subs[channel] = DomainNotificationSubscription.model_validate(existing[channel], from_attributes=True) + else: + subs[channel] = DomainNotificationSubscription(user_id=user_id, channel=channel, enabled=True) return subsbackend/app/db/repositories/admin/admin_events_repository.py (1)
71-73: Consider using Beanie operators for consistency.This query still uses dict syntax while the surrounding code now uses Beanie expressions. For full consistency, you could use Beanie's
NEoperator:from beanie.operators import NE related_query = EventDocument.find( EventDocument.metadata.correlation_id == doc.metadata.correlation_id, NE(EventDocument.event_id, event_id) )However, the current dict-based approach works correctly and this is purely a stylistic consideration.
backend/app/db/repositories/resource_allocation_repository.py (1)
9-13: Multi-argfind()syntax is correct in Beanie 2.0.1. Beanie'sfind()method explicitly accepts multiple filter expressions as separate positional arguments, so the code at lines 10–13 is valid and requires no changes.The
release_allocation()method (lines 23–28) follows a read-then-write pattern without an atomic check. If the document is deleted or modified between thefind_one()anddoc.set()calls, the update may fail or succeed silently depending on Beanie's behavior. Consider guarding the release with a status check (e.g., only release if currently "active") or using an atomic update if the business logic requires it, otherwise document that releases are intentionally idempotent.backend/app/db/repositories/admin/admin_user_repository.py (1)
4-5: Type annotation may be inaccurate for Beanie 2.0 expression API.In Beanie 2.0, direct field equality expressions (e.g.,
UserDocument.role == role) return expression objects that may not beBaseFindOperatorinstances. Sincefind()acceptsMapping | boolas well, this works at runtime but the type hint could be more accurate.Consider using a broader type or
Anyfor the conditions list if strict type checking is enabled.Also applies to: 33-33
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
backend/app/db/repositories/admin/admin_events_repository.pybackend/app/db/repositories/admin/admin_settings_repository.pybackend/app/db/repositories/admin/admin_user_repository.pybackend/app/db/repositories/event_repository.pybackend/app/db/repositories/execution_repository.pybackend/app/db/repositories/notification_repository.pybackend/app/db/repositories/replay_repository.pybackend/app/db/repositories/resource_allocation_repository.pybackend/app/db/repositories/saga_repository.pybackend/app/db/repositories/saved_script_repository.pybackend/app/db/repositories/sse_repository.pybackend/app/db/repositories/user_repository.pybackend/app/db/repositories/user_settings_repository.py
🧰 Additional context used
🧬 Code graph analysis (10)
backend/app/db/repositories/event_repository.py (2)
backend/app/db/docs/event.py (1)
EventDocument (13-71)
backend/app/services/coordinator/queue_manager.py (1)
execution_id(30-31)
backend/app/db/repositories/admin/admin_user_repository.py (2)
backend/app/domain/user/user_models.py (2)
User (44-57)
UserUpdate (60-80)
backend/app/db/repositories/user_repository.py (1)
update_user(53-62)
backend/app/db/repositories/saved_script_repository.py (3)
backend/app/api/routes/saved_scripts.py (1)
list_saved_scripts (31-38)
backend/app/services/saved_script_service.py (1)
list_saved_scripts (104-118)
backend/app/domain/saved_script/models.py (1)
DomainSavedScript(21-28)
backend/app/db/repositories/sse_repository.py (1)
backend/app/services/coordinator/queue_manager.py (1)
execution_id(30-31)
backend/app/db/repositories/replay_repository.py (2)
backend/app/services/event_replay/replay_service.py (1)
get_session (324-325)
backend/app/domain/replay/models.py (1)
ReplaySessionState(119-146)
backend/app/db/repositories/admin/admin_events_repository.py (1)
backend/app/db/docs/event.py (1)
EventDocument(13-71)
backend/app/db/repositories/notification_repository.py (1)
backend/app/domain/notification/models.py (1)
DomainNotification(16-45)
backend/app/db/repositories/user_repository.py (1)
backend/app/services/coordinator/queue_manager.py (1)
user_id(34-35)
backend/app/db/repositories/execution_repository.py (1)
backend/app/services/coordinator/queue_manager.py (1)
execution_id(30-31)
backend/app/db/repositories/user_settings_repository.py (2)
backend/app/services/coordinator/queue_manager.py (1)
user_id (34-35)
backend/app/domain/user/settings_models.py (1)
DomainUserSettings(34-47)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: cubic · AI code reviewer
- GitHub Check: Integration Tests
- GitHub Check: E2E Tests
🔇 Additional comments (37)
backend/app/db/repositories/sse_repository.py (2)
9-17: LGTM!The migration from dict-style filter to Beanie expression-based query (
ExecutionDocument.execution_id == execution_id) is correct and follows Beanie 2.x idiomatic patterns. This improves type safety and IDE support.
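A compact sketch of the pattern being praised in these comments (a typed field expression plus a `model_validate` conversion), using a hypothetical document and domain model rather than the project's actual classes:

```python
from beanie import Document
from pydantic import BaseModel


class ExecutionDoc(Document):
    execution_id: str
    status: str


class DomainExecution(BaseModel):
    execution_id: str
    status: str

    model_config = {"from_attributes": True}


async def get_execution(execution_id: str) -> DomainExecution | None:
    # Typed field expression instead of a raw {"execution_id": ...} dict.
    doc = await ExecutionDoc.find_one(ExecutionDoc.execution_id == execution_id)
    if doc is None:
        return None
    # Convert the ODM document into the domain model.
    return DomainExecution.model_validate(doc, from_attributes=True)
```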
19-23: LGTM!Consistent use of Beanie expression-based query matching the pattern in
get_execution_status. Themodel_validatewithfrom_attributes=Trueis the correct Pydantic v2 approach for converting ORM/document objects to domain models.backend/app/db/repositories/replay_repository.py (3)
18-29: LGTM! Clean migration to Beanie expression syntax.The conversion from dict-based filters to Beanie field-expression syntax (
ReplaySessionDocument.session_id == session_id) is the correct approach for Beanie 2.x. This provides better type safety and IDE support while maintaining identical behavior.
48-54: LGTM!Consistent use of Beanie expression syntax with proper null handling.
71-79: LGTM!The
doc.set()method is the correct Beanie approach for partial atomic updates, and the expression syntax change is consistent with the rest of the file.backend/app/db/repositories/execution_repository.py (1)
29-29: LGTM! Clean migration to Beanie field expressions.The changes consistently replace dict-based queries with Beanie's type-safe field expression syntax across
get_execution,update_execution,write_terminal_result, anddelete_execution. This improves type safety and IDE support while maintaining identical runtime behavior.Also applies to: 38-38, 49-49, 84-84
backend/app/db/repositories/saved_script_repository.py (4)
11-16: LGTM!The migration from
Eq()to direct field equality expressions is the idiomatic Beanie query syntax. Thefind_onecorrectly combines both conditions with AND logic.
24-27: LGTM!Consistent use of field equality expressions for the query filter.
35-43: LGTM!Consistent query pattern for the delete operation.
45-47: LGTM!The
find()with field equality expression followed byto_list()is the standard Beanie pattern for retrieving multiple documents.backend/app/db/repositories/notification_repository.py (7)
32-35: LGTM!Clean migration to Beanie expression-based filters. The compound query ensures user-scoped access control.
44-47: LGTM!Consistent use of Beanie expressions for the lookup query.
53-56: LGTM!Expression syntax is correct and consistent with the repository pattern.
64-65: LGTM!Correct usage of Beanie expressions with
update_many()for bulk status updates.
70-73: LGTM!Properly scoped deletion with expression-based filter.
171-174: LGTM!Expression-based filter with appropriate default fallback when no subscription exists.
183-186: LGTM!Expression-based filter correctly used in the upsert pattern.
backend/app/db/repositories/admin/admin_settings_repository.py (1)
19-19: LGTM! Clean migration to Beanie expression syntax.The refactor from dict-based queries (
{"settings_id": "global"}) to Beanie's typed field comparison (SystemSettingsDocument.settings_id == "global") is correct and consistent across all three methods. This provides better type safety and IDE support while maintaining identical behavior.Also applies to: 43-43, 71-71
backend/app/db/repositories/admin/admin_events_repository.py (3)
65-65: LGTM! Consistent use of Beanie expression syntax for document lookups.The refactor to typed field comparisons (
EventDocument.event_id == event_id) is correct and aligns with the indexed field defined inEventDocument(seeevent.pyline 19:event_id: Indexed(str, unique=True)).Also applies to: 89-89
193-196: LGTM! Clean migration of replay session queries.The conversion to Beanie expression syntax for
ReplaySessionDocumentlookups is correct. The surrounding logic (None checks, model validation) remains unchanged and appropriate.Also applies to: 202-206, 209-211
254-254: LGTM!Correctly migrated to Beanie expression syntax for
ExecutionDocumentlookup within the replay progress calculation.backend/app/db/repositories/event_repository.py (3)
68-72: LGTM!The migration from dict-based query
{"event_id": event_id}to Beanie field expressionEventDocument.event_id == event_idis correct and aligns with Beanie 2.x typed query patterns. This provides better type safety and IDE support.
148-152: LGTM!The
Orcondition correctly uses Beanie field expressions for bothexecution_idandaggregate_idcomparisons. This maintains the existing query semantics while adopting the typed query style.
325-330: LGTM!Consistent migration to Beanie field expression syntax, matching the pattern used in
get_event.backend/app/db/repositories/user_settings_repository.py (2)
16-20: LGTM!Correct migration to Beanie field expression syntax for querying user settings.
22-28: LGTM!The upsert pattern is correctly implemented: find existing document, copy the
idif found, thensave()will update the existing record or insert a new one. The added logging improves observability.backend/app/db/repositories/saga_repository.py (4)
30-36: LGTM!The upsert pattern correctly migrates to Beanie field expressions. Finding the existing document by
`saga_id`, copying the `id` if found, then calling `save()` ensures proper update-or-insert semantics.
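A schematic version of this find-then-save upsert, with a hypothetical document class; it assumes Beanie's `save()` inserts when `id` is unset and replaces the stored document when it is set.

```python
from beanie import Document


class SagaDoc(Document):
    saga_id: str
    state: str


async def upsert_saga(saga_id: str, state: str) -> SagaDoc:
    new_doc = SagaDoc(saga_id=saga_id, state=state)
    existing = await SagaDoc.find_one(SagaDoc.saga_id == saga_id)
    if existing:
        # Reuse the existing primary key so save() updates instead of inserting.
        new_doc.id = existing.id
    await new_doc.save()
    return new_doc
```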
45-47: LGTM!Consistent migration to Beanie field expression syntax.
80-90: LGTM!The migration to Beanie field expression is correct. The update logic properly modifies the document fields and saves.
Note: The condition
if error_message:means an empty string won't update the field. If clearing the error message is needed, consider usingif error_message is not None:instead.
92-94: LGTM!Correct migration to Beanie field expression for the
ExecutionDocumentquery.backend/app/db/repositories/admin/admin_user_repository.py (5)
44-45: LGTM!Direct field equality expression is idiomatic for Beanie 2.0.
53-84: LGTM!Field expression queries are correctly implemented and consistent with the pattern used in
user_repository.py.
86-96: LGTM!Cascade delete queries using field expressions are correctly implemented.
106-114: LGTM!Field expression query is correctly implemented.
98-102: Both nested field query patterns are valid for Beanie 2.0.Line 98's
EventDocument.metadata.user_idcorrectly accesses the nested Pydantic model (EventMetadata), producing the expected MongoDB dot-notation querymetadata.user_id. This is confirmed by the multiple indexes in EventDocument.Settings that referencemetadata.user_id,metadata.correlation_id, andmetadata.service_name.Line 101's
SagaDocument.context_data["user_id"]correctly queries the dict field using bracket notation, which is the standard Beanie pattern for dynamic dict fields. This same pattern is already used elsewhere in the codebase (saga_repository.py:19).backend/app/db/repositories/user_repository.py (2)
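A small sketch of the two query shapes discussed here, with hypothetical models, assuming Beanie's expression fields support both access styles as the comment above asserts; both are intended to compile down to MongoDB dot-notation filters (`metadata.user_id`, `context_data.user_id`).

```python
from beanie import Document
from pydantic import BaseModel


class Metadata(BaseModel):
    user_id: str
    correlation_id: str


class EventDoc(Document):
    metadata: Metadata


class SagaDoc(Document):
    context_data: dict


async def queries(user_id: str) -> None:
    # Nested Pydantic model field: attribute access builds "metadata.user_id".
    await EventDoc.find(EventDoc.metadata.user_id == user_id).to_list()

    # Dynamic dict field: bracket access builds "context_data.user_id".
    await SagaDoc.find(SagaDoc.context_data["user_id"] == user_id).to_list()
```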
4-5: LGTM!Import cleanup correctly removes
Eqsince direct field equality (==) is now used instead. The retainedOrandRegExoperators are still in use.
13-15: LGTM!The migration from dictionary-based queries to Beanie field expressions (
UserDocument.field == value) is idiomatic and provides better type safety and IDE refactoring support.Also applies to: 22-24, 53-56, 64-68



Summary by cubic
Unifies the two event systems into one Avro-backed domain event model, removing the infrastructure/kafka/events layer. Simplifies serialization, routing, DLQ handling, and tests; adds DLQ telemetry events and small SSE shutdown hooks.
Refactors
Migration
Written for commit 84d0b9d. Summary will update on new commits.
Summary by CodeRabbit
New Features
Improvements
Tests
✏️ Tip: You can customize this high-level summary in your review settings.