1+ from time import sleep
2+
13from memos .api .handlers .scheduler_handler import (
24 handle_scheduler_status ,
35 handle_scheduler_wait ,
46)
5- from memos .api .routers .server_router import mem_scheduler
7+ from memos .api .routers .server_router import mem_scheduler , status_tracker
68from memos .mem_scheduler .schemas .message_schemas import ScheduleMessageItem
79
810
@@ -26,26 +28,25 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
2628 for msg in messages :
2729 print (f" my_test_handler - { msg .item_id } : { msg .content } " )
2830 user_status_running = handle_scheduler_status (
29- user_name = USER_MEM_CUBE , mem_scheduler = mem_scheduler , instance_id = "api_w_scheduler"
31+ user_id = msg . user_id , status_tracker = status_tracker
3032 )
31- print (f "[Monitor] Status for { USER_MEM_CUBE } after submit:" , user_status_running )
33+ print ("[Monitor] Status after submit:" , user_status_running )
3234
3335
3436# 2. Register the handler
3537TEST_HANDLER_LABEL = "test_handler"
38+ TEST_USER_ID = "test_user"
3639mem_scheduler .register_handlers ({TEST_HANDLER_LABEL : my_test_handler })
3740
3841# 2.1 Monitor global scheduler status before submitting tasks
39- global_status_before = handle_scheduler_status (
40- user_name = None , mem_scheduler = mem_scheduler , instance_id = "api_w_scheduler"
41- )
42+ global_status_before = handle_scheduler_status (user_id = TEST_USER_ID , status_tracker = status_tracker )
4243print ("[Monitor] Global status before submit:" , global_status_before )
4344
4445# 3. Create messages
4546messages_to_send = [
4647 ScheduleMessageItem (
4748 item_id = f"test_item_{ i } " ,
48- user_id = "test_user" ,
49+ user_id = TEST_USER_ID ,
4950 mem_cube_id = "test_mem_cube" ,
5051 label = TEST_HANDLER_LABEL ,
5152 content = f"This is test message { i } " ,
@@ -56,28 +57,28 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
5657# 5. Submit messages
5758for mes in messages_to_send :
5859 print (f"Submitting message { mes .item_id } to the scheduler..." )
59- mem_scheduler .memos_message_queue .submit_messages ([mes ])
60+ mem_scheduler .submit_messages ([mes ])
61+ sleep (1 )
6062
6163# 5.1 Monitor status for specific mem_cube while running
6264USER_MEM_CUBE = "test_mem_cube"
6365
6466# 6. Wait for messages to be processed (limited to 100 checks)
65- print ("Waiting for messages to be consumed (max 100 checks)..." )
66- mem_scheduler .mem_scheduler_wait ()
67+
68+ user_status_running = handle_scheduler_status (user_id = TEST_USER_ID , status_tracker = status_tracker )
69+ print (f"[Monitor] Status for { USER_MEM_CUBE } after submit:" , user_status_running )
6770
6871# 6.1 Wait until idle for specific mem_cube via handler
6972wait_result = handle_scheduler_wait (
70- user_name = USER_MEM_CUBE ,
73+ user_name = TEST_USER_ID ,
74+ status_tracker = status_tracker ,
7175 timeout_seconds = 120.0 ,
72- poll_interval = 0.2 ,
73- mem_scheduler = mem_scheduler ,
76+ poll_interval = 0.5 ,
7477)
7578print (f"[Monitor] Wait result for { USER_MEM_CUBE } :" , wait_result )
7679
7780# 6.2 Monitor global scheduler status after processing
78- global_status_after = handle_scheduler_status (
79- user_name = None , mem_scheduler = mem_scheduler , instance_id = "api_w_scheduler"
80- )
81+ global_status_after = handle_scheduler_status (user_id = TEST_USER_ID , status_tracker = status_tracker )
8182print ("[Monitor] Global status after processing:" , global_status_after )
8283
8384# 7. Stop the scheduler
0 commit comments