Skip to content

Commit 30109f6

Browse files
author
glin1993@outlook.com
committed
Add MONITOR_EVENT logs for scheduler lifecycle
1 parent c7090c8 commit 30109f6

File tree

4 files changed

+147
-2
lines changed

4 files changed

+147
-2
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import time
55

66
from collections.abc import Callable
7-
from datetime import datetime
7+
from datetime import datetime, timezone
88
from pathlib import Path
99
from typing import TYPE_CHECKING, Union
1010

@@ -49,6 +49,7 @@
4949
from memos.mem_scheduler.utils.filter_utils import (
5050
transform_name_to_key,
5151
)
52+
from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso
5253
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
5354
from memos.mem_scheduler.webservice_modules.rabbitmq_service import RabbitMQSchedulerModule
5455
from memos.mem_scheduler.webservice_modules.redis_service import RedisSchedulerModule
@@ -768,7 +769,33 @@ def _message_consumer(self) -> None:
768769
messages = self.memos_message_queue.get_messages(batch_size=self.consume_batch)
769770

770771
if messages:
772+
now = time.time()
771773
for msg in messages:
774+
enqueue_ts_obj = getattr(msg, "timestamp", None)
775+
enqueue_epoch = None
776+
if isinstance(enqueue_ts_obj, (int, float)):
777+
enqueue_epoch = float(enqueue_ts_obj)
778+
elif hasattr(enqueue_ts_obj, "timestamp"):
779+
dt = enqueue_ts_obj
780+
if dt.tzinfo is None:
781+
dt = dt.replace(tzinfo=timezone.utc)
782+
enqueue_epoch = dt.timestamp()
783+
784+
queue_wait_ms = None
785+
if enqueue_epoch is not None:
786+
queue_wait_ms = max(0.0, now - enqueue_epoch) * 1000
787+
788+
setattr(msg, "dequeue_ts", now)
789+
emit_monitor_event(
790+
"dequeue",
791+
msg,
792+
{
793+
"enqueue_ts": to_iso(enqueue_ts_obj),
794+
"dequeue_ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(),
795+
"queue_wait_ms": queue_wait_ms,
796+
},
797+
)
798+
772799
self.metrics.task_dequeued(user_id=msg.user_id, task_type=msg.label)
773800
try:
774801
import contextlib

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue
2525
from memos.mem_scheduler.task_schedule_modules.task_queue import ScheduleTaskQueue
2626
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
27+
from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso
2728
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
2829

2930

@@ -126,6 +127,7 @@ def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
126127

127128
def wrapped_handler(messages: list[ScheduleMessageItem]):
128129
start_time = time.time()
130+
start_iso = datetime.fromtimestamp(start_time, tz=timezone.utc).isoformat()
129131
if self.status_tracker:
130132
self.status_tracker.task_started(
131133
task_id=task_item.item_id, user_id=task_item.user_id
@@ -164,17 +166,49 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
164166
wait_sec = max(0.0, now - enq_epoch)
165167
self.metrics.observe_task_wait_duration(wait_sec, m.user_id, m.label)
166168

169+
dequeue_ts = getattr(first_msg, "dequeue_ts", None)
170+
start_delay_ms = None
171+
if isinstance(dequeue_ts, (int, float)):
172+
start_delay_ms = max(0.0, start_time - dequeue_ts) * 1000
173+
174+
emit_monitor_event(
175+
"start",
176+
first_msg,
177+
{
178+
"start_ts": start_iso,
179+
"start_delay_ms": start_delay_ms,
180+
"enqueue_ts": to_iso(enq_ts),
181+
"dequeue_ts": to_iso(
182+
datetime.fromtimestamp(dequeue_ts, tz=timezone.utc)
183+
if isinstance(dequeue_ts, (int, float))
184+
else None
185+
),
186+
},
187+
)
188+
167189
# Execute the original handler
168190
result = handler(messages)
169191

170192
# --- mark done ---
171-
duration = time.time() - start_time
193+
finish_time = time.time()
194+
duration = finish_time - start_time
172195
self.metrics.observe_task_duration(duration, m.user_id, m.label)
173196
if self.status_tracker:
174197
self.status_tracker.task_completed(
175198
task_id=task_item.item_id, user_id=task_item.user_id
176199
)
177200
self.metrics.task_completed(user_id=m.user_id, task_type=m.label)
201+
202+
emit_monitor_event(
203+
"finish",
204+
first_msg,
205+
{
206+
"status": "ok",
207+
"start_ts": start_iso,
208+
"finish_ts": datetime.fromtimestamp(finish_time, tz=timezone.utc).isoformat(),
209+
"exec_duration_ms": duration * 1000,
210+
},
211+
)
178212
# Redis ack is handled in finally to cover failure cases
179213

180214
# Mark task as completed and remove from tracking
@@ -187,11 +221,24 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
187221

188222
except Exception as e:
189223
m = messages[0]
224+
finish_time = time.time()
190225
self.metrics.task_failed(m.user_id, m.label, type(e).__name__)
191226
if self.status_tracker:
192227
self.status_tracker.task_failed(
193228
task_id=task_item.item_id, user_id=task_item.user_id, error_message=str(e)
194229
)
230+
emit_monitor_event(
231+
"finish",
232+
m,
233+
{
234+
"status": "fail",
235+
"start_ts": start_iso,
236+
"finish_ts": datetime.fromtimestamp(finish_time, tz=timezone.utc).isoformat(),
237+
"exec_duration_ms": (finish_time - start_time) * 1000,
238+
"error_type": type(e).__name__,
239+
"error_msg": str(e),
240+
},
241+
)
195242
# Mark task as failed and remove from tracking
196243
with self._task_lock:
197244
if task_item.item_id in self._running_tasks:

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from memos.mem_scheduler.task_schedule_modules.local_queue import SchedulerLocalQueue
1212
from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue
1313
from memos.mem_scheduler.utils.db_utils import get_utc_now
14+
from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso
1415
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
1516

1617

@@ -77,6 +78,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
7778
if len(messages) < 1:
7879
logger.error("Submit empty")
7980
elif len(messages) == 1:
81+
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
82+
emit_monitor_event("enqueue", messages[0], {"enqueue_ts": enqueue_ts})
8083
self.memos_message_queue.put(messages[0])
8184
else:
8285
user_cube_groups = group_messages_by_user_and_mem_cube(messages)
@@ -99,6 +102,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
99102
)
100103
continue
101104

105+
enqueue_ts = to_iso(getattr(message, "timestamp", None))
106+
emit_monitor_event("enqueue", message, {"enqueue_ts": enqueue_ts})
102107
self.memos_message_queue.put(message)
103108
logger.info(
104109
f"Submitted message to local queue: {message.label} - {message.content}"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import json
2+
import os
3+
import socket
4+
from datetime import datetime, timezone
5+
from typing import Any
6+
7+
from memos.log import get_logger
8+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
9+
10+
11+
logger = get_logger(__name__)
12+
13+
14+
def _iso_ts_now() -> str:
15+
"""Return current UTC timestamp in ISO format with milliseconds."""
16+
return datetime.now(timezone.utc).isoformat()
17+
18+
19+
def to_iso(ts) -> str | None:
20+
"""Convert datetime to ISO string; return None if not convertible."""
21+
if ts is None:
22+
return None
23+
if isinstance(ts, datetime):
24+
dt = ts
25+
if dt.tzinfo is None:
26+
dt = dt.replace(tzinfo=timezone.utc)
27+
return dt.isoformat()
28+
try:
29+
return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
30+
except Exception:
31+
return None
32+
33+
34+
def emit_monitor_event(event: str, msg: ScheduleMessageItem, extra: dict[str, Any] | None = None):
    """
    Emit a structured MONITOR_EVENT log line for SLS consumption.

    This must be fire-and-forget: any exception here should never break the scheduler flow.
    """
    try:
        # Gather the optional fields first; every attribute read is defensive
        # because message objects may predate the monitoring fields.
        environment = os.getenv("ENV") or os.getenv("ENVIRONMENT") or ""
        info = getattr(msg, "info", None)
        monitor_flag = info.get("monitor_flag") if isinstance(info, dict) else None

        payload: dict[str, Any] = {
            "event": event,
            "ts": _iso_ts_now(),
            "label": getattr(msg, "label", None),
            "user_id": getattr(msg, "user_id", None),
            "mem_cube_id": getattr(msg, "mem_cube_id", None),
            "item_id": getattr(msg, "item_id", None),
            "task_id": getattr(msg, "task_id", "") or "",
            "trace_id": getattr(msg, "trace_id", None),
            "stream_key": getattr(msg, "stream_key", None),
            "redis_message_id": getattr(msg, "redis_message_id", None),
            "monitor_flag": monitor_flag,
            "host": socket.gethostname(),
            "env": environment,
        }

        # Event-specific fields (enqueue_ts, queue_wait_ms, ...) override defaults.
        if extra:
            payload.update(extra)

        logger.info("MONITOR_EVENT " + json.dumps(payload, ensure_ascii=False))
    except Exception:
        logger.debug("Failed to emit MONITOR_EVENT", exc_info=True)

0 commit comments

Comments
 (0)