Skip to content

Commit 9216e88

Browse files
committed
structured logging + docs added
1 parent f076c14 commit 9216e88

File tree

10 files changed

+162
-62
lines changed

10 files changed

+162
-62
lines changed

backend/app/api/routes/events.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -291,8 +291,10 @@ async def delete_event(
291291
raise HTTPException(status_code=404, detail="Event not found")
292292

293293
logger.warning(
294-
f"Event {event_id} deleted by admin {admin.email}",
294+
"Event deleted by admin",
295295
extra={
296+
"event_id": event_id,
297+
"admin_email": admin.email,
296298
"event_type": result.event_type,
297299
"aggregate_id": result.aggregate_id,
298300
"correlation_id": result.correlation_id,

backend/app/db/repositories/execution_repository.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -21,9 +21,9 @@ def __init__(self, logger: logging.Logger):
2121

2222
async def create_execution(self, create_data: DomainExecutionCreate) -> DomainExecution:
2323
doc = ExecutionDocument(**asdict(create_data))
24-
self.logger.info(f"Inserting execution {doc.execution_id} into MongoDB")
24+
self.logger.info("Inserting execution into MongoDB", extra={"execution_id": doc.execution_id})
2525
await doc.insert()
26-
self.logger.info(f"Inserted execution {doc.execution_id}")
26+
self.logger.info("Inserted execution", extra={"execution_id": doc.execution_id})
2727
return DomainExecution(
2828
**{
2929
**doc.model_dump(exclude={"id"}),
@@ -34,13 +34,13 @@ async def create_execution(self, create_data: DomainExecutionCreate) -> DomainEx
3434
)
3535

3636
async def get_execution(self, execution_id: str) -> DomainExecution | None:
37-
self.logger.info(f"Searching for execution {execution_id} in MongoDB")
37+
self.logger.info("Searching for execution in MongoDB", extra={"execution_id": execution_id})
3838
doc = await ExecutionDocument.find_one({"execution_id": execution_id})
3939
if not doc:
40-
self.logger.warning(f"Execution {execution_id} not found in MongoDB")
40+
self.logger.warning("Execution not found in MongoDB", extra={"execution_id": execution_id})
4141
return None
4242

43-
self.logger.info(f"Found execution {execution_id} in MongoDB")
43+
self.logger.info("Found execution in MongoDB", extra={"execution_id": execution_id})
4444
return DomainExecution(
4545
**{
4646
**doc.model_dump(exclude={"id"}),
@@ -66,7 +66,7 @@ async def update_execution(self, execution_id: str, update_data: DomainExecution
6666
async def write_terminal_result(self, result: ExecutionResultDomain) -> bool:
6767
doc = await ExecutionDocument.find_one({"execution_id": result.execution_id})
6868
if not doc:
69-
self.logger.warning(f"No execution found for {result.execution_id}")
69+
self.logger.warning("No execution found", extra={"execution_id": result.execution_id})
7070
return False
7171

7272
await doc.set(

backend/app/dlq/manager.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -254,7 +254,7 @@ async def _process_dlq_message(self, message: DLQMessage) -> None:
254254
# Apply filters
255255
for filter_func in self._filters:
256256
if not filter_func(message):
257-
self.logger.info(f"Message {message.event_id} filtered out")
257+
self.logger.info("Message filtered out", extra={"event_id": message.event_id})
258258
return
259259

260260
# Store in MongoDB via Beanie
@@ -371,7 +371,7 @@ async def _retry_message(self, message: DLQMessage) -> None:
371371
# Trigger after_retry callbacks
372372
await self._trigger_callbacks("after_retry", message, success=True)
373373

374-
self.logger.info(f"Successfully retried message {message.event_id}")
374+
self.logger.info("Successfully retried message", extra={"event_id": message.event_id})
375375

376376
async def _discard_message(self, message: DLQMessage, reason: str) -> None:
377377
# Update metrics
@@ -390,7 +390,7 @@ async def _discard_message(self, message: DLQMessage, reason: str) -> None:
390390
# Trigger callbacks
391391
await self._trigger_callbacks("on_discard", message, reason)
392392

393-
self.logger.warning(f"Discarded message {message.event_id} due to {reason}")
393+
self.logger.warning("Discarded message", extra={"event_id": message.event_id, "reason": reason})
394394

395395
async def _monitor_dlq(self) -> None:
396396
while self._running:
@@ -453,12 +453,12 @@ async def _trigger_callbacks(self, event_type: str, *args: Any, **kwargs: Any) -
453453
async def retry_message_manually(self, event_id: str) -> bool:
454454
doc = await DLQMessageDocument.find_one({"event_id": event_id})
455455
if not doc:
456-
self.logger.error(f"Message {event_id} not found in DLQ")
456+
self.logger.error("Message not found in DLQ", extra={"event_id": event_id})
457457
return False
458458

459459
# Guard against invalid states
460460
if doc.status in {DLQMessageStatus.DISCARDED, DLQMessageStatus.RETRIED}:
461-
self.logger.info(f"Skipping manual retry for {event_id}: status={doc.status}")
461+
self.logger.info("Skipping manual retry", extra={"event_id": event_id, "status": str(doc.status)})
462462
return False
463463

464464
message = self._doc_to_message(doc)

backend/app/services/event_replay/replay_service.py

Lines changed: 33 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,10 @@ async def create_replay_session(self, config: ReplayConfig) -> str:
4242
state = ReplaySessionState(session_id=str(uuid4()), config=config)
4343
self._sessions[state.session_id] = state
4444

45-
self.logger.info(f"Created replay session {state.session_id} type={config.replay_type} target={config.target}")
45+
self.logger.info(
46+
"Created replay session",
47+
extra={"session_id": state.session_id, "type": config.replay_type, "target": config.target},
48+
)
4649

4750
return state.session_id
4851

@@ -61,7 +64,7 @@ async def start_replay(self, session_id: str) -> None:
6164
session.started_at = datetime.now(timezone.utc)
6265

6366
self._metrics.increment_active_replays()
64-
self.logger.info(f"Started replay session {session_id}")
67+
self.logger.info("Started replay session", extra={"session_id": session_id})
6568

6669
async def _run_replay(self, session: ReplaySessionState) -> None:
6770
start_time = asyncio.get_event_loop().time()
@@ -97,7 +100,10 @@ async def _prepare_session(self, session: ReplaySessionState) -> None:
97100
total_count = await self._repository.count_events(session.config.filter)
98101
session.total_events = min(total_count, session.config.max_events or total_count)
99102

100-
self.logger.info(f"Replay session {session.session_id} will process {session.total_events} events")
103+
self.logger.info(
104+
"Replay session will process events",
105+
extra={"session_id": session.session_id, "total_events": session.total_events},
106+
)
101107

102108
async def _handle_progress_callback(self, session: ReplaySessionState) -> None:
103109
cb = session.config.get_progress_callback()
@@ -119,15 +125,22 @@ async def _complete_session(self, session: ReplaySessionState, start_time: float
119125
await self._update_session_in_db(session)
120126

121127
self.logger.info(
122-
f"Replay session {session.session_id} completed. "
123-
f"Replayed: {session.replayed_events}, "
124-
f"Failed: {session.failed_events}, "
125-
f"Skipped: {session.skipped_events}, "
126-
f"Duration: {duration:.2f}s"
128+
"Replay session completed",
129+
extra={
130+
"session_id": session.session_id,
131+
"replayed_events": session.replayed_events,
132+
"failed_events": session.failed_events,
133+
"skipped_events": session.skipped_events,
134+
"duration_seconds": round(duration, 2),
135+
},
127136
)
128137

129138
async def _handle_session_error(self, session: ReplaySessionState, error: Exception) -> None:
130-
self.logger.error(f"Replay session {session.session_id} failed: {error}", exc_info=True)
139+
self.logger.error(
140+
"Replay session failed",
141+
extra={"session_id": session.session_id, "error": str(error)},
142+
exc_info=True,
143+
)
131144
session.status = ReplayStatus.FAILED
132145
session.completed_at = datetime.now(timezone.utc)
133146
session.errors.append(
@@ -154,7 +167,7 @@ def _update_replay_metrics(self, session: ReplaySessionState, event: BaseEvent,
154167
self._metrics.record_event_replayed(session.config.replay_type, event.event_type, status)
155168

156169
async def _handle_replay_error(self, session: ReplaySessionState, event: BaseEvent, error: Exception) -> None:
157-
self.logger.error(f"Failed to replay event {event.event_id}: {error}")
170+
self.logger.error("Failed to replay event", extra={"event_id": event.event_id, "error": str(error)})
158171
session.failed_events += 1
159172
session.errors.append(
160173
{"timestamp": datetime.now(timezone.utc).isoformat(), "event_id": str(event.event_id), "error": str(error)}
@@ -184,7 +197,7 @@ async def _replay_to_file(self, event: BaseEvent, file_path: str | None) -> bool
184197
return True
185198

186199
async def _fetch_event_batches(self, session: ReplaySessionState) -> AsyncIterator[List[BaseEvent]]:
187-
self.logger.info(f"Fetching events for session {session.session_id}")
200+
self.logger.info("Fetching events for session", extra={"session_id": session.session_id})
188201
events_processed = 0
189202
max_events = session.config.max_events
190203

@@ -250,10 +263,13 @@ async def _replay_event(self, session: ReplaySessionState, event: BaseEvent) ->
250263
elif config.target == ReplayTarget.TEST:
251264
return True
252265
else:
253-
self.logger.error(f"Unknown replay target: {config.target}")
266+
self.logger.error("Unknown replay target", extra={"target": config.target})
254267
return False
255268
except Exception as e:
256-
self.logger.error(f"Failed to replay event (attempt {attempt + 1}/{attempts}): {e}")
269+
self.logger.error(
270+
"Failed to replay event",
271+
extra={"attempt": attempt + 1, "max_attempts": attempts, "error": str(e)},
272+
)
257273
if attempt < attempts - 1:
258274
await asyncio.sleep(min(2**attempt, 10))
259275
continue
@@ -279,7 +295,7 @@ async def pause_replay(self, session_id: str) -> None:
279295

280296
if session.status == ReplayStatus.RUNNING:
281297
session.status = ReplayStatus.PAUSED
282-
self.logger.info(f"Paused replay session {session_id}")
298+
self.logger.info("Paused replay session", extra={"session_id": session_id})
283299

284300
async def resume_replay(self, session_id: str) -> None:
285301
session = self._sessions.get(session_id)
@@ -288,7 +304,7 @@ async def resume_replay(self, session_id: str) -> None:
288304

289305
if session.status == ReplayStatus.PAUSED:
290306
session.status = ReplayStatus.RUNNING
291-
self.logger.info(f"Resumed replay session {session_id}")
307+
self.logger.info("Resumed replay session", extra={"session_id": session_id})
292308

293309
async def cancel_replay(self, session_id: str) -> None:
294310
session = self._sessions.get(session_id)
@@ -301,7 +317,7 @@ async def cancel_replay(self, session_id: str) -> None:
301317
if task and not task.done():
302318
task.cancel()
303319

304-
self.logger.info(f"Cancelled replay session {session_id}")
320+
self.logger.info("Cancelled replay session", extra={"session_id": session_id})
305321

306322
def get_session(self, session_id: str) -> ReplaySessionState | None:
307323
return self._sessions.get(session_id)
@@ -330,7 +346,7 @@ async def cleanup_old_sessions(self, older_than_hours: int = 24) -> int:
330346
del self._sessions[session_id]
331347
removed += 1
332348

333-
self.logger.info(f"Cleaned up {removed} old replay sessions")
349+
self.logger.info("Cleaned up old replay sessions", extra={"removed_count": removed})
334350
return removed
335351

336352
async def _update_session_in_db(self, session: ReplaySessionState) -> None:

backend/app/services/execution_service.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -439,7 +439,7 @@ async def delete_execution(self, execution_id: str) -> bool:
439439
deleted = await self.execution_repo.delete_execution(execution_id)
440440

441441
if not deleted:
442-
self.logger.warning(f"Execution {execution_id} not found for deletion")
442+
self.logger.warning("Execution not found for deletion", extra={"execution_id": execution_id})
443443
raise ExecutionNotFoundError(execution_id)
444444

445445
self.logger.info("Deleted execution", extra={"execution_id": execution_id})

backend/app/services/saga/saga_service.py

Lines changed: 37 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -48,24 +48,32 @@ async def check_execution_access(self, execution_id: str, user: User) -> bool:
4848
return True
4949

5050
self.logger.debug(
51-
f"Access denied for user {user.user_id} to execution {execution_id}",
52-
extra={"user_role": user.role, "execution_exists": execution is not None},
51+
"Access denied to execution",
52+
extra={
53+
"user_id": user.user_id,
54+
"execution_id": execution_id,
55+
"user_role": user.role,
56+
"execution_exists": execution is not None,
57+
},
5358
)
5459
return False
5560

5661
async def get_saga_with_access_check(self, saga_id: str, user: User) -> Saga:
5762
"""Get saga with access control."""
58-
self.logger.debug(f"Getting saga {saga_id} for user {user.user_id}", extra={"user_role": user.role})
63+
self.logger.debug(
64+
"Getting saga for user", extra={"saga_id": saga_id, "user_id": user.user_id, "user_role": user.role}
65+
)
5966

6067
saga = await self.saga_repo.get_saga(saga_id)
6168
if not saga:
62-
self.logger.warning(f"Saga {saga_id} not found")
69+
self.logger.warning("Saga not found", extra={"saga_id": saga_id})
6370
raise SagaNotFoundError(saga_id)
6471

6572
# Check access permissions
6673
if not await self.check_execution_access(saga.execution_id, user):
6774
self.logger.warning(
68-
f"Access denied for user {user.user_id} to saga {saga_id}", extra={"execution_id": saga.execution_id}
75+
"Access denied to saga",
76+
extra={"user_id": user.user_id, "saga_id": saga_id, "execution_id": saga.execution_id},
6977
)
7078
raise SagaAccessDeniedError(saga_id, user.user_id)
7179

@@ -78,7 +86,8 @@ async def get_execution_sagas(
7886
# Check access to execution
7987
if not await self.check_execution_access(execution_id, user):
8088
self.logger.warning(
81-
f"Access denied for user {user.user_id} to execution {execution_id}", extra={"user_role": user.role}
89+
"Access denied to execution",
90+
extra={"user_id": user.user_id, "execution_id": execution_id, "user_role": user.role},
8291
)
8392
raise SagaAccessDeniedError(execution_id, user.user_id)
8493

@@ -95,22 +104,31 @@ async def list_user_sagas(
95104
user_execution_ids = await self.saga_repo.get_user_execution_ids(user.user_id)
96105
saga_filter.execution_ids = user_execution_ids
97106
self.logger.debug(
98-
f"Filtering sagas for user {user.user_id}",
99-
extra={"execution_count": len(user_execution_ids) if user_execution_ids else 0},
107+
"Filtering sagas for user",
108+
extra={
109+
"user_id": user.user_id,
110+
"execution_count": len(user_execution_ids) if user_execution_ids else 0,
111+
},
100112
)
101113

102114
# Get sagas from repository
103115
result = await self.saga_repo.list_sagas(saga_filter, limit, skip)
104116
self.logger.debug(
105-
f"Listed {len(result.sagas)} sagas for user {user.user_id}",
106-
extra={"total": result.total, "state_filter": str(state) if state else None},
117+
"Listed sagas for user",
118+
extra={
119+
"user_id": user.user_id,
120+
"count": len(result.sagas),
121+
"total": result.total,
122+
"state_filter": str(state) if state else None,
123+
},
107124
)
108125
return result # type: ignore[return-value]
109126

110127
async def cancel_saga(self, saga_id: str, user: User) -> bool:
111128
"""Cancel a saga with permission check."""
112129
self.logger.info(
113-
f"User {user.user_id} requesting cancellation of saga {saga_id}", extra={"user_role": user.role}
130+
"User requesting saga cancellation",
131+
extra={"user_id": user.user_id, "saga_id": saga_id, "user_role": user.role},
114132
)
115133
# Get saga with access check
116134
saga = await self.get_saga_with_access_check(saga_id, user)
@@ -123,10 +141,11 @@ async def cancel_saga(self, saga_id: str, user: User) -> bool:
123141
success = await self.orchestrator.cancel_saga(saga_id)
124142
if success:
125143
self.logger.info(
126-
f"User {user.user_id} cancelled saga {saga_id}", extra={"user_role": user.role, "saga_id": saga_id}
144+
"User cancelled saga",
145+
extra={"user_id": user.user_id, "saga_id": saga_id, "user_role": user.role},
127146
)
128147
else:
129-
self.logger.error(f"Failed to cancel saga {saga_id} for user {user.user_id}", extra={"saga_id": saga_id})
148+
self.logger.error("Failed to cancel saga", extra={"saga_id": saga_id, "user_id": user.user_id})
130149
return success
131150

132151
async def get_saga_statistics(self, user: User, include_all: bool = False) -> dict[str, object]:
@@ -142,22 +161,22 @@ async def get_saga_statistics(self, user: User, include_all: bool = False) -> di
142161

143162
async def get_saga_status_from_orchestrator(self, saga_id: str, user: User) -> Saga | None:
144163
"""Get saga status from orchestrator with fallback to database."""
145-
self.logger.debug(f"Getting live saga status for {saga_id}")
164+
self.logger.debug("Getting live saga status", extra={"saga_id": saga_id})
146165

147166
# Try orchestrator first for live status
148167
saga = await self.orchestrator.get_saga_status(saga_id)
149168
if saga:
150169
# Check access
151170
if not await self.check_execution_access(saga.execution_id, user):
152171
self.logger.warning(
153-
f"Access denied for user {user.user_id} to live saga {saga_id}",
154-
extra={"execution_id": saga.execution_id},
172+
"Access denied to live saga",
173+
extra={"user_id": user.user_id, "saga_id": saga_id, "execution_id": saga.execution_id},
155174
)
156175
raise SagaAccessDeniedError(saga_id, user.user_id)
157176

158-
self.logger.debug(f"Retrieved live status for saga {saga_id}")
177+
self.logger.debug("Retrieved live status for saga", extra={"saga_id": saga_id})
159178
return saga
160179

161180
# Fall back to repository
162-
self.logger.debug(f"No live status found for saga {saga_id}, checking database")
181+
self.logger.debug("No live status found for saga, checking database", extra={"saga_id": saga_id})
163182
return await self.get_saga_with_access_check(saga_id, user)

0 commit comments

Comments
 (0)