Skip to content

Commit 8be2e80

Browse files
authored
Fix/query schedule (#424)
* feat: change MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS to 10000
* feat: add user_name to schedule server router
* feat: roll back to old mem-reader-prompt
* feat: add monitor in schedule
* feat: set default MEMRADER_MAX_TOKENS to 8000
* feat: add metric in schedule status
* fix: bug
* fix: base scheduler bug
1 parent aa80863 commit 8be2e80

File tree

4 files changed

+310
-104
lines changed

4 files changed

+310
-104
lines changed

src/memos/api/routers/server_router.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,11 @@ def scheduler_status(user_name: str | None = None):
653653
cube = getattr(task, "mem_cube_id", "unknown")
654654
task_count_per_user[cube] = task_count_per_user.get(cube, 0) + 1
655655

656+
try:
657+
metrics_snapshot = mem_scheduler.dispatcher.metrics.snapshot()
658+
except Exception:
659+
metrics_snapshot = {}
660+
656661
return {
657662
"message": "ok",
658663
"data": {
@@ -661,6 +666,7 @@ def scheduler_status(user_name: str | None = None):
661666
"task_count_per_user": task_count_per_user,
662667
"timestamp": time.time(),
663668
"instance_id": INSTANCE_ID,
669+
"metrics": metrics_snapshot,
664670
},
665671
}
666672
except Exception as err:

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 7 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
from memos.memories.activation.kv import KVCacheMemory
5050
from memos.memories.activation.vllmkv import VLLMKVCacheItem, VLLMKVCacheMemory
5151
from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory
52-
from memos.memos_tools.notification_utils import send_online_bot_notification
5352
from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE
5453

5554

@@ -127,21 +126,6 @@ def __init__(self, config: BaseSchedulerConfig):
127126
"consume_interval_seconds", DEFAULT_CONSUME_INTERVAL_SECONDS
128127
)
129128

130-
# queue monitor (optional)
131-
self._queue_monitor_thread: threading.Thread | None = None
132-
self._queue_monitor_running: bool = False
133-
self.queue_monitor_interval_seconds: float = self.config.get(
134-
"queue_monitor_interval_seconds", 60.0
135-
)
136-
self.queue_monitor_warn_utilization: float = self.config.get(
137-
"queue_monitor_warn_utilization", 0.7
138-
)
139-
self.queue_monitor_crit_utilization: float = self.config.get(
140-
"queue_monitor_crit_utilization", 0.9
141-
)
142-
self.enable_queue_monitor: bool = self.config.get("enable_queue_monitor", False)
143-
self._online_bot_callable = None # type: ignore[var-annotated]
144-
145129
# other attributes
146130
self._context_lock = threading.Lock()
147131
self.current_user_id: UserID | str | None = None
@@ -541,6 +525,10 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
541525
logger.error(error_msg)
542526
raise TypeError(error_msg)
543527

528+
if getattr(message, "timestamp", None) is None:
529+
with contextlib.suppress(Exception):
530+
message.timestamp = datetime.utcnow()
531+
544532
if self.disable_handlers and message.label in self.disable_handlers:
545533
logger.info(f"Skipping disabled handler: {message.label} - {message.content}")
546534
continue
@@ -555,6 +543,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
555543
logger.info(
556544
f"Submitted message to local queue: {message.label} - {message.content}"
557545
)
546+
with contextlib.suppress(Exception):
547+
if messages:
548+
self.dispatcher.on_messages_enqueued(messages)
558549

559550
def _submit_web_logs(
560551
self, messages: ScheduleLogForWebItem | list[ScheduleLogForWebItem]
@@ -706,13 +697,6 @@ def start(self) -> None:
706697
self._consumer_thread.start()
707698
logger.info("Message consumer thread started")
708699

709-
# optionally start queue monitor if enabled and bot callable present
710-
if self.enable_queue_monitor and self._online_bot_callable is not None:
711-
try:
712-
self.start_queue_monitor(self._online_bot_callable)
713-
except Exception as e:
714-
logger.warning(f"Failed to start queue monitor: {e}")
715-
716700
def stop(self) -> None:
717701
"""Stop all scheduler components gracefully.
718702
@@ -762,9 +746,6 @@ def stop(self) -> None:
762746
self._cleanup_queues()
763747
logger.info("Memory Scheduler stopped completely")
764748

765-
# Stop queue monitor
766-
self.stop_queue_monitor()
767-
768749
@property
769750
def handlers(self) -> dict[str, Callable]:
770751
"""
@@ -997,16 +978,6 @@ def _fmt_eta(seconds: float | None) -> str:
997978

998979
return True
999980

1000-
# ---------------- Queue monitor & notifications ----------------
1001-
def set_notification_bots(self, online_bot=None):
1002-
"""
1003-
Set external notification callables.
1004-
1005-
Args:
1006-
online_bot: a callable matching dinding_report_bot.online_bot signature
1007-
"""
1008-
self._online_bot_callable = online_bot
1009-
1010981
def _gather_queue_stats(self) -> dict:
1011982
"""Collect queue/dispatcher stats for reporting."""
1012983
stats: dict[str, int | float | str] = {}
@@ -1044,71 +1015,3 @@ def _gather_queue_stats(self) -> dict:
10441015
except Exception:
10451016
stats.update({"running": 0, "inflight": 0, "handlers": 0})
10461017
return stats
1047-
1048-
def _queue_monitor_loop(self, online_bot) -> None:
1049-
logger.info(f"Queue monitor started (interval={self.queue_monitor_interval_seconds}s)")
1050-
self._queue_monitor_running = True
1051-
while self._queue_monitor_running:
1052-
time.sleep(self.queue_monitor_interval_seconds)
1053-
try:
1054-
stats = self._gather_queue_stats()
1055-
# decide severity based on utilization if local queue
1056-
title_color = "#00956D"
1057-
subtitle = "Scheduler"
1058-
if not stats.get("use_redis_queue"):
1059-
util = float(stats.get("utilization", 0.0))
1060-
if util >= self.queue_monitor_crit_utilization:
1061-
title_color = "#C62828" # red
1062-
subtitle = "Scheduler (CRITICAL)"
1063-
elif util >= self.queue_monitor_warn_utilization:
1064-
title_color = "#E65100" # orange
1065-
subtitle = "Scheduler (WARNING)"
1066-
1067-
other_data1 = {
1068-
"use_redis_queue": stats.get("use_redis_queue"),
1069-
"handlers": stats.get("handlers"),
1070-
"running": stats.get("running"),
1071-
"inflight": stats.get("inflight"),
1072-
}
1073-
if not stats.get("use_redis_queue"):
1074-
other_data2 = {
1075-
"qsize": stats.get("qsize"),
1076-
"unfinished_tasks": stats.get("unfinished_tasks"),
1077-
"maxsize": stats.get("maxsize"),
1078-
"utilization": f"{float(stats.get('utilization', 0.0)):.2%}",
1079-
}
1080-
else:
1081-
other_data2 = {
1082-
"redis_mode": True,
1083-
}
1084-
1085-
send_online_bot_notification(
1086-
online_bot=online_bot,
1087-
header_name="Scheduler Queue",
1088-
sub_title_name=subtitle,
1089-
title_color=title_color,
1090-
other_data1=other_data1,
1091-
other_data2=other_data2,
1092-
emoji={"Runtime": "🧠", "Queue": "📬"},
1093-
)
1094-
except Exception as e:
1095-
logger.warning(f"Queue monitor iteration failed: {e}")
1096-
logger.info("Queue monitor stopped")
1097-
1098-
def start_queue_monitor(self, online_bot) -> None:
1099-
if self._queue_monitor_thread and self._queue_monitor_thread.is_alive():
1100-
return
1101-
self._online_bot_callable = online_bot
1102-
self._queue_monitor_thread = threading.Thread(
1103-
target=self._queue_monitor_loop,
1104-
args=(online_bot,),
1105-
daemon=True,
1106-
name="QueueMonitorThread",
1107-
)
1108-
self._queue_monitor_thread.start()
1109-
1110-
def stop_queue_monitor(self) -> None:
1111-
self._queue_monitor_running = False
1112-
if self._queue_monitor_thread and self._queue_monitor_thread.is_alive():
1113-
with contextlib.suppress(Exception):
1114-
self._queue_monitor_thread.join(timeout=2.0)

src/memos/mem_scheduler/general_modules/dispatcher.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import concurrent
22
import threading
3+
import time
34

45
from collections import defaultdict
56
from collections.abc import Callable
7+
from datetime import timezone
68
from typing import Any
79

810
from memos.context.context import ContextThreadPoolExecutor
@@ -11,6 +13,7 @@
1113
from memos.mem_scheduler.general_modules.task_threads import ThreadManager
1214
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
1315
from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
16+
from memos.mem_scheduler.utils.metrics import MetricsRegistry
1417

1518

1619
logger = get_logger(__name__)
@@ -70,6 +73,19 @@ def __init__(self, max_workers=30, enable_parallel_dispatch=True, config=None):
7073
self._completed_tasks = []
7174
self.completed_tasks_max_show_size = 10
7275

76+
self.metrics = MetricsRegistry(
77+
topk_per_label=(self.config or {}).get("metrics_topk_per_label", 50)
78+
)
79+
80+
def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None:
81+
if not msgs:
82+
return
83+
now = time.time()
84+
for m in msgs:
85+
self.metrics.on_enqueue(
86+
label=m.label, mem_cube_id=m.mem_cube_id, inst_rate=1.0, now=now
87+
)
88+
7389
def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
7490
"""
7591
Create a wrapper around the handler to track task execution and capture results.
@@ -84,9 +100,37 @@ def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
84100

85101
def wrapped_handler(messages: list[ScheduleMessageItem]):
86102
try:
103+
# --- mark start: record queuing time(now - enqueue_ts)---
104+
now = time.time()
105+
for m in messages:
106+
enq_ts = getattr(m, "timestamp", None)
107+
108+
# Path 1: epoch seconds (preferred)
109+
if isinstance(enq_ts, int | float):
110+
enq_epoch = float(enq_ts)
111+
112+
# Path 2: datetime -> normalize to UTC epoch
113+
elif hasattr(enq_ts, "timestamp"):
114+
dt = enq_ts
115+
if dt.tzinfo is None:
116+
# treat naive as UTC to neutralize +8h skew
117+
dt = dt.replace(tzinfo=timezone.utc)
118+
enq_epoch = dt.timestamp()
119+
else:
120+
# fallback: treat as "just now"
121+
enq_epoch = now
122+
123+
wait_sec = max(0.0, now - enq_epoch)
124+
self.metrics.on_start(
125+
label=m.label, mem_cube_id=m.mem_cube_id, wait_sec=wait_sec, now=now
126+
)
127+
87128
# Execute the original handler
88129
result = handler(messages)
89130

131+
# --- mark done ---
132+
for m in messages:
133+
self.metrics.on_done(label=m.label, mem_cube_id=m.mem_cube_id, now=time.time())
90134
# Mark task as completed and remove from tracking
91135
with self._task_lock:
92136
if task_item.item_id in self._running_tasks:
@@ -100,6 +144,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
100144

101145
except Exception as e:
102146
# Mark task as failed and remove from tracking
147+
for m in messages:
148+
self.metrics.on_done(label=m.label, mem_cube_id=m.mem_cube_id, now=time.time())
149+
# Mark task as failed and remove from tracking
103150
with self._task_lock:
104151
if task_item.item_id in self._running_tasks:
105152
task_item.mark_failed(str(e))

0 commit comments

Comments
 (0)