Skip to content

Commit 07a47b7

Browse files
feat(scheduler): Implement comprehensive observability and fix critical bugs
This commit introduces a robust observability system for the scheduler and resolves several critical bugs identified during code review and testing. Key Improvements: - **Task Status Tracking**: Implemented `TaskStatusTracker` using Redis to provide persistent, per-task lifecycle tracking (`waiting`, `in_progress`, `completed`, `failed`). - **Prometheus Metrics**: Added a new metrics system to expose key performance indicators (QPS, latency, queue length, failure/completion rates) for monitoring. - **API Refactoring**: Refactored `/scheduler/status` and `/scheduler/wait` APIs to use the new reliable `TaskStatusTracker`, ensuring accurate state reporting. Bug Fixes: - **Initialization**: Corrected the `SchedulerDispatcher` initialization order to prevent `NoneType` errors in tests and at runtime. - **CPU Usage**: Fixed a busy-wait loop in the metrics monitor thread that caused 100% CPU usage when idle. - **Exception Handling**: Refined API handlers to correctly propagate HTTP error codes (e.g., 404) instead of masking them as 500 errors. - **Dependencies**: Added missing dependencies (`prometheus-client`) and updated test mocks to ensure the test suite can run correctly. - **Legacy Code**: Removed the old, buggy `mem_scheduler_wait` method. All 394 unit tests now pass, and a functional test of the new features has been successfully verified.
1 parent 742df4e commit 07a47b7

File tree

13 files changed

+468
-559
lines changed

13 files changed

+468
-559
lines changed

examples/mem_scheduler/api_w_scheduler.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
handle_scheduler_status,
55
handle_scheduler_wait,
66
)
7-
from memos.api.routers.server_router import mem_scheduler
7+
from memos.api.routers.server_router import mem_scheduler, status_tracker
88
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
9+
from memos.api.product_models import StatusRequest
910

1011

1112
# Debug: Print scheduler configuration
@@ -27,25 +28,26 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
2728
print(f"My test handler received {len(messages)} messages:")
2829
for msg in messages:
2930
print(f" my_test_handler - {msg.item_id}: {msg.content}")
30-
user_status_running = mem_scheduler.get_tasks_status()
31+
user_status_running = handle_scheduler_status(user_id=msg.user_id, status_tracker=status_tracker)
3132
print("[Monitor] Status after submit:", user_status_running)
3233

3334

3435
# 2. Register the handler
3536
TEST_HANDLER_LABEL = "test_handler"
37+
TEST_USER_ID = "test_user"
3638
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})
3739

3840
# 2.1 Monitor global scheduler status before submitting tasks
3941
global_status_before = handle_scheduler_status(
40-
user_name=None, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
42+
user_id=TEST_USER_ID, status_tracker=status_tracker
4143
)
4244
print("[Monitor] Global status before submit:", global_status_before)
4345

4446
# 3. Create messages
4547
messages_to_send = [
4648
ScheduleMessageItem(
4749
item_id=f"test_item_{i}",
48-
user_id="test_user",
50+
user_id=TEST_USER_ID,
4951
mem_cube_id="test_mem_cube",
5052
label=TEST_HANDLER_LABEL,
5153
content=f"This is test message {i}",
@@ -56,29 +58,29 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
5658
# 5. Submit messages
5759
for mes in messages_to_send:
5860
print(f"Submitting message {mes.item_id} to the scheduler...")
59-
mem_scheduler.memos_message_queue.submit_messages([mes])
61+
mem_scheduler.submit_messages([mes])
6062
sleep(1)
6163

6264
# 5.1 Monitor status for specific mem_cube while running
6365
USER_MEM_CUBE = "test_mem_cube"
6466

6567
# 6. Wait for messages to be processed (limited to 100 checks)
6668

67-
user_status_running = mem_scheduler.get_tasks_status()
69+
user_status_running = handle_scheduler_status(user_id=TEST_USER_ID, status_tracker=status_tracker)
6870
print(f"[Monitor] Status for {USER_MEM_CUBE} after submit:", user_status_running)
6971

7072
# 6.1 Wait until idle for specific mem_cube via handler
7173
wait_result = handle_scheduler_wait(
72-
user_name=USER_MEM_CUBE,
74+
user_name=TEST_USER_ID,
75+
status_tracker=status_tracker,
7376
timeout_seconds=120.0,
74-
poll_interval=0.2,
75-
mem_scheduler=mem_scheduler,
77+
poll_interval=0.5,
7678
)
7779
print(f"[Monitor] Wait result for {USER_MEM_CUBE}:", wait_result)
7880

7981
# 6.2 Monitor global scheduler status after processing
8082
global_status_after = handle_scheduler_status(
81-
user_name=None, mem_scheduler=mem_scheduler, instance_id="api_w_scheduler"
83+
user_id=TEST_USER_ID, status_tracker=status_tracker
8284
)
8385
print("[Monitor] Global status after processing:", global_status_after)
8486

poetry.lock

Lines changed: 17 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ dependencies = [
4646
"scikit-learn (>=1.7.0,<2.0.0)", # Machine learning
4747
"fastmcp (>=2.10.5,<3.0.0)",
4848
"python-dateutil (>=2.9.0.post0,<3.0.0)",
49+
"prometheus-client (>=0.23.1,<0.24.0)",
4950
]
5051

5152
[project.urls]

src/memos/api/handlers/component_init.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ def init_server() -> dict[str, Any]:
128128
"""
129129
logger.info("Initializing MemOS server components...")
130130

131+
# Initialize Redis client first as it is a core dependency for features like scheduler status tracking
132+
try:
133+
from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager
134+
redis_client = APIRedisDBManager.load_redis_engine_from_env()
135+
if redis_client:
136+
logger.info("Redis client initialized successfully.")
137+
else:
138+
logger.error("Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables.")
139+
except Exception as e:
140+
logger.error(f"Failed to initialize Redis client: {e}", exc_info=True)
141+
redis_client = None # Ensure redis_client exists even on failure
142+
131143
# Get default cube configuration
132144
default_cube_config = APIConfig.get_default_cube_config()
133145

@@ -287,6 +299,7 @@ def init_server() -> dict[str, Any]:
287299
process_llm=mem_reader.llm,
288300
db_engine=BaseDBManager.create_default_sqlite_engine(),
289301
mem_reader=mem_reader,
302+
redis_client=redis_client,
290303
)
291304
mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube, searcher=searcher)
292305
logger.debug("Scheduler initialized")
@@ -332,4 +345,5 @@ def init_server() -> dict[str, Any]:
332345
"text_mem": text_mem,
333346
"pref_mem": pref_mem,
334347
"online_bot": online_bot,
348+
"redis_client": redis_client,
335349
}

0 commit comments

Comments
 (0)