Skip to content

Commit 65bad8f

Browse files
authored
Fix/trace id align (#637)
* feat: Propagate trace_id to scheduled messages and improve context robustness - Propagate trace_id from the request context to scheduled messages to align logs across asynchronous operations. - Update the context getter functions to return default empty/production values instead of `None` for improved robustness. * Add scheduler total duration metric and keep context defaults * Fix: Ruff UP038 in dispatcher.py --------- Co-authored-by: [email protected] <>
1 parent 8be6f34 commit 65bad8f

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
ContextThread,
1717
RequestContext,
1818
get_current_context,
19+
get_current_trace_id,
1920
set_request_context,
2021
)
2122
from memos.llms.base import BaseLLM
@@ -664,10 +665,16 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
664665
if not messages:
665666
return
666667

668+
current_trace_id = get_current_trace_id()
669+
667670
immediate_msgs: list[ScheduleMessageItem] = []
668671
queued_msgs: list[ScheduleMessageItem] = []
669672

670673
for msg in messages:
674+
# propagate request trace_id when available so monitor logs align with request logs
675+
if current_trace_id:
676+
msg.trace_id = current_trace_id
677+
671678
# basic metrics and status tracking
672679
with suppress(Exception):
673680
self.metrics.task_enqueued(user_id=msg.user_id, task_type=msg.label)

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
210210
finish_time, tz=timezone.utc
211211
).isoformat(),
212212
"exec_duration_ms": duration * 1000,
213+
"total_duration_ms": self._calc_total_duration_ms(
214+
finish_time, getattr(first_msg, "timestamp", None)
215+
),
213216
},
214217
)
215218
# Redis ack is handled in finally to cover failure cases
@@ -243,6 +246,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
243246
"exec_duration_ms": (finish_time - start_time) * 1000,
244247
"error_type": type(e).__name__,
245248
"error_msg": str(e),
249+
"total_duration_ms": self._calc_total_duration_ms(
250+
finish_time, getattr(m, "timestamp", None)
251+
),
246252
},
247253
)
248254
# Mark task as failed and remove from tracking
@@ -423,6 +429,30 @@ def _handle_future_result(self, future):
423429
except Exception as e:
424430
logger.error(f"Handler execution failed: {e!s}", exc_info=True)
425431

432+
@staticmethod
433+
def _calc_total_duration_ms(finish_epoch: float, enqueue_ts) -> float | None:
434+
"""
435+
Calculate total duration from enqueue timestamp to finish time in milliseconds.
436+
"""
437+
try:
438+
enq_epoch = None
439+
440+
if isinstance(enqueue_ts, int | float):
441+
enq_epoch = float(enqueue_ts)
442+
elif hasattr(enqueue_ts, "timestamp"):
443+
dt = enqueue_ts
444+
if dt.tzinfo is None:
445+
dt = dt.replace(tzinfo=timezone.utc)
446+
enq_epoch = dt.timestamp()
447+
448+
if enq_epoch is None:
449+
return None
450+
451+
total_ms = max(0.0, finish_epoch - enq_epoch) * 1000
452+
return total_ms
453+
except Exception:
454+
return None
455+
426456
def execute_task(
427457
self,
428458
user_id: str,

0 commit comments

Comments
 (0)