Skip to content

Commit 071089d

Browse files
feat(monitor): add event_duration_ms and total_duration_ms to MONITOR_EVENT logs
- Add duration tracking for enqueue, dequeue, start, and finish events - Handle both standard and retry/timeout scenarios - Preserve existing log fields for backward compatibility
1 parent 43faee0 commit 071089d

File tree

3 files changed

+29
-3
lines changed

3 files changed

+29
-3
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
716716
# emit enqueue events for consistency
717717
for m in immediate_msgs:
718718
emit_monitor_event(
719-
"enqueue", m, {"enqueue_ts": to_iso(getattr(m, "timestamp", None))}
719+
"enqueue",
720+
m,
721+
{
722+
"enqueue_ts": to_iso(getattr(m, "timestamp", None)),
723+
"event_duration_ms": 0,
724+
"total_duration_ms": 0,
725+
},
720726
)
721727

722728
# simulate dequeue for immediately dispatched messages so monitor logs stay complete
@@ -745,6 +751,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
745751
"enqueue_ts": to_iso(enqueue_ts_obj),
746752
"dequeue_ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(),
747753
"queue_wait_ms": queue_wait_ms,
754+
"event_duration_ms": queue_wait_ms,
755+
"total_duration_ms": queue_wait_ms,
748756
},
749757
)
750758
self.metrics.task_dequeued(user_id=m.user_id, task_type=m.label)
@@ -923,6 +931,8 @@ def _message_consumer(self) -> None:
923931
now, tz=timezone.utc
924932
).isoformat(),
925933
"queue_wait_ms": queue_wait_ms,
934+
"event_duration_ms": queue_wait_ms,
935+
"total_duration_ms": queue_wait_ms,
926936
},
927937
)
928938
self.metrics.task_dequeued(user_id=msg.user_id, task_type=msg.label)

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
185185
if isinstance(dequeue_ts, int | float)
186186
else None
187187
),
188+
"event_duration_ms": start_delay_ms,
189+
"total_duration_ms": wait_sec * 1000,
188190
},
189191
)
190192

@@ -210,6 +212,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
210212
finish_time, tz=timezone.utc
211213
).isoformat(),
212214
"exec_duration_ms": duration * 1000,
215+
"event_duration_ms": duration * 1000,
213216
"total_duration_ms": self._calc_total_duration_ms(
214217
finish_time, getattr(first_msg, "timestamp", None)
215218
),
@@ -244,6 +247,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
244247
finish_time, tz=timezone.utc
245248
).isoformat(),
246249
"exec_duration_ms": (finish_time - start_time) * 1000,
250+
"event_duration_ms": (finish_time - start_time) * 1000,
247251
"error_type": type(e).__name__,
248252
"error_msg": str(e),
249253
"total_duration_ms": self._calc_total_duration_ms(

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
9494
logger.error("Submit empty")
9595
elif len(messages) == 1:
9696
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
97-
emit_monitor_event("enqueue", messages[0], {"enqueue_ts": enqueue_ts})
97+
emit_monitor_event(
98+
"enqueue",
99+
messages[0],
100+
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
101+
)
98102
self.memos_message_queue.put(messages[0])
99103
else:
100104
user_cube_groups = group_messages_by_user_and_mem_cube(messages)
@@ -118,7 +122,15 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
118122
continue
119123

120124
enqueue_ts = to_iso(getattr(message, "timestamp", None))
121-
emit_monitor_event("enqueue", message, {"enqueue_ts": enqueue_ts})
125+
emit_monitor_event(
126+
"enqueue",
127+
message,
128+
{
129+
"enqueue_ts": enqueue_ts,
130+
"event_duration_ms": 0,
131+
"total_duration_ms": 0,
132+
},
133+
)
122134
self.memos_message_queue.put(message)
123135
logger.info(
124136
f"Submitted message to local queue: {message.label} - {message.content}"

0 commit comments

Comments
 (0)