Skip to content

Commit 1923128

Browse files
glin93tangg555
andauthored
Feat/monitor event new filed (#653)
* feat(monitor): add event_duration_ms and total_duration_ms to MONITOR_EVENT logs - Add duration tracking for enqueue, dequeue, start, and finish events - Handle both standard and retry/timeout scenarios - Preserve existing log fields for backward compatibility * fix(monitor): improve duration calculation accuracy and robustness - Use start_time for start event duration calculation to ensure consistency - Add timestamp backfill for single message submission in task queue - Ensure robust handling of missing timestamps --------- Co-authored-by: [email protected] <> Co-authored-by: Travis Tang <[email protected]>
1 parent 6f32006 commit 1923128

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
716716
# emit enqueue events for consistency
717717
for m in immediate_msgs:
718718
emit_monitor_event(
719-
"enqueue", m, {"enqueue_ts": to_iso(getattr(m, "timestamp", None))}
719+
"enqueue",
720+
m,
721+
{
722+
"enqueue_ts": to_iso(getattr(m, "timestamp", None)),
723+
"event_duration_ms": 0,
724+
"total_duration_ms": 0,
725+
},
720726
)
721727

722728
# simulate dequeue for immediately dispatched messages so monitor logs stay complete
@@ -745,6 +751,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
745751
"enqueue_ts": to_iso(enqueue_ts_obj),
746752
"dequeue_ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(),
747753
"queue_wait_ms": queue_wait_ms,
754+
"event_duration_ms": queue_wait_ms,
755+
"total_duration_ms": queue_wait_ms,
748756
},
749757
)
750758
self.metrics.task_dequeued(user_id=m.user_id, task_type=m.label)
@@ -923,6 +931,8 @@ def _message_consumer(self) -> None:
923931
now, tz=timezone.utc
924932
).isoformat(),
925933
"queue_wait_ms": queue_wait_ms,
934+
"event_duration_ms": queue_wait_ms,
935+
"total_duration_ms": queue_wait_ms,
926936
},
927937
)
928938
self.metrics.task_dequeued(user_id=msg.user_id, task_type=msg.label)

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
185185
if isinstance(dequeue_ts, int | float)
186186
else None
187187
),
188+
"event_duration_ms": start_delay_ms,
189+
"total_duration_ms": self._calc_total_duration_ms(start_time, enq_ts),
188190
},
189191
)
190192

@@ -210,6 +212,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
210212
finish_time, tz=timezone.utc
211213
).isoformat(),
212214
"exec_duration_ms": duration * 1000,
215+
"event_duration_ms": duration * 1000,
213216
"total_duration_ms": self._calc_total_duration_ms(
214217
finish_time, getattr(first_msg, "timestamp", None)
215218
),
@@ -244,6 +247,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
244247
finish_time, tz=timezone.utc
245248
).isoformat(),
246249
"exec_duration_ms": (finish_time - start_time) * 1000,
250+
"event_duration_ms": (finish_time - start_time) * 1000,
247251
"error_type": type(e).__name__,
248252
"error_msg": str(e),
249253
"total_duration_ms": self._calc_total_duration_ms(

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
9393
if len(messages) < 1:
9494
logger.error("Submit empty")
9595
elif len(messages) == 1:
96+
if getattr(messages[0], "timestamp", None) is None:
97+
messages[0].timestamp = get_utc_now()
9698
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
97-
emit_monitor_event("enqueue", messages[0], {"enqueue_ts": enqueue_ts})
99+
emit_monitor_event(
100+
"enqueue",
101+
messages[0],
102+
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
103+
)
98104
self.memos_message_queue.put(messages[0])
99105
else:
100106
user_cube_groups = group_messages_by_user_and_mem_cube(messages)
@@ -118,7 +124,15 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
118124
continue
119125

120126
enqueue_ts = to_iso(getattr(message, "timestamp", None))
121-
emit_monitor_event("enqueue", message, {"enqueue_ts": enqueue_ts})
127+
emit_monitor_event(
128+
"enqueue",
129+
message,
130+
{
131+
"enqueue_ts": enqueue_ts,
132+
"event_duration_ms": 0,
133+
"total_duration_ms": 0,
134+
},
135+
)
122136
self.memos_message_queue.put(message)
123137
logger.info(
124138
f"Submitted message to local queue: {message.label} - {message.content}"

0 commit comments

Comments
 (0)