Updated readme + fixed test suite #3
Conversation
- Fixed execution on Linux
- Fixed Bandit findings (security scan)
+ Updated the architecture file: Mermaid instead of PlantUML for better rendering
+ Updated the architecture file: fixed diagrams that had errors
Walkthrough
Backend-focused CI updates and a broad backend refactor: DI/service-based route handlers, ASGI middlewares, Redis idempotency, an SSE Kafka–Redis bridge, mapper-driven repositories, tracing/metrics additions, a notification model change to severity/tags, DLQ rework, and multiple public API surface and domain model adjustments.

Changes
Sequence Diagram(s)

%%{init: {"themeVariables":{"sequenceMessageFill":"#E8F0FF","actorBorder":"#6E9EEB","signalColor":"#6E9EEB"}}}%%
sequenceDiagram
autonumber
actor Client
participant ASGI as ASGI Middlewares
participant API as FastAPI Router
participant DI as Dishka Container
participant SVC as Service Layer
participant Redis as Redis (idempotency)
participant Mongo as MongoDB
participant Kafka as Kafka
Client->>ASGI: HTTP request
rect rgba(230,240,255,0.35)
ASGI->>ASGI: Correlation / RateLimit / Metrics (ASGI)
end
ASGI->>API: forward
API->>DI: resolve dependencies (current_user/admin_user, services)
DI-->>API: resolved user + services
API->>SVC: call handler(user, ...)
par Idempotency (optional)
SVC->>Redis: get_cached_json / mark_completed_with_json
Redis-->>SVC: cached_json or ack
and Data ops
SVC->>Mongo: read/write via mappers
Mongo-->>SVC: results
end
opt publish event
SVC->>Kafka: publish(payload + metadata)
end
SVC-->>API: response
API-->>Client: HTTP response (+ ASGI headers)
sequenceDiagram
autonumber
participant Grafana
participant API as /alerts/grafana
participant Processor as GrafanaAlertProcessor
participant NS as NotificationService
Grafana->>API: POST webhook
API->>Processor: process_webhook(payload, correlation_id)
Processor->>NS: create_system_notification(subject, body, severity, tags, metadata)
NS->>Mongo: persist notifications
Mongo-->>NS: OK
NS-->>Processor: stats
Processor-->>API: processed_count, errors
API-->>Grafana: AlertResponse
sequenceDiagram
autonumber
participant Kafka as DLQ topic
participant Mgr as DLQManager
participant Map as DLQMapper
participant Mongo as dlq_messages collection
participant Producer as Kafka producer
Kafka-->>Mgr: consume message
Mgr->>Map: from_kafka_message(...)
Map-->>Mgr: DLQMessage
Mgr->>Mongo: store / update status
alt manual retry
Mgr->>Producer: produce(original_topic, headers including trace)
Producer-->>Mgr: ack
Mgr->>Mongo: update via DLQMessageUpdate(status=RETRIED)
else discard
Mgr->>Mongo: update via DLQMessageUpdate(status=DISCARDED)
end
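The idempotency leg in the first diagram (get_cached_json / mark_completed_with_json against Redis) boils down to a read-then-write guard keyed per request. A minimal sketch with redis.asyncio, assuming JSON-serializable responses; the helper name and key layout are illustrative, not the PR's actual implementation:

```python
# Illustrative idempotency guard (not the PR's implementation): replay a cached
# response if the key was seen before, otherwise run the handler and cache it.
import json
from redis.asyncio import Redis

async def run_idempotent(redis: Redis, idempotency_key: str, handler, ttl_seconds: int = 3600):
    cache_key = f"idempotency:{idempotency_key}"
    cached = await redis.get(cache_key)
    if cached is not None:
        return json.loads(cached)      # completed before: replay the stored response
    result = await handler()           # do the real work
    await redis.set(cache_key, json.dumps(result), ex=ttl_seconds, nx=True)
    return result
```

Note that two concurrent requests with the same key can still both run the handler; a production version would take a lock or set an "in-progress" marker with SET NX first.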
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
📜 Recent review details
Configuration used: CodeRabbit UI. Review profile: CHILL. Plan: Pro.
📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
Welcome to Codecov 🎉
Once you merge this PR into your default branch, you're all set! Codecov will compare coverage reports and display results in all future pull requests.
ℹ️ You can also turn on project coverage checks and project coverage reporting on pull request comments.
Thanks for integrating Codecov - we've got you covered ☂️
Actionable comments posted: 26
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (14)
backend/app/infrastructure/kafka/events/execution.py (1)
93-95: Make resource_usage optional for failure/timeout paths.
Failed/timeout executions may not have complete usage; requiring it risks validation errors and dropped events.
Apply this diff:
class ExecutionCompletedEvent(BaseEvent):
@@
-    resource_usage: ResourceUsageDomain
+    resource_usage: ResourceUsageDomain | None = None
@@
class ExecutionFailedEvent(BaseEvent):
@@
-    resource_usage: ResourceUsageDomain
+    resource_usage: ResourceUsageDomain | None = None
@@
class ExecutionTimeoutEvent(BaseEvent):
@@
-    resource_usage: ResourceUsageDomain
+    resource_usage: ResourceUsageDomain | None = None

Also applies to: 105-106, 115-116
backend/app/events/core/dispatcher.py (1)
41-43: Use Enum.value when building/subscribing topics.
str(Enum) yields names like 'KafkaTopic.EXECUTION_EVENTS'. Most clients expect the raw topic string (Enum.value).
-        topic = str(event_class.topic)
+        topic = event_class.topic.value
@@
-        if event_class and hasattr(event_class, 'topic'):
-            topics.add(str(event_class.topic))
+        if event_class and hasattr(event_class, 'topic'):
+            topics.add(event_class.topic.value)

Also applies to: 156-159
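The pitfall is easy to reproduce with a plain Enum; a small illustration (the Topic enum below is hypothetical, not the project's KafkaTopic definition):

```python
# str() on a plain Enum member returns "ClassName.MEMBER", not the configured value.
from enum import Enum

class Topic(Enum):
    EXECUTION_EVENTS = "execution-events"

print(str(Topic.EXECUTION_EVENTS))   # "Topic.EXECUTION_EVENTS"  -> not a valid Kafka topic name
print(Topic.EXECUTION_EVENTS.value)  # "execution-events"        -> what the broker expects
```

For str-mixin enums the behaviour also varies across Python versions, so .value is the only spelling that is unambiguous everywhere.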
backend/app/infrastructure/kafka/topics.py (1)
125-132: Migrate or bridge USER_SETTINGS_EVENTS (active producers found)
KafkaTopic.USER_SETTINGS_EVENTS is still referenced; migrate producers to the three granular topics or add a bridging/alias before removing the topic.
- backend/app/infrastructure/kafka/events/user.py:51 — UserSettingsUpdatedEvent.topic = KafkaTopic.USER_SETTINGS_EVENTS
- backend/app/infrastructure/kafka/topics.py:125-132 — USER_SETTINGS_EVENTS topic config still present
- backend/app/domain/enums/kafka.py:29 — enum value definition
backend/app/events/core/dlq_handler.py (1)
38-61: Off-by-one in DLQ threshold; also pass the correct retry_count and tweak the message.
With max_retries=3, the current check sends to DLQ on the 4th failure. Change to evaluate after the increment and pass the updated count.
Apply this diff:
-    retry_count = retry_counts.get(event_id, 0)
-    retry_counts[event_id] = retry_count + 1
-
-    logger.error(
-        f"Error processing event {event_id} ({event.event_type}): {error}. "
-        f"Retry {retry_count + 1}/{max_retries}",
-        exc_info=True
-    )
-
-    # Send to DLQ if we've exceeded max retries
-    if retry_count >= max_retries:
-        logger.warning(
-            f"Event {event_id} exceeded max retries ({max_retries}). "
-            f"Sending to DLQ."
-        )
-
-        await producer.send_to_dlq(
-            original_event=event,
-            original_topic=original_topic,
-            error=error,
-            retry_count=retry_count
-        )
+    retry_count = retry_counts.get(event_id, 0)
+    retry_counts[event_id] = retry_count + 1
+    current_attempt = retry_counts[event_id]
+
+    logger.error(
+        f"Error processing event {event_id} ({getattr(event, 'event_type', 'unknown')}): {error}. "
+        f"Attempt {current_attempt}/{max_retries}",
+        exc_info=True
+    )
+
+    # Send to DLQ if we've reached max retries
+    if current_attempt >= max_retries:
+        logger.warning(
+            f"Event {event_id} reached max retries ({max_retries}). Sending to DLQ."
+        )
+
+        await producer.send_to_dlq(
+            original_event=event,
+            original_topic=original_topic,
+            error=error,
+            retry_count=current_attempt
+        )

backend/app/events/core/consumer.py (1)
120-128: Unhandled deserialization errors can crash the consumer.
Exceptions during deserialize_event bubble out of _process_message (no try/except around it) and can terminate the consume loop task. Catch, log, and record metrics; then return.
-        event = self._schema_registry.deserialize_event(raw_value, topic)
-        logger.info(f"Deserialized event: type={event.event_type}, id={event.event_id}")
+        try:
+            event = self._schema_registry.deserialize_event(raw_value, topic)
+            logger.info(f"Deserialized event: type={event.event_type}, id={event.event_id}")
+        except Exception as e:
+            logger.error(f"Deserialization error for topic {topic}: {e}", exc_info=True)
+            self._metrics.processing_errors += 1
+            self._event_metrics.record_kafka_consumption_error(
+                topic=topic,
+                consumer_group=self._config.group_id,
+                error_type=type(e).__name__,
+            )
+            return

backend/app/core/middlewares/request_size_limit.py (1)
12-33: Fail open on missing/incorrect Content-Length; enforce size while streaming.
If Content-Length is absent or wrong, oversized bodies pass through. Wrap receive to count bytes and short-circuit with 413, and guard header parsing.

-    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
-        if scope["type"] != "http":
-            await self.app(scope, receive, send)
-            return
-
-        headers = dict(scope["headers"])
-        content_length_header = headers.get(b"content-length")
-
-        if content_length_header:
-            content_length = int(content_length_header)
-            if content_length > self.max_size_bytes:
-                response = JSONResponse(
-                    status_code=413,
-                    content={
-                        "detail": f"Request too large. Maximum size is {self.max_size_bytes / 1024 / 1024}MB"
-                    }
-                )
-                await response(scope, receive, send)
-                return
-
-        await self.app(scope, receive, send)
+    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
+        if scope.get("type") != "http":
+            await self.app(scope, receive, send)
+            return
+
+        headers = dict(scope.get("headers") or ())
+        content_length_header = headers.get(b"content-length")
+
+        # Fast-path when Content-Length is present and valid
+        if content_length_header is not None:
+            try:
+                content_length = int(content_length_header)
+            except (ValueError, TypeError):
+                content_length = None
+            if content_length is not None and content_length > self.max_size_bytes:
+                response = JSONResponse(
+                    status_code=413,
+                    content={"detail": f"Request too large. Maximum size is {self.max_size_bytes // (1024 * 1024)}MB"},
+                )
+                await response(scope, receive, send)
+                return
+
+        # Enforce limit when Content-Length is missing/incorrect by wrapping receive
+        total = 0
+
+        async def limited_receive() -> dict:
+            nonlocal total
+            message = await receive()
+            if message.get("type") == "http.request":
+                body = message.get("body", b"") or b""
+                total += len(body)
+                if total > self.max_size_bytes:
+                    # Drain remaining body to keep connection sane.
+                    more = message.get("more_body", False)
+                    while more:
+                        nxt = await receive()
+                        more = nxt.get("more_body", False)
+                    response = JSONResponse(
+                        status_code=413,
+                        content={"detail": f"Request too large. Maximum size is {self.max_size_bytes // (1024 * 1024)}MB"},
+                    )
+                    await response(scope, receive, send)
+                    # Signal disconnect to downstream app if it continues to read.
+                    return {"type": "http.disconnect"}
+            return message
+
+        await self.app(scope, limited_receive, send)

backend/app/core/correlation.py (1)
90-95: Ensure context is cleared on exceptions
If the downstream app raises, clear() won't run, leaking correlation data. Use try/finally.

-        # Process request
-        await self.app(scope, receive, send_wrapper)
-
-        # Clear context after request
-        CorrelationContext.clear()
+        # Process request and always clear context
+        try:
+            await self.app(scope, receive, send_wrapper)
+        finally:
+            CorrelationContext.clear()

backend/app/core/middlewares/cache.py (1)
13-16: Use no-store for sensitive auth endpoints
private, no-cache allows caching with revalidation; for tokens prefer no-store.

-    "/api/v1/auth/verify-token": "private, no-cache",  # 30 seconds
+    "/api/v1/auth/verify-token": "private, no-store",

backend/app/api/routes/replay.py (3)
70-76: Missing await on list_sessions causes coroutine leakage and 500s.
service.list_sessions(...) is likely async (other service methods are awaited). Add await.

-    states = service.list_sessions(status=status, limit=limit)
+    states = await service.list_sessions(status=status, limit=limit)
79-85: Missing await on get_session. Same issue as above.

-    state = service.get_session(session_id)
+    state = await service.get_session(session_id)
69-76: ReplayService list/get must be async (or awaited) — fix signatures or the route.
- ReplayService (backend/app/services/replay_service.py) does not expose async list_sessions/get_session; the DB repo defines async get_session and list_sessions (backend/app/db/repositories/replay_repository.py, ~lines 42–50).
- API route backend/app/api/routes/replay.py (list_replay_sessions, lines 69–76) calls service.list_sessions(...) without await — if the service method is async this yields a coroutine and breaks the mapper/iteration.
- Action: either (A) implement async def list_sessions(...) and async def get_session(...) on ReplayService that await the repository and return the domain types expected by ReplayApiMapper, and call them with await in the routes (e.g. states = await service.list_sessions(...)); or (B) keep sync service methods that return concrete lists/objects (no await) and ensure they do not perform blocking I/O. Verify return types match ReplayApiMapper.session_to_summary inputs.
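A minimal sketch of option (A), assuming the repository already exposes the async methods mentioned above; the signatures and return types are illustrative and would need to match what ReplayApiMapper.session_to_summary expects:

```python
# Sketch of option (A): thin async service methods that await the repository.
class ReplayService:
    def __init__(self, repo) -> None:
        self._repo = repo  # e.g. ReplayRepository with async get_session / list_sessions

    async def list_sessions(self, status=None, limit: int = 50):
        # Returns concrete domain objects, never a coroutine, so routes can map them directly.
        return await self._repo.list_sessions(status=status, limit=limit)

    async def get_session(self, session_id: str):
        return await self._repo.get_session(session_id)

# Route usage under option (A):
#   states = await service.list_sessions(status=status, limit=limit)
#   state = await service.get_session(session_id)
```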
backend/app/dlq/manager.py (1)
236-254: Producer callback removed without error handling.
The producer's produce method is called without a callback, which means any production errors will only be caught during the flush operation. This could lead to silent message loss if the flush timeout is exceeded.

Consider adding delivery callbacks to track success/failure:
+    def delivery_callback(err, msg):
+        if err:
+            logger.error(f"Failed to produce message to {msg.topic()}: {err}")
+            self.metrics.record_dlq_message_retried(
+                message.original_topic,
+                message.event_type,
+                "failure"
+            )
+
     await asyncio.to_thread(
         self.producer.produce,
         topic=retry_topic,
         value=json.dumps(event.to_dict()).encode(),
         key=message.event_id.encode(),
         headers=kafka_headers,
+        callback=delivery_callback,
     )

backend/app/api/routes/events.py (2)
149-169: Guard against None from service to avoid 500 on unauthorized access
get_execution_events handles a None result with 403, but this route doesn't. If the service returns None for non-admin users when include_all_users=True, iterating will raise.

Apply this diff:
     events = await event_service.get_events_by_correlation(
         correlation_id=correlation_id,
         user_id=current_user.user_id,
         user_role=current_user.role,
         include_all_users=include_all_users,
         limit=limit
     )
+    if events is None:
+        raise HTTPException(status_code=403, detail="Access denied")
-
-    event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
+    event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
336-351: Fix AttributeError on logging and avoid storing PII (email) as deleter
- result.correlation_id doesn't exist on ArchivedEvent (correlation is in metadata). This will crash before logging.
- Prefer user_id over email for deleted_by to reduce PII in the archive.

Apply this diff:
     result = await event_service.delete_event_with_archival(
         event_id=event_id,
-        deleted_by=str(admin.email)
+        deleted_by=str(admin.user_id)
     )
@@
     logger.warning(
         f"Event {event_id} deleted by admin {admin.email}",
         extra={
             "event_type": result.event_type,
             "aggregate_id": result.aggregate_id,
-            "correlation_id": result.correlation_id
+            "correlation_id": getattr(result.metadata, "correlation_id", None)
         }
     )
# Use default MongoDB credentials for CI
MONGO_ROOT_USER: root
MONGO_ROOT_PASSWORD: rootpassword
MONGODB_HOST: 127.0.0.1
MONGODB_PORT: 27017
# Explicit URL with default credentials
MONGODB_URL: mongodb://root:rootpassword@127.0.0.1:27017/?authSource=admin
💡 Verification agent
🧩 Analysis chain
Hardcoded DB credentials in CI env (secret leak risk).
MONGO_ROOT_PASSWORD and MONGODB_URL embed credentials in the repo, tripping secret scanners and exposing them in logs. Use GitHub Secrets.
- # Use default MongoDB credentials for CI
- MONGO_ROOT_USER: root
- MONGO_ROOT_PASSWORD: rootpassword
+ # Use GitHub Secrets for credentials (avoid committing secrets)
+ MONGO_ROOT_USER: root
+ MONGO_ROOT_PASSWORD: ${{ secrets.MONGO_ROOT_PASSWORD }}
MONGODB_HOST: 127.0.0.1
MONGODB_PORT: 27017
- # Explicit URL with default credentials
- MONGODB_URL: mongodb://root:rootpassword@127.0.0.1:27017/?authSource=admin
+ # Explicit URL without hardcoded password
+ MONGODB_URL: mongodb://root:${{ secrets.MONGO_ROOT_PASSWORD }}@127.0.0.1:27017/?authSource=admin

🏁 Script executed:
#!/bin/bash
# Find other hardcoded 'mongodb://' credentials to scrub.
rg -nP --glob '!.git' 'mongodb:\/\/[^:\s]+:[^@\s]+@' -C2

Length of output: 148
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Searching repository (excluding .git) for hardcoded MongoDB credentials and related env vars..."
rg -nP -g '!.git/**' 'mongodb:\/\/[^:\s]+:[^@\s]+@' -C2 || true
rg -nP -g '!.git/**' '\bMONGO_ROOT_PASSWORD\b|\bMONGODB_URL\b|\bMONGO_ROOT_USER\b|\bMONGODB_HOST\b|\bMONGODB_PORT\b' -n -C3 || true
rg -nP -g '!.git/**' 'rootpassword' -n -C2 || true
echo
echo "Printing .github/workflows/tests.yml lines 120-150 if file exists:"
if [ -f ".github/workflows/tests.yml" ]; then
sed -n '120,150p' .github/workflows/tests.yml || true
else
echo ".github/workflows/tests.yml not found"
fi

Length of output: 1790
Replace hardcoded MongoDB credentials in CI with GitHub Secrets
Confirmed .github/workflows/tests.yml contains plaintext MongoDB credentials (lines 135–141). Move the password into GitHub Secrets and reference it in the workflow. Add the secret MONGO_ROOT_PASSWORD to the repository's GitHub Secrets.
Location: .github/workflows/tests.yml lines 135–141
- # Use default MongoDB credentials for CI
- MONGO_ROOT_USER: root
- MONGO_ROOT_PASSWORD: rootpassword
+ # Use GitHub Secrets for credentials (avoid committing secrets)
+ MONGO_ROOT_USER: root
+ MONGO_ROOT_PASSWORD: ${{ secrets.MONGO_ROOT_PASSWORD }}
MONGODB_HOST: 127.0.0.1
MONGODB_PORT: 27017
- # Explicit URL with default credentials
- MONGODB_URL: mongodb://root:rootpassword@127.0.0.1:27017/?authSource=admin
+ # Explicit URL without hardcoded password
+ MONGODB_URL: mongodb://root:${{ secrets.MONGO_ROOT_PASSWORD }}@127.0.0.1:27017/?authSource=admin
🤖 Prompt for AI Agents
In .github/workflows/tests.yml around lines 135–141 the MongoDB password is
hardcoded; add a repository secret named MONGO_ROOT_PASSWORD in GitHub Settings
→ Secrets and replace the plaintext value with a reference to that secret in the
workflow (use the GitHub Actions secrets interpolation for MONGO_ROOT_PASSWORD),
update any derived values such as MONGODB_URL to construct the connection string
using the secret instead of embedding the literal password, and ensure no other
credentials remain in plaintext (keep MONGO_ROOT_USER and host/port as needed or
move them to secrets if required).
SCHEMA_REGISTRY_URL=http://localhost:8081

# Security
SECURE_COOKIES=true
Secure cookies in tests can break auth flows over HTTP.
Most test clients use http://; Secure cookies won’t be sent.
Apply this diff:
-SECURE_COOKIES=true
+SECURE_COOKIES=false
🤖 Prompt for AI Agents
In backend/.env.test around line 27, the test environment currently sets
SECURE_COOKIES=true which prevents cookies from being sent over HTTP during
tests; change this to SECURE_COOKIES=false (or remove the variable) in the
.env.test file so test clients using http:// receive cookies, or alternately
adjust test-specific cookie config to disable the secure flag when
NODE_ENV=test.
# Start SSE Kafka→Redis bridge to ensure consumers are running before any events are published
_ = await container.get(SSEKafkaRedisBridge)
logger.info("SSE Kafka→Redis bridge started with consumer pool")
💡 Verification agent
🧩 Analysis chain
Confirm DI actually starts the SSE Kafka→Redis bridge and awaits readiness.
Simply resolving the service must trigger consumer startup; otherwise producers may publish before consumers are ready. Ensure the provider starts background tasks and uses a consumer group/offset policy that won’t miss early events.
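One way to make the startup ordering explicit, rather than relying on a DI side effect, is to start and stop the bridge in the application lifespan. A sketch only: it assumes the bridge exposes start()/stop() coroutines and that the Dishka container is stored on app.state, neither of which is confirmed here:

```python
# Sketch: explicit bridge startup/shutdown in the app lifespan (start()/stop() are assumed names;
# SSEKafkaRedisBridge and the dishka container come from the application code).
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    bridge = await app.state.dishka_container.get(SSEKafkaRedisBridge)
    await bridge.start()      # consumers join their group before any producer publishes
    try:
        yield
    finally:
        await bridge.stop()   # drain and close consumers on shutdown

app = FastAPI(lifespan=lifespan)
```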
🏁 Script executed:
#!/bin/bash
# Inspect bridge implementation and provider lifecycle hooks
rg -nP -C3 'class\s+SSEKafkaRedisBridge\b' backend || true
rg -nP -C2 '(async\s+def\s+start\b|def\s+__aenter__\b|on_startup|start_consumers)' backend || true

Length of output: 9084
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== head: repo root listing ==="
ls -la | sed -n '1,200p' || true
echo
echo "=== backend/app/services/sse/kafka_redis_bridge.py (full) ==="
nl -ba backend/app/services/sse/kafka_redis_bridge.py | sed -n '1,240p' || true
echo
echo "=== backend/app/core/dishka_lifespan.py (full) ==="
nl -ba backend/app/core/dishka_lifespan.py | sed -n '1,240p' || true
echo
echo "=== occurrences of 'container.get' and 'container =' ==="
rg -n --hidden "container\.get\(|\bcontainer\s*=" -S || true
echo
echo "=== likely DI/container modules (search) ==="
rg -n --hidden "dependency_injector|punq|Injector|Container\(|class\s+Container\b|from\s+.*container" -S || true
echo
echo "=== search for SSEKafkaRedisBridge usage / instantiation ==="
rg -n --hidden "SSEKafkaRedisBridge" -S || trueLength of output: 8435
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== kafka_redis_bridge.py ==="
sed -n '1,240p' backend/app/services/sse/kafka_redis_bridge.py || true
echo
echo "=== providers.py (SSE provider area) ==="
sed -n '1,420p' backend/app/core/providers.py || true
echo
echo "=== dishka_lifespan.py ==="
sed -n '1,240p' backend/app/core/dishka_lifespan.py || true
echo
echo "=== container.py ==="
sed -n '1,240p' backend/app/core/container.py || true
echo
echo "=== sse_shutdown_manager.py ==="
sed -n '1,240p' backend/app/services/sse/sse_shutdown_manager.py || true
echo
echo "=== grep interesting keywords in SSE files ==="
rg -n --hidden "asynccontextmanager|contextlib.asynccontextmanager|yield|start_consum|consumer_group|consumer_pool|create_sse_kafka_redis_bridge|set_router|start\(" backend/app/services/sse backend/app/core/providers.py backend/app/core/dishka_lifespan.py || trueLength of output: 37526
DI starts SSE bridge — consumer offset policy will drop pre-start events
- Verified: calling await container.get(SSEKafkaRedisBridge) runs providers.get_sse_kafka_redis_bridge which does await router.start(); router.start() creates consumers and awaits consumer.start(topics). (backend/app/core/providers.py; backend/app/services/sse/kafka_redis_bridge.py)
- Issue: ConsumerConfig in kafka_redis_bridge uses auto_offset_reset="latest" (and enable_auto_commit=True) — a fresh consumer group with no committed offsets will start at latest and skip messages produced before consumers join.
- Action required: either change auto_offset_reset to "earliest" or implement an explicit offset/seek strategy or ensure consumers/offsets are established/committed before producers publish (choose based on desired semantics and replay behavior). (backend/app/services/sse/kafka_redis_bridge.py)
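For the first option, the relevant knob is the consumer's offset reset policy. A sketch with the raw aiokafka client; the project's ConsumerConfig wrapper is not shown here, so treat the group name and settings as illustrative:

```python
# Sketch: a fresh consumer group that starts from the earliest retained message (aiokafka).
from aiokafka import AIOKafkaConsumer

async def start_bridge_consumer(topics: list[str], bootstrap_servers: str) -> AIOKafkaConsumer:
    consumer = AIOKafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        group_id="sse-bridge",              # assumed group name
        auto_offset_reset="earliest",       # no committed offsets -> read from the beginning
        enable_auto_commit=True,
    )
    await consumer.start()                  # after this, committed offsets govern restarts
    return consumer
```

Whether "earliest" or an explicit seek is right depends on the intended replay semantics, as noted above.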
🤖 Prompt for AI Agents
In backend/app/core/dishka_lifespan.py around lines 83 to 86, the code awaits
container.get(SSEKafkaRedisBridge) which starts consumers that are configured
with auto_offset_reset="latest" (and enable_auto_commit=True) causing fresh
consumer groups to skip pre-start messages; update the consumer startup so
consumers consume from the desired offset by either changing ConsumerConfig
auto_offset_reset to "earliest" in
backend/app/services/sse/kafka_redis_bridge.py, or implement an explicit
offset/seek strategy at router.start() (e.g., subscribe, call assignment
callback and seek_to_beginning/seek(partition, offset) or perform an initial
committed-offset check and commit a baseline) and ensure commits are persisted
before producers may publish; pick the option consistent with intended replay
semantics and apply the change in kafka_redis_bridge.py.
# Try to get service if not initialized
if self.rate_limit_service is None:
    asgi_app = scope.get("app")
    if asgi_app:
        container = asgi_app.state.dishka_container
        async with container() as container_scope:
            self.rate_limit_service = await container_scope.get(RateLimitService)

if self.rate_limit_service is None:
    await self.app(scope, receive, send)
    return
DI container lookup needs guards; fail open if absent.
Accessing state.dishka_container unconditionally can raise AttributeError; add getattr checks and try/except.
- if self.rate_limit_service is None:
- asgi_app = scope.get("app")
- if asgi_app:
- container = asgi_app.state.dishka_container
- async with container() as container_scope:
- self.rate_limit_service = await container_scope.get(RateLimitService)
-
- if self.rate_limit_service is None:
- await self.app(scope, receive, send)
- return
+ if self.rate_limit_service is None:
+ asgi_app = scope.get("app")
+ try:
+ container = getattr(getattr(asgi_app, "state", object()), "dishka_container", None)
+ if container is not None:
+ async with container() as container_scope:
+ self.rate_limit_service = await container_scope.get(RateLimitService)
+ except Exception:
+ # Fail open if DI resolution fails
+ await self.app(scope, receive, send)
+ return
+
+ if self.rate_limit_service is None:
+ await self.app(scope, receive, send)
+ return
if not status.allowed:
    return self._rate_limit_exceeded_response(status)

# Process request and add headers
response = await call_next(request)
self._add_rate_limit_headers(response, status)

return response

response = self._rate_limit_exceeded_response(status)
await response(scope, receive, send)
return

# Add rate limit headers to response
Fail-open on rate-limit backend errors.
Avoid 5xx when Redis/backend is flaky.
- status = await self._check_rate_limit(identifier, path, username)
+ try:
+ status = await self._check_rate_limit(identifier, path, username)
+ except Exception:
+ await self.app(scope, receive, send)
+ return
🤖 Prompt for AI Agents
In backend/app/core/middlewares/rate_limit.py around lines 79 to 85, modify the
rate-limit handling to fail-open when the backend (Redis) is flaky: wrap the
await self._check_rate_limit(...) call in a try/except that catches backend
errors (exceptions) and treats the request as allowed, logging a warning and
incrementing a metric for rate-limit-backend-failure; likewise, if
_check_rate_limit returns a status object indicating a backend error, treat it
as allowed instead of returning a 5xx, and only send the rate-limit-exceeded
response when status.allowed is explicitly False.
docs = await cursor.to_list(None)
events: List[DomainSettingsEvent] = []
for d in docs:
    et = d.get("event_type")
    try:
        et_parsed: EventType = EventType(et)
    except Exception:
        # Fallback to generic settings-updated when type is unknown
        et_parsed = EventType.USER_SETTINGS_UPDATED
    events.append(DomainSettingsEvent(
        event_type=et_parsed,
        timestamp=d.get("timestamp"),
        payload=d.get("payload", {}),
        correlation_id=d.get("correlation_id")
    ))
return events
return [self.mapper.event_from_mongo_document(d) for d in docs]
🛠️ Refactor suggestion
Motor cursor: avoid to_list(None); use async iteration
to_list expects an int; passing None can raise or be driver‑dependent. Async iterate instead.
- docs = await cursor.to_list(None)
+ docs = [d async for d in cursor]
🤖 Prompt for AI Agents
In backend/app/db/repositories/user_settings_repository.py around lines 82 to
84, replace the use of cursor.to_list(None) (which passes None to an API that
expects an int) with async iteration over the Motor cursor: create an empty
list, async for each document in the cursor append the mapped result
(self.mapper.event_from_mongo_document(d)) to the list, then return that list;
this avoids relying on to_list(None) and is driver-safe and memory-conscious.
backend/app/dlq/manager.py
Outdated
schema_registry = SchemaRegistryManager()
dlq_message = DLQMessage.from_kafka_message(msg, schema_registry)
dlq_message = DLQMapper.from_kafka_message(msg, schema_registry)
Consider caching the SchemaRegistryManager instance.
Creating a new SchemaRegistryManager instance for every message processed will be inefficient. The manager likely initializes connections and caches that should be reused.
Move the SchemaRegistryManager initialization to the constructor:
def __init__(
    self,
    database: AsyncIOMotorDatabase,
    consumer: Consumer,
    producer: Producer,
    dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
    retry_topic_suffix: str = "-retry",
    default_retry_policy: RetryPolicy | None = None,
):
    self.metrics = get_dlq_metrics()
    self.dlq_topic = dlq_topic
    self.retry_topic_suffix = retry_topic_suffix
+   self.schema_registry = SchemaRegistryManager()

Then use self.schema_registry at line 124:
- schema_registry = SchemaRegistryManager()
- dlq_message = DLQMapper.from_kafka_message(msg, schema_registry)
+ dlq_message = DLQMapper.from_kafka_message(msg, self.schema_registry)
🤖 Prompt for AI Agents
In backend/app/dlq/manager.py around lines 123 to 124, a new
SchemaRegistryManager is created for each processed message which is
inefficient; initialize and assign SchemaRegistryManager to self.schema_registry
in the class constructor (e.g., __init__) so a single instance is reused, then
replace the local creation at line 124 with use of self.schema_registry (ensure
the constructor runs before message processing and adjust any tests or callers
accordingly).
# Unified replay lifecycle across admin + services
# "scheduled" retained for admin flows (alias of initial state semantics)
SCHEDULED = "scheduled"
CREATED = "created"
💡 Verification agent
🧩 Analysis chain
New status SCHEDULED added — ensure parity with CREATED semantics.
Audit transitions/guards so both CREATED and SCHEDULED are treated as “initial.” Persisted values, filters, and UI chips should include the new status.
🏁 Script executed:
#!/bin/bash
# Find places that special-case CREATED or enumerate statuses
rg -nP -C2 --type=py 'ReplayStatus\.(CREATED|SCHEDULED)|\bstatus\b.*in\s*\[|["'\'']created["'\'']|["'\'']scheduled["'\'']' -g '!**/test/**'

Length of output: 148
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "1) Print backend/app/domain/enums/replay.py if present"
if [ -f backend/app/domain/enums/replay.py ]; then
sed -n '1,240p' backend/app/domain/enums/replay.py || true
else
echo "File backend/app/domain/enums/replay.py not found; listing any replay.py files..."
rg -n --hidden --no-ignore-vcs --glob '!**/test/**' 'replay.py' || true
fi
echo
echo "2) Search for ReplayStatus.CREATED / ReplayStatus.SCHEDULED and literal 'created'/'scheduled' occurrences (excluding tests)"
rg -n -C2 -uu --hidden --no-ignore-vcs --glob '!**/test/**' \
-e 'ReplayStatus\.(CREATED|SCHEDULED)' \
-e '\bstatus\b.*in\s*\[' \
-e '"created"' -e "'created'" \
-e '"scheduled"' -e "'scheduled'" || true
echo
echo "3) Search for all usages of ReplayStatus"
rg -n -C2 -uu --hidden --no-ignore-vcs --glob '!**/test/**' 'ReplayStatus' || true
echo "Done."Length of output: 1188
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "1) Show backend/app/domain/enums/replay.py (if present)"
if [ -f backend/app/domain/enums/replay.py ]; then
sed -n '1,240p' backend/app/domain/enums/replay.py || true
else
echo "backend/app/domain/enums/replay.py not found"
fi
echo
echo "2) Try ripgrep across repository for ReplayStatus / literal 'created' / 'scheduled' (no extra globs)"
rg -n -uu --hidden -S 'ReplayStatus|\bcreated\b|\bscheduled\b|status\s*==|status\s*in\s*\[' || true
echo
echo "3) Fallback: Python recursive search (excludes common vendor/test dirs) for the same patterns"
python3 - <<'PY'
import os, re
pattern = re.compile(r'\b(created|scheduled)\b|ReplayStatus|status\s*==|status\s*in\s*\[', flags=re.I)
exclude_dirs = {'test','tests','.git','venv','env','.venv','node_modules','build','dist','__pycache__'}
for root, dirs, files in os.walk('.'):
dirs[:] = [d for d in dirs if d not in exclude_dirs and not d.startswith('.')]
for f in files:
if any(f.endswith(ext) for ext in ('.py','.sql','.json','.yml','.yaml','.md','.txt','.html','.js','.ts','.cfg','.ini','.pyi','.tmpl','.jinja')):
path = os.path.join(root, f)
try:
with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
for i, line in enumerate(fh, 1):
if pattern.search(line):
print(f"{path}:{i}: {line.rstrip()}")
except Exception:
pass
PY
echo
echo "4) List any migration or fixture files that may contain persisted status literals"
rg -n -uu --hidden -S 'created|scheduled' --glob '!**/test/**' db migrations fixtures || true
echo "Done."Length of output: 71775
Treat ReplayStatus.SCHEDULED as an alias for CREATED (include it in all initial-state checks and filters).
Audit + update guards, filters, defaults, mappers and tests so "scheduled" and "created" behave equivalently:
- backend/app/services/event_replay/replay_service.py: line 55 — change if session.status != ReplayStatus.CREATED to accept both CREATED and SCHEDULED.
- backend/app/services/event_replay/replay_service.py: ~lines 376–382 — the filtering sessions = [s for s in sessions if s.status == status] should normalize/compare so "scheduled" matches initial-status queries.
- backend/app/infrastructure/mappers/replay_mapper.py: lines 54 and 222–223 — ensure parsing/serialization treats "scheduled" and "created" equivalently (string ⇄ enum).
- Model/schema defaults that assume CREATED: backend/app/schemas_pydantic/replay_models.py:82 and backend/app/domain/replay/models.py:89 — either keep defaults but handle SCHEDULED as initial elsewhere, or make defaults consistent across layers.
- Admin paths already using SCHEDULED (backend/app/services/admin/admin_events_service.py:148; backend/app/db/repositories/admin/admin_events_repository.py:275–286) — verify these remain consistent with service logic.
- Update tests asserting only CREATED (backend/tests/**, especially integration/unit replay tests) to accept both where appropriate.
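A compact way to express the parity in code is a shared set of "initial" statuses that every guard and filter consults, rather than comparing against CREATED directly. A sketch, assuming ReplayStatus is the enum shown in the snippet above and that the guard/filter locations follow the bullets:

```python
# Sketch: treat CREATED and SCHEDULED uniformly as "initial" states.
INITIAL_STATUSES = frozenset({ReplayStatus.CREATED, ReplayStatus.SCHEDULED})

# Guard (replay_service.py, ~line 55):
if session.status not in INITIAL_STATUSES:
    raise ValueError(f"Cannot start replay from status {session.status}")

# Filtering (~lines 376-382): normalize the requested status before comparing.
def status_matches(requested: ReplayStatus, actual: ReplayStatus) -> bool:
    if requested in INITIAL_STATUSES:
        return actual in INITIAL_STATUSES
    return actual == requested
```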
🤖 Prompt for AI Agents
In backend/app/domain/enums/replay.py around lines 13-16, treat
ReplayStatus.SCHEDULED as an alias of CREATED by ensuring all initial-state
logic accepts both values: update
backend/app/services/event_replay/replay_service.py (line ~55) to change the
guard from `status != CREATED` to allow status in {CREATED, SCHEDULED}; change
the session filtering (~lines 376–382) to normalize/compare status so queries
for the initial state accept either enum/string; in
backend/app/infrastructure/mappers/replay_mapper.py (lines ~54 and 222–223) make
parsing/serialization map the string "scheduled" to the CREATED enum (or make
both map to the same enum value) and serialize the enum consistently; review
defaults in backend/app/schemas_pydantic/replay_models.py:82 and
backend/app/domain/replay/models.py:89 and either keep CREATED as the stored
default but treat SCHEDULED as equivalent on input/filters or make defaults
consistent across layers; confirm admin code paths
(backend/app/services/admin/admin_events_service.py:148 and
backend/app/db/repositories/admin/admin_events_repository.py:275–286) remain
compatible; update tests under backend/tests/** to accept either CREATED or
SCHEDULED for initial-state assertions and adjust any filters/mocks to normalize
statuses before comparisons.
# Use record_subject_name_strategy to ensure subject is based on record name, not topic
self._serializers[subject] = AvroSerializer(
    self.client,
    schema_str,
    conf={'subject.name.strategy': record_subject_name_strategy}
)
Align subject naming (serializer vs manual registration) to avoid duplicate subjects and misapplied compatibility.
record_subject_name_strategy derives subjects from the record’s full name ({namespace}.{name}-value), but the code still registers/sets compatibility under {ClassName}-value. This will create two subjects and leave compatibility on the unused one.
Apply this diff to unify on fully qualified record subjects and avoid accidental re-registration:
@@
- # Use event class name in subject to avoid conflicts on shared topics
- subject = f"{event_class.__name__}-value"
+ # Use fully-qualified record subject (namespace + name) to match serializer strategy
+ subject = f"{self.namespace}.{event_class.__name__}-value"
return self.register_schema(subject, event_class)
@@
- subject = f"{event.__class__.__name__}-value"
+ subject = f"{self.namespace}.{event.__class__.__name__}-value"
if subject not in self._serializers:
schema_str = json.dumps(event.__class__.avro_schema(namespace=self.namespace))
# Use record_subject_name_strategy to ensure subject is based on record name, not topic
self._serializers[subject] = AvroSerializer(
self.client,
schema_str,
- conf={'subject.name.strategy': record_subject_name_strategy}
+ conf={
+ 'subject.name.strategy': record_subject_name_strategy,
+ # Prevent a second, unintended subject/version from being registered at runtime.
+ 'auto.register.schemas': False
+ }
)
@@
- # Use event class name in subject to avoid conflicts on shared topics
- subject = f"{event_class.__name__}-value"
+ # Use fully-qualified record subject (namespace + name) to match serializer strategy
+ subject = f"{self.namespace}.{event_class.__name__}-value"
self.set_compatibility(subject, "FORWARD")
self.register_schema(subject, event_class)Note: This remains safe because serialize_event() calls _get_schema_id() first, ensuring the subject is registered before serialization with auto.register.schemas=False.
Also applies to: 83-90, 216-221, 117-119
🤖 Prompt for AI Agents
In backend/app/events/schema/schema_registry.py around lines 121-126 (and also
update occurrences at 83-90, 116-121, and 216-221), the code registers/sets
compatibility using the short subject "{ClassName}-value" while the
AvroSerializer is configured with record_subject_name_strategy which uses the
fully qualified record name "{namespace}.{name}-value"; change the manual
registration and compatibility calls to compute and use the fully qualified
subject (the same record_subject_name_strategy result) instead of the class-only
subject so both serializer and registration reference the same subject; ensure
any helper that computes subject uses the record's namespace and name (or reuse
the serializer/strategy output) and update the three indicated code ranges to
prevent duplicate subjects and misapplied compatibility.
@staticmethod
def search_filter_to_query(f: UserSearchFilter) -> Dict[str, Any]:
    query: Dict[str, Any] = {}
    if f.search_text:
        query["$or"] = [
            {UserFields.USERNAME.value: {"$regex": f.search_text, "$options": "i"}},
            {UserFields.EMAIL.value: {"$regex": f.search_text, "$options": "i"}},
        ]
    if f.role:
        query[UserFields.ROLE] = f.role
    return query
Bug: filter uses mixed field keys and wrong role type; regex not escaped.
- Keys use .value for USERNAME/EMAIL but not for ROLE; elsewhere you use UserFields.* (no .value).
- f.role is likely a UserRole; the DB stores role.value (a string).
- User-provided regex should be escaped to avoid pathological patterns.

Fix below.
@staticmethod
def search_filter_to_query(f: UserSearchFilter) -> Dict[str, Any]:
query: Dict[str, Any] = {}
if f.search_text:
- query["$or"] = [
- {UserFields.USERNAME.value: {"$regex": f.search_text, "$options": "i"}},
- {UserFields.EMAIL.value: {"$regex": f.search_text, "$options": "i"}},
- ]
+ pattern = re.escape(f.search_text)
+ query["$or"] = [
+ {UserFields.USERNAME: {"$regex": pattern, "$options": "i"}},
+ {UserFields.EMAIL: {"$regex": pattern, "$options": "i"}},
+ ]
if f.role:
- query[UserFields.ROLE] = f.role
+ query[UserFields.ROLE] = f.role.value
return query
🤖 Prompt for AI Agents
In backend/app/infrastructure/mappers/admin_mapper.py around lines 118 to 129,
the search filter builds query keys inconsistently and uses raw user input in
regex; change key access to use the enum members consistently (e.g.,
UserFields.USERNAME and UserFields.EMAIL — whichever form is used project-wide),
convert the role to its stored string (use f.role.value) when setting the role
filter, and escape f.search_text before inserting into the regex (e.g., use
re.escape(f.search_text)); add an import for re if not present and keep the
case-insensitive option.
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
backend/app/services/rate_limit_service.py (1)
271-289: Fix token-bucket division-by-zero and invalid window handling.
limit can be 0 after the multiplier, and window_seconds could be misconfigured (<= 0). This path will divide by zero and crash.
Apply this diff to guard early and fail-closed predictably:
     async def _check_token_bucket(
         self,
         user_id: str,
         endpoint: str,
         limit: int,
         window_seconds: int,
         burst_multiplier: float,
         rule: RateLimitRule
     ) -> RateLimitStatus:
-        key = f"{self.prefix}tb:{user_id}:{endpoint}"
-        max_tokens = int(limit * burst_multiplier)
-        refill_rate = limit / window_seconds
+        now = time.time()
+        # Guard invalid config
+        if window_seconds <= 0:
+            window_seconds = 1
+        if limit <= 0:
+            retry_after = window_seconds
+            return RateLimitStatus(
+                allowed=False,
+                limit=0,
+                remaining=0,
+                reset_at=datetime.fromtimestamp(now + retry_after, timezone.utc),
+                retry_after=retry_after,
+                matched_rule=rule.endpoint_pattern,
+                algorithm=RateLimitAlgorithm.TOKEN_BUCKET,
+            )
+
+        key = f"{self.prefix}tb:{user_id}:{endpoint}"
+        max_tokens = int(limit * burst_multiplier)
+        refill_rate = limit / window_seconds
@@
-        now = time.time()
+        # now already captured above
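For reference, the refill arithmetic that makes window_seconds > 0 (and limit > 0) mandatory looks roughly like this; a pure-Python sketch, independent of the service's Redis-backed implementation:

```python
# Sketch of token-bucket math: refill_rate is tokens per second, so window_seconds must be > 0.
import time

def token_bucket_allow(state: dict, limit: int, window_seconds: int, burst_multiplier: float = 1.0) -> bool:
    now = time.time()
    max_tokens = int(limit * burst_multiplier)
    refill_rate = limit / window_seconds                    # ZeroDivisionError if window_seconds == 0
    elapsed = now - state.get("last_refill", now)
    tokens = min(max_tokens, state.get("tokens", max_tokens) + elapsed * refill_rate)
    if tokens < 1:                                          # always denied when limit == 0
        state.update(tokens=tokens, last_refill=now)
        return False
    state.update(tokens=tokens - 1, last_refill=now)
    return True
```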
backend/app/services/event_replay/replay_service.py (1)
92-97: Treat task cancellation as CANCELLED, not FAILED.
CancelledError is currently handled as a generic error and marks the session failed.
-        except Exception as e:
+        except asyncio.CancelledError:
+            session.status = ReplayStatus.CANCELLED
+            session.completed_at = datetime.now(timezone.utc)
+            await self._update_session_in_db(session)
+        except Exception as e:
             await self._handle_session_error(session, e)

backend/app/services/event_service.py (1)
49-51: Potential AttributeError when service_name is None.
Calling .startswith on None will crash. Guard the attribute.
-        if not include_system_events:
-            events = [e for e in events if not (e.metadata and e.metadata.service_name.startswith("system-"))]
+        if not include_system_events:
+            events = [
+                e for e in events
+                if not (e.metadata and str(getattr(e.metadata, "service_name", "") or "").startswith("system-"))
+            ]

backend/app/dlq/manager.py (1)
325-333: Enum used in MongoDB query; likely no matches.
DLQMessageStatus.SCHEDULED (an Enum) is compared to a stored string. Also, raw field names are used instead of DLQFields.

Apply:
-        cursor = self.dlq_collection.find({
-            "status": DLQMessageStatus.SCHEDULED,
-            "next_retry_at": {"$lte": now}
-        }).limit(100)
+        cursor = self.dlq_collection.find({
+            str(DLQFields.STATUS): DLQMessageStatus.SCHEDULED.value,
+            str(DLQFields.NEXT_RETRY_AT): {"$lte": now}
+        }).limit(100)

backend/app/db/repositories/event_repository.py (1)
419-436: Bug: user_role typed as str breaks access control checks
user_role is annotated as str but compared to UserRole.ADMIN, which will never match and can leak or block data incorrectly.

Apply this diff:
     async def query_events_advanced(
         self,
         user_id: str,
-        user_role: str,
+        user_role: UserRole,
         filters: EventFilter
     ) -> EventListResult | None:
@@
-        if filters.user_id:
-            if filters.user_id != user_id and user_role != UserRole.ADMIN:
+        if filters.user_id:
+            if filters.user_id != user_id and user_role != UserRole.ADMIN:
                 return None  # Signal unauthorized
             query[EventFields.METADATA_USER_ID] = filters.user_id
-        elif user_role != UserRole.ADMIN:
+        elif user_role != UserRole.ADMIN:
             query[EventFields.METADATA_USER_ID] = user_id

backend/app/api/routes/admin/events.py (1)
174-186: Return 404 when event doesn’t exist (don’t use 500)
delete_event returns False for not-found; mapping that to 500 is misleading.

Apply:
-        deleted = await service.delete_event(event_id=event_id, deleted_by=admin.email)
-        if not deleted:
-            raise HTTPException(status_code=500, detail="Failed to delete event")
+        deleted = await service.delete_event(event_id=event_id, deleted_by=admin.email)
+        if not deleted:
+            raise HTTPException(status_code=404, detail="Event not found")

backend/app/services/notification_service.py (2)
220-241: Consumer subscribes only to COMPLETED topicYou register handlers for FAILED and TIMEOUT but don’t subscribe to their topics; those notifications will never trigger.
Apply:
-        execution_results_topic = get_topic_for_event(EventType.EXECUTION_COMPLETED)
-
-        # Log topics for debugging
-        logger.info(f"Notification service will subscribe to topics: {execution_results_topic}")
+        t_completed = get_topic_for_event(EventType.EXECUTION_COMPLETED)
+        t_failed = get_topic_for_event(EventType.EXECUTION_FAILED)
+        t_timeout = get_topic_for_event(EventType.EXECUTION_TIMEOUT)
+        topics = list({t_completed, t_failed, t_timeout})
+        logger.info(f"Notification service will subscribe to topics: {topics}")
@@
-        await self._consumer.start([execution_results_topic])
+        await self._consumer.start(topics)
449-456: Don’t log full webhook URLs (secrets leakage)Webhook URLs often embed secrets. Avoid logging them.
Apply:
         logger.debug(
             f"Sending webhook notification to {webhook_url}",
             extra={
                 "notification_id": str(notification.notification_id),
                 "payload_size": len(str(payload)),
-                "webhook_url": webhook_url
+                # Intentionally omit webhook URL to avoid leaking secrets
             }
         )
♻️ Duplicate comments (5)
backend/app/core/middlewares/rate_limit.py (2)
61-72: DI container lookup needs guards; fail open if absent or on errors.
Unprotected access to state.dishka_container can raise; add getattr/try/except and proceed without rate limiting on failure.
-        if self.rate_limit_service is None:
-            asgi_app = scope.get("app")
-            if asgi_app:
-                container = asgi_app.state.dishka_container
-                async with container() as container_scope:
-                    self.rate_limit_service = await container_scope.get(RateLimitService)
-
-        if self.rate_limit_service is None:
-            await self.app(scope, receive, send)
-            return
+        if self.rate_limit_service is None:
+            asgi_app = scope.get("app")
+            try:
+                container = getattr(getattr(asgi_app, "state", object()), "dishka_container", None)
+                if container is not None:
+                    async with container() as container_scope:
+                        self.rate_limit_service = await container_scope.get(RateLimitService)
+            except Exception:
+                # Fail open if DI resolution fails
+                await self.app(scope, receive, send)
+                return
+
+        if self.rate_limit_service is None:
+            await self.app(scope, receive, send)
+            return
77-83: Fail open on backend errors.
Wrap the check in try/except; treat exceptions as allowed to avoid 5xx on Redis flaps.
-        status = await self._check_rate_limit(user_id, path)
+        try:
+            status = await self._check_rate_limit(user_id, path)
+        except Exception:
+            await self.app(scope, receive, send)
+            return

backend/app/db/repositories/replay_repository.py (1)
77-83: cutoff_time must be datetime, not str.
This repeats a previously flagged issue; comparing str to a datetime field is incorrect. Use a timezone-aware datetime.
-    async def delete_old_sessions(self, cutoff_time: str) -> int:
+    async def delete_old_sessions(self, cutoff_time: "datetime") -> int:
@@
-        result = await self.replay_collection.delete_many({
-            "created_at": {"$lt": cutoff_time},
+        result = await self.replay_collection.delete_many({
+            "created_at": {"$lt": cutoff_time},
             "status": {"$in": ["completed", "failed", "cancelled"]}
         })

Ensure callers pass UTC datetimes.
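For example, a caller would pass a timezone-aware UTC datetime rather than a string (the 30-day retention below is illustrative):

```python
# Example caller: compute the cutoff as an aware UTC datetime, never a string.
from datetime import datetime, timedelta, timezone

async def purge_old_replay_sessions(repo) -> int:
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)   # retention period is an assumption
    return await repo.delete_old_sessions(cutoff)
```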
backend/app/dlq/manager.py (1)
139-143: Cache SchemaRegistryManager instead of per‑message creation
Re-initializing on every message is wasteful. Initialize once in
__init__ and reuse. Apply:
class DLQManager: def __init__( self, database: AsyncIOMotorDatabase, consumer: Consumer, producer: Producer, dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE, retry_topic_suffix: str = "-retry", default_retry_policy: RetryPolicy | None = None, ): @@ - self.dlq_collection: AsyncIOMotorCollection[Any] = database.get_collection(CollectionNames.DLQ_MESSAGES) + self.dlq_collection: AsyncIOMotorCollection[Any] = database.get_collection(CollectionNames.DLQ_MESSAGES) + self.schema_registry = SchemaRegistryManager() @@ async def _parse_message(self, msg: Message) -> DLQMessage: """Parse Kafka message into DLQMessage.""" - schema_registry = SchemaRegistryManager() - return DLQMapper.from_kafka_message(msg, schema_registry) + return DLQMapper.from_kafka_message(msg, self.schema_registry)backend/app/db/repositories/dlq_repository.py (1)
193-205: Enum serialized into Mongo; must use .value
DLQMessageStatus.RETRIED won’t serialize correctly via PyMongo. Apply:
result = await self.dlq_collection.update_one( {DLQFields.EVENT_ID: event_id}, { "$set": { - DLQFields.STATUS: DLQMessageStatus.RETRIED, + DLQFields.STATUS: DLQMessageStatus.RETRIED.value, DLQFields.RETRIED_AT: now, DLQFields.LAST_UPDATED: now } } )
🧹 Nitpick comments (53)
backend/app/services/saga/saga_service.py (3)
114-127: Pagination guardrails and large-filter scaling
- Add simple bounds for
limit/skip to prevent accidental heavy scans.
- For large users, fetching all
execution_ids then filtering may be costly; consider a repo method that filters by user_id server-side or ensure supporting indexes exist. Apply:
 async def list_user_sagas(
@@
 ) -> SagaListResult:
     """List sagas accessible by user."""
-    saga_filter = SagaFilter(state=state)
+    # Enforce sane pagination bounds
+    limit = max(1, min(int(limit), 1000))
+    skip = max(0, int(skip))
+    saga_filter = SagaFilter(state=state)
@@
-    result = await self.saga_repo.list_sagas(saga_filter, limit, skip)
+    result = await self.saga_repo.list_sagas(saga_filter, limit, skip)

If you stick with execution-id filtering, ensure compound indexes exist (see the sketch after the list below):
- executions: index on user_id
- sagas: index on execution_id (and optionally state)
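A minimal sketch of what those index builds could look like with Motor/PyMongo; the collection and field names are assumptions taken from this review, not the repository's actual index setup.

```python
from pymongo import ASCENDING

async def ensure_saga_indexes(db) -> None:
    # Supports get_user_execution_ids(user_id)
    await db.executions.create_index([("user_id", ASCENDING)])
    # Supports filtering sagas by execution_id (optionally with state)
    await db.sagas.create_index([("execution_id", ASCENDING), ("state", ASCENDING)])
```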
173-181: Double-check include_all semantics (admin scope)Current condition
user.role != ADMIN or not include_allmeans admins see only their own stats unless they explicitly setinclude_all=True. If the intent was “admins see all by default,” useandinstead.Option if you meant “admins see all unless explicitly scoped”:
- if user.role != UserRole.ADMIN or not include_all: + if user.role != UserRole.ADMIN and not include_all: user_execution_ids = await self.saga_repo.get_user_execution_ids(user.user_id) saga_filter = SagaFilter(execution_ids=user_execution_ids)
182-208: Return type can’t be None; tighten annotation and docstring
This method either returns a
Saga or raises (fallback calls get_saga_with_access_check, which raises on not found/access denied). Adjust the annotation.

-    ) -> Saga | None:
-        """Get saga status from orchestrator with fallback to database."""
+    ) -> Saga:
+        """Get saga status from orchestrator with fallback to database.
+        Raises SagaAccessDeniedError or SagaNotFoundError on failure."""

backend/app/services/rate_limit_service.py (3)
131-137: Remove duplicate _prepare_config calls
Config is prepared in _get_config() and again in check_rate_limit(). Drop the second call to reduce overhead.
     if config is None:
         with self._timer(self.metrics.redis_duration, {"operation": "get_config"}):
             config = await self._get_config()
-        ctx.config = config
-        # Prepare config (compile/sort)
-        self._prepare_config(config)
+        ctx.config = config

Also applies to: 394-396
490-512: Use decoded key_str for Redis ops in usage stats.You decode to key_str but still call Redis with key (possibly bytes). Use key_str consistently.
- if algo == "sw": - count = await cast(Awaitable[int], self.redis.zcard(key)) + if algo == "sw": + count = await cast(Awaitable[int], self.redis.zcard(key_str)) stats[endpoint] = {"count": count, "algorithm": "sliding_window"} elif algo == "tb": - bucket_data = await self.redis.get(key) + bucket_data = await self.redis.get(key_str)
222-231: Consider normalizing endpoint in Redis keys to cap cardinality.Keys use raw endpoint (with IDs), which can explode keyspace and index set size. Using normalized_endpoint for key suffixes would reduce memory and scan costs at the expense of per-resource isolation. If acceptable, switch to normalized endpoints for sw/tb keys and indexing.
Also applies to: 280-289
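A hypothetical normalization helper for the idea above: collapse ID-like path segments before building the key suffix. The key layout in the trailing comment is an assumption about how the service formats its Redis keys.

```python
import re

_ID_LIKE = re.compile(r"^(\d+|[0-9a-fA-F]{8}-[0-9a-fA-F-]{27,})$")

def normalize_endpoint(path: str) -> str:
    """Replace numeric/UUID-like segments with a placeholder to cap key cardinality."""
    segments = [
        "{id}" if _ID_LIKE.match(seg) else seg
        for seg in path.strip("/").split("/")
    ]
    return "/" + "/".join(segments)

# e.g. key = f"rl:sw:{user_id}:{normalize_endpoint(path)}"
```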
backend/app/core/middlewares/rate_limit.py (3)
95-100: Use getattr for state.user access.Avoid poking into dict; use getattr for safety.
- def _extract_user_id(self, request: Request) -> str: - user: User | None = request.state.__dict__.get("user") + def _extract_user_id(self, request: Request) -> str: + user: User | None = getattr(request.state, "user", None) if user: return str(user.user_id)
57-59: Broaden excluded path match to cover subpaths.Exact matches miss e.g. “/docs/index.html”. Use a prefix check.
- if not self.enabled or path in self.EXCLUDED_PATHS: + if not self.enabled or any(path == p or path.startswith(f"{p}/") for p in self.EXCLUDED_PATHS): await self.app(scope, receive, send) return
85-93: Skip rate-limit headers on fail-open/no-limit statuses.When limit is 0 (fallback), emitting X-RateLimit-* is misleading.
     async def send_wrapper(message: Message) -> None:
         if message["type"] == "http.response.start":
-            headers = MutableHeaders(scope=message)
-            headers["X-RateLimit-Limit"] = str(status.limit)
-            headers["X-RateLimit-Remaining"] = str(status.remaining)
-            headers["X-RateLimit-Reset"] = str(int(status.reset_at.timestamp()))
+            if status.limit > 0:
+                headers = MutableHeaders(scope=message)
+                headers["X-RateLimit-Limit"] = str(status.limit)
+                headers["X-RateLimit-Remaining"] = str(status.remaining)
+                headers["X-RateLimit-Reset"] = str(int(status.reset_at.timestamp()))
         await send(message)

backend/app/core/providers.py (1)
443-461: Manage NotificationService lifecycle; ensure clean shutdown.initialize() starts background tasks but provider returns a long-lived instance with no teardown. Convert to an AsyncIterator provider and stop/shutdown on app shutdown to avoid leaks.
If NotificationService exposes shutdown/close, switch to:
-    @provide
-    def get_notification_service(
+    @provide
+    async def get_notification_service(
         self,
         notification_repository: NotificationRepository,
         kafka_event_service: KafkaEventService,
         event_bus_manager: EventBusManager,
         schema_registry: SchemaRegistryManager,
         sse_redis_bus: SSERedisBus,
         settings: Settings,
-    ) -> NotificationService:
+    ) -> AsyncIterator[NotificationService]:
         service = NotificationService(
             notification_repository=notification_repository,
             event_service=kafka_event_service,
             event_bus_manager=event_bus_manager,
             schema_registry_manager=schema_registry,
             sse_bus=sse_redis_bus,
             settings=settings,
         )
-        service.initialize()
-        return service
+        service.initialize()
+        try:
+            yield service
+        finally:
+            # Prefer explicit shutdown if available
+            shutdown = getattr(service, "shutdown", None)
+            if callable(shutdown):
+                result = shutdown()
+                if asyncio.iscoroutine(result):
+                    await result

backend/app/services/event_replay/replay_service.py (4)
71-81: OTel attribute types: use strings for enums.Ensure attributes are OTLP-compatible.
- with trace_span( + with trace_span( name="event_replay.session", kind=SpanKind.INTERNAL, attributes={ "replay.session_id": str(session.session_id), - "replay.type": session.config.replay_type, - "replay.target": session.config.target, + "replay.type": str(session.config.replay_type), + "replay.target": str(session.config.target), } ):
260-268: OTel attribute types: use strings for enums.- with trace_span( + with trace_span( name="event_replay.process_batch", kind=SpanKind.INTERNAL, attributes={ "replay.session_id": str(session.session_id), "replay.batch.count": len(batch), - "replay.target": session.config.target, + "replay.target": str(session.config.target), }, ):
273-286: Throttle DB updates inside the batch loop.Updating after every event is heavy. Flush periodically and at completion.
- self._update_replay_metrics(session, event, success) - session.last_event_at = event.timestamp - await self._update_session_in_db(session) + self._update_replay_metrics(session, event, success) + session.last_event_at = event.timestamp + # Flush progress every 50 events (tune as needed) + if (session.replayed_events + session.failed_events + session.skipped_events) % 50 == 0: + await self._update_session_in_db(session)
338-340: Specify file encoding.Be explicit to avoid locale-dependent behavior.
-        with open(file_path, 'a') as f:
+        with open(file_path, 'a', encoding='utf-8') as f:

backend/app/api/routes/grafana_alerts.py (2)
17-21: Fallback when no correlation_id is present.If middleware didn’t set one, generate and set it to keep traces linked.
- correlation_id = CorrelationContext.get_correlation_id() + correlation_id = CorrelationContext.get_correlation_id() + if not correlation_id: + correlation_id = CorrelationContext.generate_correlation_id() + CorrelationContext.set_correlation_id(correlation_id)
33-39: Avoid hardcoded URL; derive from routing.Hardcoding “/api/v1/alerts/grafana” can drift. Build it from the route name.
-@router.get("/grafana/test")
-async def test_grafana_alert_endpoint() -> dict[str, str]:
-    return {
+from fastapi import Request
+
+@router.get("/grafana/test")
+async def test_grafana_alert_endpoint(request: Request) -> dict[str, str]:
+    return {
         "status": "ok",
         "message": "Grafana webhook endpoint is ready",
-        "webhook_url": "/api/v1/alerts/grafana",
+        "webhook_url": request.app.url_path_for("receive_grafana_alerts"),
     }

backend/app/services/event_service.py (2)
88-91: Centralize user scoping with _build_user_filter.Avoid duplicating access logic; merge builder output when caller didn’t specify user_id.
- query = EventFilterMapper.to_mongo_query(filters) - if not filters.user_id and user_role != UserRole.ADMIN: - query["metadata.user_id"] = user_id + query = EventFilterMapper.to_mongo_query(filters) + if not filters.user_id: + user_filter = self._build_user_filter(user_id, user_role) + query.update(user_filter)
1-2: Typing consistency (List vs list).Prefer built‑ins (list, dict) across signatures/returns for PEP‑585 consistency.
Also applies to: 35-36, 54-63, 75-83, 112-123, 170-177, 193-204
backend/app/db/repositories/replay_repository.py (3)
118-123: Reduce memory with cursor batch_size.Let Motor handle batching rather than accumulating large in‑mem lists.
- cursor = self.events_collection.find(query).sort("timestamp", 1).skip(skip) + cursor = ( + self.events_collection.find(query) + .sort("timestamp", 1) + .skip(skip) + .batch_size(batch_size) + )
1-3: Typing/style consistency.Mixed use of typing.List and builtin list. Prefer built‑ins for new code.
Also applies to: 54-55, 85-88
20-33: Index builds: ensure idempotency ordering and awaitability are fine
Looks good; consider wrapping in try/except to log per‑index failures without aborting the whole routine if desired.
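One optional shape for that pattern, as a sketch: attempt each index independently so a single failure is logged instead of aborting the rest. The `logger` object and the index list are assumptions based on context.

```python
from pymongo import ASCENDING, DESCENDING
from pymongo.errors import PyMongoError

async def create_indexes_safely(collection, logger) -> None:
    specs = [
        [("session_id", ASCENDING)],
        [("status", ASCENDING)],
        [("created_at", DESCENDING)],
    ]
    for keys in specs:
        try:
            await collection.create_index(keys)
        except PyMongoError:
            logger.exception(f"Index creation failed for {keys}; continuing")
```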
backend/app/api/routes/admin/settings.py (3)
14-19: Avoid duplicate admin dependency execution.You both set a router‑level dependency and inject admin per endpoint. Keep only the parameter‑level Depends to also access the user object.
router = APIRouter( prefix="/admin/settings", tags=["admin", "settings"], route_class=DishkaRoute, - dependencies=[Depends(admin_user)] )
45-52: Overbroad exception mapped to 400; prefer 500 for unexpected errors.The second generic except masks unexpected failures as 400. Narrow or convert to 500.
- except Exception: - raise HTTPException(status_code=400, detail="Invalid settings format") + except Exception: + raise HTTPException(status_code=500, detail="Failed to parse settings")
61-64: Mapper reuse and Pydantic v2 helpers.You can reuse a single SettingsMapper and use model_validate for clarity.
- settings_mapper = SettingsMapper() - return SystemSettings(**settings_mapper.system_settings_to_pydantic_dict(updated_domain_settings)) + mapper = SettingsMapper() + return SystemSettings.model_validate(mapper.system_settings_to_pydantic_dict(updated_domain_settings)) @@ - settings_mapper = SettingsMapper() - return SystemSettings(**settings_mapper.system_settings_to_pydantic_dict(reset_domain_settings)) + mapper = SettingsMapper() + return SystemSettings.model_validate(mapper.system_settings_to_pydantic_dict(reset_domain_settings)) @@ - settings_mapper = SettingsMapper() - return SystemSettings(**settings_mapper.system_settings_to_pydantic_dict(domain_settings)) + mapper = SettingsMapper() + return SystemSettings.model_validate(mapper.system_settings_to_pydantic_dict(domain_settings))Also applies to: 76-77, 28-31
backend/app/dlq/manager.py (1)
205-217: Race: scheduling then immediate retry can double‑process.You set status=SCHEDULED then immediately retry; the monitor may also pick it up before status flips to RETRIED.
Apply:
- # Calculate next retry time - next_retry = retry_policy.get_next_retry_time(message) - - # Update message status - await self._update_message_status( - message.event_id, - DLQMessageUpdate(status=DLQMessageStatus.SCHEDULED, next_retry_at=next_retry), - ) - - # If immediate retry, process now - if retry_policy.strategy == RetryStrategy.IMMEDIATE: - await self._retry_message(message) + # Immediate retries: process now to avoid monitor race + if retry_policy.strategy == RetryStrategy.IMMEDIATE: + await self._retry_message(message) + return + + # Calculate next retry time and schedule + next_retry = retry_policy.get_next_retry_time(message) + await self._update_message_status( + message.event_id, + DLQMessageUpdate(status=DLQMessageStatus.SCHEDULED, next_retry_at=next_retry), + )backend/app/services/admin/admin_events_service.py (2)
178-195: Log the actual exported row count (after limit)
You slice rows by
limit but log len(rows), which can mislead. Apply:
- rows = await self._repo.export_events_csv(filter) + rows = await self._repo.export_events_csv(filter) output = StringIO() @@ - for row in rows[:limit]: + limited_rows = rows[:limit] + for row in limited_rows: writer.writerow(row_mapper.to_dict(row)) @@ - logger.info("Exported events CSV", extra={ - "row_count": len(rows), + logger.info("Exported events CSV", extra={ + "row_count": len(limited_rows), "filename": filename, })
142-151: Persist target metadata on scheduled replay
Include
target_service and dry_run=False in the session update for observability/consistency. Apply:
session_update = ReplaySessionUpdate( total_events=session_data.total_events, correlation_id=replay_correlation_id, status=ReplayStatus.SCHEDULED, + target_service=target_service, + dry_run=False, )backend/app/services/grafana_alert_processor.py (1)
149-157: Optional: parallelize per‑alert processing for large batches.If webhooks carry many alerts,
awaiting sequentially can be slow; consider gather with a bounded semaphore.

backend/app/api/routes/dlq.py (3)
177-179: Avoid calling a private method from the route.
dlq_manager._discard_message(...)is private by convention. Provide a public method (e.g.,discard_message(message: DLQMessage, reason: str)) and use it here.Example:
- await dlq_manager._discard_message(message_data, f"manual: {reason}") + await dlq_manager.discard_message(message_data, f"manual: {reason}")
68-87: Be robust to domain Enum/string for status.If
msg.statusis an Enum, constructing the schema Enum with it can fail. Guard for both.Apply:
- status=DLQMessageStatus(msg.status), + status=DLQMessageStatus(msg.status if isinstance(msg.status, str) else msg.status.value),
107-126: Same robustness for detail endpoint status.Apply:
- status=DLQMessageStatus(message.status), + status=DLQMessageStatus(message.status if isinstance(message.status, str) else message.status.value),backend/app/db/repositories/event_repository.py (3)
33-44: Normalize naive datetimes to UTC in time filterIf a naive
datetimesneaks in, Mongo time comparisons can be inconsistent. Normalize to UTC here.Apply this diff:
def _build_time_filter( self, - start_time: datetime | None, - end_time: datetime | None + start_time: datetime | None, + end_time: datetime | None ) -> dict[str, object]: """Build time range filter, eliminating if-else branching.""" + # Normalize naive datetimes to UTC to avoid inconsistent comparisons + if start_time and start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=timezone.utc) + if end_time and end_time.tzinfo is None: + end_time = end_time.replace(tzinfo=timezone.utc) return { key: value for key, value in { "$gte": start_time, "$lte": end_time }.items() if value is not None }
85-103: Batch insert return value may desync from actual insertsReturning
[event.event_id for event in events]ignores partial insert failures (e.g., duplicate keys withordered=False). Preferresult.inserted_idswhen authoritative, or surface partial-failure info.Example:
- logger.info(f"Stored {len(result.inserted_ids)} events in batch") - return [event.event_id for event in events] + logger.info(f"Stored {len(result.inserted_ids)} events in batch") + return [str(_id) for _id in result.inserted_ids]If
_iddiffers fromevent_id, consider returning a mapping ofevent_id -> ok/failed.
337-345: Change stream yields raw docsYou’re yielding raw Mongo documents while most readers use mapped domain models. If intentional, document this API; otherwise, map via
self.mapper.

backend/app/api/routes/user_settings.py (3)
62-66: Avoid calling “private” mapper APIs
UserSettingsApiMapper._to_domain_notifications is a pseudo-private method. Expose and use a public method to prevent accidental changes.
75-79: Same: avoid private mapper for editor mapping
Use a public helper instead of
_to_domain_editor.
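A sketch of how a public mapper surface could wrap the existing helpers; the private methods are shown only as stubs standing in for the real implementations, and the public method names are hypothetical.

```python
from typing import Any

class UserSettingsApiMapper:
    def _to_domain_notifications(self, payload: dict[str, Any]) -> dict[str, Any]:
        ...  # existing implementation elided

    def _to_domain_editor(self, payload: dict[str, Any]) -> dict[str, Any]:
        ...  # existing implementation elided

    # Public API for route handlers, so callers no longer reach into
    # pseudo-private methods.
    def to_domain_notifications(self, payload: dict[str, Any]) -> dict[str, Any]:
        return self._to_domain_notifications(payload)

    def to_domain_editor(self, payload: dict[str, Any]) -> dict[str, Any]:
        return self._to_domain_editor(payload)
```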
102-111: Consider response_model for custom setting endpointAdd
response_model=UserSettings(you already return it) for consistent OpenAPI docs.Apply:
-@router.put("/custom/{key}") +@router.put("/custom/{key}", response_model=UserSettings) async def update_custom_setting(backend/app/api/routes/admin/events.py (5)
72-75: Constrain hours >= 1Prevent negative/zero hours; set
ge=1to match typical stats semantics.Apply:
- hours: int = Query(default=24, le=168), + hours: int = Query(default=24, ge=1, le=168),
203-215: Validate start_time <= end_time for CSV exportAdd a quick guard; otherwise the repository gets an inverted window.
Apply:
try: - export_filter = EventFilterMapper.from_admin_pydantic( + if start_time and end_time and start_time > end_time: + raise HTTPException(status_code=400, detail="start_time must be <= end_time") + export_filter = EventFilterMapper.from_admin_pydantic(
235-251: Validate start_time <= end_time for JSON exportSame guard here.
Apply:
try: - export_filter = EventFilterMapper.from_admin_pydantic( + if start_time and end_time and start_time > end_time: + raise HTTPException(status_code=400, detail="start_time must be <= end_time") + export_filter = EventFilterMapper.from_admin_pydantic(
120-133: Don’t branch on exception message text
Parsing
ValueError messages is brittle. Prefer specific exception types from the service (e.g., NoEventsError, TooManyEventsError) and map them to 404/400.
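A sketch of the typed-exception approach; the exception names follow the examples above and do not exist in the codebase today.

```python
from fastapi import HTTPException

class NoEventsError(Exception):
    """Replay query matched no events."""

class TooManyEventsError(Exception):
    """Replay query exceeded the allowed number of events."""

def to_http_error(exc: Exception) -> HTTPException:
    # Map service-level errors to HTTP codes without parsing message strings.
    if isinstance(exc, NoEventsError):
        return HTTPException(status_code=404, detail=str(exc))
    if isinstance(exc, TooManyEventsError):
        return HTTPException(status_code=400, detail=str(exc))
    return HTTPException(status_code=500, detail="Replay preparation failed")
```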
35-40: Duplicate admin guard
Router-level
dependencies=[Depends(admin_user)]plus per-endpoint admin parameters is redundant on browse/stats/detail/export. Keep the param only where you needadmin’s identity (e.g., delete).backend/app/api/routes/admin/users.py (1)
120-138: Role mapping may double-wrap enums
UserUpdate.role is likely already UserRole; wrapping with UserRole(update_dict["role"]) is unnecessary and can misfire if values differ in case. Apply:
- domain_update = DomainUserUpdate( + domain_update = DomainUserUpdate( username=update_dict.get("username"), email=update_dict.get("email"), - role=UserRole(update_dict["role"]) if "role" in update_dict else None, + role=update_dict.get("role"), is_active=update_dict.get("is_active"), password=update_dict.get("password"), )backend/app/services/notification_service.py (2)
159-171: Kafka consumer never started
initialize() doesn’t call _subscribe_to_events(). If another orchestrator won’t call it, execution-based notifications won’t fire.
initialize().
828-841: SSE payload: ensure JSON-safe typesIf
notification_idis UUID, ensure string serialization before publishing.Apply:
- "notification_id": notification.notification_id, + "notification_id": str(notification.notification_id),backend/app/api/routes/execution.py (4)
110-115: Avoid hard-coded service identity in idempotency pseudo-event.Use settings for service name/version for consistency with other endpoints.
- pseudo_event = BaseEvent( + settings = get_settings() + pseudo_event = BaseEvent( event_id=str(uuid4()), - event_type=EventType.EXECUTION_REQUESTED, + event_type=EventType.EXECUTION_REQUESTED, # keep domain enum if BaseEvent expects enum timestamp=datetime.now(timezone.utc), metadata=EventMetadata( user_id=current_user.user_id, correlation_id=str(uuid4()), - service_name="api", - service_version="1.0.0" + service_name=settings.SERVICE_NAME, + service_version=settings.SERVICE_VERSION, ) )
91-99: Don’t hard-code route in tracing attributes.Use the actual request path to avoid drift.
- "http.route": "/api/v1/execute", + "http.route": request.url.path,
48-50: Name shadowing: parameter vs dependency function.
current_user: Annotated[..., Depends(current_user)]is valid but can be confusing. Consider renaming the parameter touserfor clarity.
336-342: has_more computation can be more precise.Use the actual returned count:
has_more = (skip + len(execution_results)) < total_count.- has_more=(skip + limit) < total_count + has_more=(skip + len(execution_results)) < total_countbackend/app/api/routes/events.py (4)
80-92: Pass SortOrder value to the service.Service layer expects a string; passing the enum may break
.lower()or comparisons.- sort_order=sort_order + sort_order=sort_order.value
275-286: Robust metadata merge for Pydantic models or dicts.
model_copy(update=event_request.metadata)may ignore extras or fail ifmetadatais a model. Safely merge withmodel_dump(exclude_unset=True)when applicable.+from pydantic import BaseModel @@ - if event_request.metadata: - base_meta = base_meta.model_copy(update=event_request.metadata) + if event_request.metadata: + update_payload = ( + event_request.metadata.model_dump(exclude_unset=True) + if isinstance(event_request.metadata, BaseModel) + else event_request.metadata + ) + base_meta = base_meta.model_copy(update=update_payload)
366-374:target_serviceis unused.If intentional, annotate with a comment/TODO; otherwise, wire it into the replay publish (e.g., header or payload) or drop the param.
Would you like a follow-up patch to include
366-374: target_service is unused
If intentional, annotate with a comment/TODO; otherwise, wire it into the replay publish (e.g., header or payload) or drop the param.
38-48: Route overlap naming: “execution events” exist in two routers.There’s also
/executions/{execution_id}/eventsunder the execution router. Consider consolidating or clearly documenting the difference to avoid API confusion.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
- backend/app/api/dependencies.py (1 hunks)
- backend/app/api/routes/admin/events.py (6 hunks)
- backend/app/api/routes/admin/settings.py (1 hunks)
- backend/app/api/routes/admin/users.py (2 hunks)
- backend/app/api/routes/dlq.py (6 hunks)
- backend/app/api/routes/events.py (14 hunks)
- backend/app/api/routes/execution.py (13 hunks)
- backend/app/api/routes/grafana_alerts.py (1 hunks)
- backend/app/api/routes/replay.py (3 hunks)
- backend/app/api/routes/user_settings.py (4 hunks)
- backend/app/core/middlewares/rate_limit.py (3 hunks)
- backend/app/core/providers.py (17 hunks)
- backend/app/db/repositories/admin/admin_events_repository.py (6 hunks)
- backend/app/db/repositories/dlq_repository.py (4 hunks)
- backend/app/db/repositories/event_repository.py (19 hunks)
- backend/app/db/repositories/replay_repository.py (4 hunks)
- backend/app/dlq/manager.py (9 hunks)
- backend/app/domain/admin/replay_updates.py (1 hunks)
- backend/app/services/admin/admin_events_service.py (1 hunks)
- backend/app/services/event_replay/replay_service.py (3 hunks)
- backend/app/services/event_service.py (4 hunks)
- backend/app/services/grafana_alert_processor.py (1 hunks)
- backend/app/services/notification_service.py (22 hunks)
- backend/app/services/rate_limit_service.py (9 hunks)
- backend/app/services/saga/saga_service.py (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/db/repositories/admin/admin_events_repository.py
- backend/app/api/routes/replay.py
🧰 Additional context used
🧬 Code graph analysis (22)
backend/app/services/admin/admin_events_service.py (2)
backend/app/db/repositories/admin/admin_events_repository.py (10)
browse_events(62-90)get_event_detail(92-119)get_event_stats(126-205)build_replay_query(400-402)prepare_replay_session(404-433)update_replay_session(251-262)get_replay_status_with_progress(264-379)export_events_csv(207-225)delete_event(121-124)archive_event(227-236)backend/app/domain/admin/replay_updates.py (2)
ReplaySessionUpdate(10-56)to_dict(25-52)
backend/app/services/grafana_alert_processor.py (1)
backend/app/services/notification_service.py (1)
create_system_notification(320-368)
backend/app/services/saga/saga_service.py (2)
backend/app/services/saga/saga_orchestrator.py (1)
SagaOrchestrator(31-512)backend/app/db/repositories/saga_repository.py (1)
get_user_execution_ids(108-114)
backend/app/db/repositories/replay_repository.py (2)
backend/app/domain/admin/replay_updates.py (3)
ReplaySessionUpdate(10-56)has_updates(54-56)to_dict(25-52)backend/app/db/repositories/admin/admin_events_repository.py (1)
update_replay_session(251-262)
backend/app/api/dependencies.py (1)
backend/app/core/security.py (1)
get_current_user(48-70)
backend/app/dlq/manager.py (3)
backend/app/core/providers.py (2)
get_dlq_metrics(283-284)get_settings(80-81)backend/app/events/schema/schema_registry.py (1)
SchemaRegistryManager(53-223)backend/tests/unit/dlq/test_dlq_models.py (1)
headers(78-79)
backend/app/api/routes/grafana_alerts.py (2)
backend/app/core/correlation.py (2)
CorrelationContext(11-39)get_correlation_id(23-24)backend/app/services/grafana_alert_processor.py (2)
GrafanaAlertProcessor(11-168)process_webhook(122-168)
backend/app/services/rate_limit_service.py (1)
backend/tests/unit/services/test_rate_limit_service.py (5)
zremrangebyscore(173-173)zadd(174-174)zcard(175-175)expire(176-176)execute(177-177)
backend/app/api/routes/dlq.py (2)
backend/app/api/dependencies.py (1)
current_user(10-15)backend/app/db/repositories/dlq_repository.py (4)
DLQRepository(26-274)get_dlq_stats(31-118)retry_messages_batch(221-274)get_message_by_id(153-158)
backend/app/db/repositories/event_repository.py (2)
backend/app/api/routes/events.py (4)
get_event(251-265)get_events_by_correlation(150-177)get_event_statistics(217-247)list_event_types(319-327)backend/app/services/event_service.py (4)
get_event(140-152)get_events_by_correlation(112-123)get_event_statistics(125-138)list_event_types(170-176)
backend/app/services/event_replay/replay_service.py (2)
backend/app/db/repositories/replay_repository.py (2)
ReplayRepository(13-128)update_replay_session(89-103)backend/app/domain/admin/replay_updates.py (1)
ReplaySessionUpdate(10-56)
backend/app/api/routes/user_settings.py (1)
backend/app/api/dependencies.py (1)
current_user(10-15)
backend/app/core/middlewares/rate_limit.py (1)
backend/app/services/rate_limit_service.py (2)
RateLimitService(26-512)check_rate_limit(108-212)
backend/app/db/repositories/dlq_repository.py (1)
backend/app/dlq/manager.py (1)
DLQManager(29-394)
backend/app/services/notification_service.py (3)
backend/app/core/providers.py (1)
get_notification_metrics(287-288)backend/app/db/repositories/notification_repository.py (8)
NotificationRepository(18-293)get_users_by_roles(247-261)get_active_users(263-293)cleanup_old_notifications(200-203)count_notifications(141-147)try_claim_pending(157-172)get_subscription(206-214)update_notification(55-60)backend/app/events/core/consumer.py (1)
UnifiedConsumer(22-259)
backend/app/api/routes/events.py (4)
backend/app/api/dependencies.py (2)
admin_user(19-24)current_user(10-15)backend/app/services/event_service.py (3)
EventService(19-203)get_execution_events(29-52)delete_event_with_archival(178-188)backend/app/services/kafka_event_service.py (2)
KafkaEventService(23-220)publish_event(35-146)backend/app/core/providers.py (1)
get_settings(80-81)
backend/app/services/event_service.py (1)
backend/app/db/repositories/event_repository.py (8)
get_events_by_correlation(139-147)get_event_statistics(200-259)get_event_statistics_filtered(261-319)get_event(104-106)aggregate_events(461-475)list_event_types(477-488)delete_event_with_archival(516-557)get_events_by_aggregate(125-137)
backend/app/api/routes/admin/settings.py (1)
backend/app/api/dependencies.py (1)
admin_user(19-24)
backend/app/api/routes/execution.py (4)
backend/app/api/dependencies.py (2)
admin_user(19-24)current_user(10-15)backend/app/services/idempotency/idempotency_manager.py (2)
get_cached_json(268-273)mark_completed_with_json(253-266)backend/app/services/kafka_event_service.py (1)
publish_event(35-146)backend/app/core/providers.py (1)
get_settings(80-81)
backend/app/core/providers.py (8)
backend/app/events/core/producer.py (1)
UnifiedProducer(23-288)backend/app/events/schema/schema_registry.py (2)
SchemaRegistryManager(53-223)create_schema_registry_manager(226-227)backend/app/services/grafana_alert_processor.py (1)
GrafanaAlertProcessor(11-168)backend/app/services/idempotency/idempotency_manager.py (3)
IdempotencyConfig(27-34)IdempotencyManager(69-310)create_idempotency_manager(313-318)backend/app/services/notification_service.py (2)
NotificationService(110-972)initialize(159-170)backend/app/services/saga/saga_orchestrator.py (2)
SagaOrchestrator(31-512)create_saga_orchestrator(515-540)backend/app/core/database_context.py (2)
AsyncDatabaseConnection(80-170)disconnect(130-135)backend/app/dlq/manager.py (4)
start(67-80)stop(82-102)DLQManager(29-394)create_dlq_manager(396-431)
backend/app/api/routes/admin/events.py (4)
backend/app/api/dependencies.py (1)
admin_user(19-24)backend/app/services/admin/admin_events_service.py (10)
AdminEventsService(54-250)browse_events(59-70)get_event_stats(75-76)get_event_detail(72-73)prepare_or_schedule_replay(78-165)start_replay_session(167-168)get_replay_status(170-172)delete_event(235-250)export_events_csv_content(178-195)export_events_json_content(197-233)backend/app/db/repositories/admin/admin_events_repository.py (4)
browse_events(62-90)get_event_stats(126-205)get_event_detail(92-119)delete_event(121-124)backend/app/api/routes/events.py (1)
delete_event(331-357)
backend/app/api/routes/admin/users.py (3)
backend/app/api/dependencies.py (1)
admin_user(19-24)backend/app/db/repositories/admin/admin_user_repository.py (1)
AdminUserRepository(19-156)backend/app/services/rate_limit_service.py (1)
get_user_rate_limit_summaries(474-480)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Backend Tests
- GitHub Check: Backend Tests
🔇 Additional comments (14)
backend/app/services/rate_limit_service.py (1)
241-269: Verify X-RateLimit-Reset semantics for sliding window
reset_at is set to now + window_seconds even when under limit. Many clients expect reset_at to reflect when the oldest hit expires. Confirm intended header semantics.
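If the intended semantics are "reset when the oldest hit leaves the window", it could be derived from the oldest sorted-set entry. This is a sketch only, assuming redis.asyncio and that member scores are epoch-second timestamps.

```python
from datetime import datetime, timedelta, timezone

async def sliding_window_reset_at(redis, key: str, window_seconds: int) -> datetime:
    oldest = await redis.zrange(key, 0, 0, withscores=True)
    now = datetime.now(timezone.utc)
    if not oldest:
        return now
    _, oldest_score = oldest[0]  # score is the hit timestamp
    return datetime.fromtimestamp(oldest_score, tz=timezone.utc) + timedelta(seconds=window_seconds)
```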
backend/app/core/middlewares/rate_limit.py (1)
101-114: Confirm RateLimitStatus defaults
Fallback status omits algorithm; verify RateLimitStatus.algorithm has a safe default to avoid runtime errors in downstream usage.
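A minimal shape that would make the fallback safe. The limit/remaining/reset_at names mirror the middleware usage discussed above; the other fields and the dataclass form are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RateLimitStatus:
    allowed: bool
    limit: int
    remaining: int
    reset_at: datetime
    algorithm: str = "none"  # safe default so downstream reads never fail
```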
backend/app/domain/admin/replay_updates.py (1)
9-57: LGTM: well-scoped typed update model.Dataclass + to_dict/has_updates is clean and practical.
backend/app/api/dependencies.py (2)
9-16: LGTM on DI-based auth dependencies.Clean handoff to AuthService; minimal surface and consistent return type.
Also applies to: 18-24
1-7: No lingering imports/usages found.
Search for get_current_user_optional, require_auth_guard, and require_admin_guard returned no matches across the repository; dependencies.py appropriately uses auth_service.get_current_user/get_admin.

backend/app/dlq/manager.py (1)
72-73: Verify topic type for subscribe.Ensure
KafkaTopic is a str (or StrEnum) when passed to consumer.subscribe([self.dlq_topic]); otherwise use str(self.dlq_topic). Would you confirm
KafkaTopic is a string enum across the codebase?

backend/app/db/repositories/dlq_repository.py (1)
31-118: Confirm DLQStatistics.timestamp availability
API layer reads
stats.timestamp; ensure DLQStatistics sets this field (e.g., defaulting to datetime.now(timezone.utc)), or populate it here. If missing, I can add it in repository or the domain model—your preference?
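If the field does turn out to be missing, one low-risk option looks like the following; this assumes DLQStatistics is a Pydantic model, and only the new field is shown.

```python
from datetime import datetime, timezone
from pydantic import BaseModel, Field

class DLQStatistics(BaseModel):
    # ...existing statistics fields elided...
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```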
backend/app/api/routes/dlq.py (1)
35-48: Verify stats.timestamp presence
The repository’s return may not populate this field. Ensure the domain model provides it or set it before response mapping.
backend/app/api/routes/user_settings.py (1)
26-33: DI switch to current_user looks goodClean move from Request/AuthService to DI; endpoint remains simple and typed.
backend/app/api/routes/admin/users.py (2)
150-160: Good guard: prevent self-deletionSolid UX/safety check.
53-71: No Pydantic validation risk — UserResponse already includes rate-limit fieldsUserResponse in backend/app/schemas_pydantic/user.py defines optional bypass_rate_limit, global_multiplier, and has_custom_limits, so injecting those keys will not trigger a Pydantic extras error.
backend/app/services/notification_service.py (1)
606-625: Good: typed handlers + safe metadata shapingTyped event handlers with truncated payloads are a sensible balance.
backend/app/api/routes/events.py (2)
392-414: Stable replay correlation id when none present
Avoid
replay_None. Fall back to a UUID.
+from uuid import uuid4 @@ - replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}" + base_cid = CorrelationContext.get_correlation_id() or str(uuid4()) + replay_correlation_id = f"replay_{base_cid}"
250-266: Access control path looks good.Fetching an event with owner/admin enforcement via
EventService.get_eventaligns with the new auth model.
| if idempotency_result.is_duplicate: | ||
| cached_json = await idempotency_manager.get_cached_json( | ||
| event=pseudo_event, | ||
| key_strategy="custom", | ||
| custom_key=f"http:{current_user.user_id}:{idempotency_key}", | ||
| ) | ||
| return ExecutionResponse.model_validate_json(cached_json) | ||
|
|
💡 Verification agent
🧩 Analysis chain
Prevent double-processing: handle “processing/in-flight” idempotency state.
After check_and_reserve, only duplicates are handled. If the key is currently processing, this path will run a second execution and may overwrite the cached result. Return 409 (or 202) for in‑flight requests.
Apply:
if idempotency_result.is_duplicate:
cached_json = await idempotency_manager.get_cached_json(
event=pseudo_event,
key_strategy="custom",
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
)
return ExecutionResponse.model_validate_json(cached_json)
+ # If a request with the same idempotency key is already processing, avoid double execution
+ elif getattr(idempotency_result, "is_processing", False):
+ raise HTTPException(
+ status_code=409,
+ detail="Request with this Idempotency-Key is already processing"
+ )🏁 Script executed:
#!/bin/bash
# Verify IdempotencyResult flags and naming
rg -nP 'class\s+IdempotencyResult\b' -A20
rg -nP '\bidempotency_result\.\w+' -g '!**/tests/**' -C2Length of output: 1978
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== backend/app/services/idempotency/idempotency_manager.py (first 300 lines) ==="
sed -n '1,300p' backend/app/services/idempotency/idempotency_manager.py || true
echo
echo "=== Search for IdempotencyStatus, is_processing, check_and_reserve, get_cached_json, idempotency_result usages ==="
rg -nP --hidden -uu 'class\s+IdempotencyStatus\b' -S || true
rg -nP --hidden -uu '\bIdempotencyStatus\b' -S || true
rg -nP --hidden -uu '\bis_processing\b' -S || true
rg -nP --hidden -uu '\bcheck_and_reserve\b' -S || true
rg -nP --hidden -uu '\bget_cached_json\b' -S || true
rg -nP --hidden -uu '\bidempotency_result\.\w+' -g '!**/tests/**' -C3 -S || trueLength of output: 31136
Prevent double-processing — reject in‑flight idempotency keys (409)
Do not treat every duplicate as a cached result. If idempotency_result.status == IdempotencyStatus.PROCESSING (in‑flight), raise HTTPException(status_code=409) before calling get_cached_json; only fetch cached_json when the key is completed and has_cached_result is True. Import IdempotencyStatus into the route.
File: backend/app/api/routes/execution.py lines 125-132
🤖 Prompt for AI Agents
In backend/app/api/routes/execution.py around lines 125-132, the current logic
treats every duplicate idempotency key as a cached result; modify it to import
IdempotencyStatus and check idempotency_result.status: if it equals
IdempotencyStatus.PROCESSING raise an HTTPException(status_code=409) to reject
in-flight requests; only call idempotency_manager.get_cached_json and return
ExecutionResponse when the status indicates completion and
idempotency_result.has_cached_result is True; ensure IdempotencyStatus (and
HTTPException if not already imported) are added to the file imports.
| settings = get_settings() | ||
| payload = { | ||
| "execution_id": execution.execution_id, | ||
| "status": str(ExecutionStatus.CANCELLED), | ||
| "reason": cancel_request.reason or "User requested cancellation", | ||
| "previous_status": str(execution.status), | ||
| } | ||
| meta = EventMetadata( | ||
| service_name=settings.SERVICE_NAME, | ||
| service_version=settings.SERVICE_VERSION, | ||
| user_id=current_user.user_id, | ||
| metadata={ | ||
| "reason": cancel_request.reason or "User requested cancellation", | ||
| "previous_status": execution.status, | ||
| } | ||
| ) | ||
| event_id = await event_service.publish_event( | ||
| event_type=EventType.EXECUTION_CANCELLED, | ||
| payload=payload, | ||
| aggregate_id=execution.execution_id, | ||
| metadata=meta, | ||
| ) |
Emit canonical enum values in events and pass event_type as string.
Use .value instead of str(enum) for payload fields, and pass a string to Kafka publish to avoid type mismatches.
- payload = {
- "execution_id": execution.execution_id,
- "status": str(ExecutionStatus.CANCELLED),
- "reason": cancel_request.reason or "User requested cancellation",
- "previous_status": str(execution.status),
- }
+ payload = {
+ "execution_id": execution.execution_id,
+ "status": ExecutionStatus.CANCELLED.value,
+ "reason": cancel_request.reason or "User requested cancellation",
+ "previous_status": execution.status.value,
+ }
@@
- event_id = await event_service.publish_event(
- event_type=EventType.EXECUTION_CANCELLED,
+ event_id = await event_service.publish_event(
+ event_type=EventType.EXECUTION_CANCELLED.value,
payload=payload,
aggregate_id=execution.execution_id,
metadata=meta,
)📝 Committable suggestion
    settings = get_settings()
    payload = {
        "execution_id": execution.execution_id,
        "status": ExecutionStatus.CANCELLED.value,
        "reason": cancel_request.reason or "User requested cancellation",
        "previous_status": execution.status.value,
    }
    meta = EventMetadata(
        service_name=settings.SERVICE_NAME,
        service_version=settings.SERVICE_VERSION,
        user_id=current_user.user_id,
    )
    event_id = await event_service.publish_event(
        event_type=EventType.EXECUTION_CANCELLED.value,
        payload=payload,
        aggregate_id=execution.execution_id,
        metadata=meta,
    )
🤖 Prompt for AI Agents
In backend/app/api/routes/execution.py around lines 214 to 231, the event
payload and publish call use str(enum) and an Enum for event_type which can
cause type mismatches; change status and previous_status to use
ExecutionStatus.CANCELLED.value and execution.status.value (or .value on
whatever enum instance) so the payload carries canonical string values, and pass
event_type as EventType.EXECUTION_CANCELLED.value (a string) to
event_service.publish_event.
| @@ -0,0 +1,39 @@ | |||
| from dishka import FromDishka | |||
| from dishka.integrations.fastapi import DishkaRoute | |||
| from fastapi import APIRouter | |||
Protect the webhook with a shared secret/token.
Unauthenticated public webhook invites abuse. Validate an HMAC or at least a shared token sent via header (e.g., X-Grafana-Token). Gate requests when the secret is set.
Apply this minimal env‑based guard:
-from fastapi import APIRouter
+import os
+from fastapi import APIRouter, Header, HTTPException, status
@@
-async def receive_grafana_alerts(
+async def receive_grafana_alerts(
webhook_payload: GrafanaWebhook,
- processor: FromDishka[GrafanaAlertProcessor],
+ processor: FromDishka[GrafanaAlertProcessor],
+ x_grafana_token: str | None = Header(default=None, alias="X-Grafana-Token"),
) -> AlertResponse:
- correlation_id = CorrelationContext.get_correlation_id()
+ # Optional shared-secret check; if env is set, enforce it.
+ secret = os.getenv("GRAFANA_WEBHOOK_TOKEN")
+ if secret and x_grafana_token != secret:
+ raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid webhook token")
+
+ correlation_id = CorrelationContext.get_correlation_id()Also applies to: 12-21
🤖 Prompt for AI Agents
In backend/app/api/routes/grafana_alerts.py around lines 3 and 12-21, the
webhook endpoint is currently unauthenticated; add an environment-configured
shared secret check: read the secret from config/env (e.g.,
GRAFANA_WEBHOOK_SECRET), and if the secret is non-empty require an
X-Grafana-Token header on incoming requests; compare the provided token to the
configured secret using a constant-time comparison (e.g., hmac.compare_digest)
and return a 401/403 response when missing or mismatched; keep behavior
permissive when the env secret is empty (no gate), and log authentication
failures for monitoring.
| # Replay sessions indexes | ||
| await self.replay_collection.create_index([("session_id", ASCENDING)], unique=True) | ||
| await self.replay_collection.create_index([("status", ASCENDING)]) | ||
| await self.replay_collection.create_index([("created_at", DESCENDING)]) | ||
| await self.replay_collection.create_index([("user_id", ASCENDING)]) | ||
|
|
||
| # Events collection indexes for replay queries | ||
| await self.events_collection.create_index([("execution_id", 1), ("timestamp", 1)]) | ||
| await self.events_collection.create_index([("event_type", 1), ("timestamp", 1)]) | ||
| await self.events_collection.create_index([("metadata.user_id", 1), ("timestamp", 1)]) | ||
|
|
Index/query mismatch for user_id filter.
You index replay_collection.user_id but query on config.filter.user_id; the index won’t help. Either change the query to use user_id or add a matching index.
await self.replay_collection.create_index([("user_id", ASCENDING)])
+ await self.replay_collection.create_index([("config.filter.user_id", ASCENDING)])
@@
- if user_id:
- query["config.filter.user_id"] = user_id
+ if user_id:
+ # Prefer the indexed field; if domain exposes root-level user_id, use it.
+ query["user_id"] = user_idIf root-level user_id doesn’t exist in documents, keep the nested query and the added index.
Also applies to: 55-63
🤖 Prompt for AI Agents
In backend/app/db/repositories/replay_repository.py around lines 21-31 (and
similarly at lines 55-63), the code creates an index on the root field "user_id"
but the application queries use the nested path "config.filter.user_id", so the
index will not be used; fix this by either changing the query to filter on the
root "user_id" field (if that field actually exists on documents) or by creating
an index on the nested path "config.filter.user_id" to match the query; apply
the same correction to the other index block at lines 55-63 so indexes and query
paths align.
| self.consumer.close() | ||
| self.producer.flush(10) | ||
|
|
Don’t block the event loop during shutdown.
consumer.close() and producer.flush(10) are synchronous and can block the loop inside an async def.
Apply:
- self.consumer.close()
- self.producer.flush(10)
+ await asyncio.to_thread(self.consumer.close)
+ await asyncio.to_thread(self.producer.flush, 10)📝 Committable suggestion
    await asyncio.to_thread(self.consumer.close)
    await asyncio.to_thread(self.producer.flush, 10)
🤖 Prompt for AI Agents
In backend/app/dlq/manager.py around lines 99-101, the synchronous calls
self.consumer.close() and self.producer.flush(10) can block the asyncio event
loop inside an async def; make these calls non-blocking by running them off the
loop (e.g., wrap them with asyncio.to_thread or loop.run_in_executor) or switch
to the async client methods if available, await the wrapped tasks, and
handle/propagate any exceptions so shutdown does not block the event loop.
| # Get counts | ||
| additional_filters: dict[str, object] | None = {"status": status} if status else None | ||
| total, unread_count = await asyncio.gather( | ||
| self.repository.count_notifications(user_id, additional_filters), | ||
| self.repository.count_notifications(user_id, {"status": status}), | ||
| self.get_unread_count(user_id) | ||
| ) |
Bug: total count uses {"status": None} when status not provided
Passing {"status": None} skews totals to 0. Only include status when not None.
Apply:
- total, unread_count = await asyncio.gather(
- self.repository.count_notifications(user_id, {"status": status}),
+ additional = {"status": status} if status is not None else None
+ total, unread_count = await asyncio.gather(
+ self.repository.count_notifications(user_id, additional),
self.get_unread_count(user_id)
)📝 Committable suggestion
    # Get counts
    additional = {"status": status} if status is not None else None
    total, unread_count = await asyncio.gather(
        self.repository.count_notifications(user_id, additional),
        self.get_unread_count(user_id)
    )
🤖 Prompt for AI Agents
In backend/app/services/notification_service.py around lines 732 to 736, the
current call always passes {"status": status} which becomes {"status": None}
when status is not provided and causes totals to be 0; instead build the filter
dict only including the "status" key when status is not None (e.g., create
filters = {} and if status is not None set filters["status"] = status) and then
pass that filters variable into self.repository.count_notifications in the
asyncio.gather call so the repository receives no status filter when status is
omitted.
…mports in saga service
|

Summary by CodeRabbit
New Features
Enhancements
Chores
Documentation