Skip to content

Commit aee13ba

Browse files
committed
feat: 1. add redis queue for scheduler 2. finish the code related to mix search and fine search
1 parent 5332d12 commit aee13ba

39 files changed

+2992
-1353
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from memos.api.routers.server_router import mem_scheduler
2+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
3+
4+
5+
# Debug: Print scheduler configuration
6+
print("=== Scheduler Configuration Debug ===")
7+
print(f"Scheduler type: {type(mem_scheduler).__name__}")
8+
print(f"Config: {mem_scheduler.config}")
9+
print(f"use_redis_queue: {mem_scheduler.use_redis_queue}")
10+
print(f"Queue type: {type(mem_scheduler.memos_message_queue).__name__}")
11+
print(f"Queue maxsize: {getattr(mem_scheduler.memos_message_queue, 'maxsize', 'N/A')}")
12+
13+
# Check if Redis queue is connected
14+
if hasattr(mem_scheduler.memos_message_queue, "_is_connected"):
15+
print(f"Redis connected: {mem_scheduler.memos_message_queue._is_connected}")
16+
if hasattr(mem_scheduler.memos_message_queue, "_redis_conn"):
17+
print(f"Redis connection: {mem_scheduler.memos_message_queue._redis_conn}")
18+
print("=====================================\n")
19+
20+
queue = mem_scheduler.memos_message_queue
21+
queue.clear()
22+
23+
24+
# 1. Define a handler function
25+
def my_test_handler(messages: list[ScheduleMessageItem]):
26+
print(f"My test handler received {len(messages)} messages:")
27+
for msg in messages:
28+
print(f" my_test_handler - {msg.item_id}: {msg.content}")
29+
print(
30+
f"{queue._redis_conn.xinfo_groups(queue.stream_name)} qsize: {queue.qsize()} messages:{messages}"
31+
)
32+
33+
34+
# 2. Register the handler
35+
TEST_HANDLER_LABEL = "test_handler"
36+
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})
37+
38+
# 3. Create messages
39+
messages_to_send = [
40+
ScheduleMessageItem(
41+
item_id=f"test_item_{i}",
42+
user_id="test_user",
43+
mem_cube_id="test_mem_cube",
44+
label=TEST_HANDLER_LABEL,
45+
content=f"This is test message {i}",
46+
)
47+
for i in range(5)
48+
]
49+
50+
# 5. Submit messages
51+
for mes in messages_to_send:
52+
print(f"Submitting message {mes.item_id} to the scheduler...")
53+
mem_scheduler.submit_messages([mes])
54+
55+
# 6. Wait for messages to be processed (limited to 100 checks)
56+
print("Waiting for messages to be consumed (max 100 checks)...")
57+
mem_scheduler.mem_scheduler_wait()
58+
59+
60+
# 7. Stop the scheduler
61+
print("Stopping the scheduler...")
62+
mem_scheduler.stop()

examples/mem_scheduler/memos_w_optimized_scheduler.py

Lines changed: 0 additions & 85 deletions
This file was deleted.

examples/mem_scheduler/memos_w_optimized_scheduler_for_test.py

Lines changed: 0 additions & 87 deletions
This file was deleted.

examples/mem_scheduler/memos_w_scheduler.py

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,48 @@ def init_task():
7070
return conversations, questions
7171

7272

73+
def show_web_logs(mem_scheduler: GeneralScheduler):
74+
"""Display all web log entries from the scheduler's log queue.
75+
76+
Args:
77+
mem_scheduler: The scheduler instance containing web logs to display
78+
"""
79+
if mem_scheduler._web_log_message_queue.empty():
80+
print("Web log queue is currently empty.")
81+
return
82+
83+
print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)
84+
85+
# Create a temporary queue to preserve the original queue contents
86+
temp_queue = Queue()
87+
log_count = 0
88+
89+
while not mem_scheduler._web_log_message_queue.empty():
90+
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
91+
temp_queue.put(log_item)
92+
log_count += 1
93+
94+
# Print log entry details
95+
print(f"\nLog Entry #{log_count}:")
96+
print(f'- "{log_item.label}" log: {log_item}')
97+
98+
print("-" * 50)
99+
100+
# Restore items back to the original queue
101+
while not temp_queue.empty():
102+
mem_scheduler._web_log_message_queue.put(temp_queue.get())
103+
104+
print(f"\nTotal {log_count} web log entries displayed.")
105+
print("=" * 110 + "\n")
106+
107+
73108
def run_with_scheduler_init():
74109
print("==== run_with_automatic_scheduler_init ====")
75110
conversations, questions = init_task()
76111

77112
# set configs
78113
mos_config = MOSConfig.from_yaml_file(
79-
f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
114+
f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_optimized_scheduler.yaml"
80115
)
81116

82117
mem_cube_config = GeneralMemCubeConfig.from_yaml_file(
@@ -118,6 +153,7 @@ def run_with_scheduler_init():
118153
)
119154

120155
mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)
156+
mos.mem_scheduler.current_mem_cube = mem_cube
121157

122158
for item in questions:
123159
print("===== Chat Start =====")
@@ -131,40 +167,5 @@ def run_with_scheduler_init():
131167
mos.mem_scheduler.stop()
132168

133169

134-
def show_web_logs(mem_scheduler: GeneralScheduler):
135-
"""Display all web log entries from the scheduler's log queue.
136-
137-
Args:
138-
mem_scheduler: The scheduler instance containing web logs to display
139-
"""
140-
if mem_scheduler._web_log_message_queue.empty():
141-
print("Web log queue is currently empty.")
142-
return
143-
144-
print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)
145-
146-
# Create a temporary queue to preserve the original queue contents
147-
temp_queue = Queue()
148-
log_count = 0
149-
150-
while not mem_scheduler._web_log_message_queue.empty():
151-
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
152-
temp_queue.put(log_item)
153-
log_count += 1
154-
155-
# Print log entry details
156-
print(f"\nLog Entry #{log_count}:")
157-
print(f'- "{log_item.label}" log: {log_item}')
158-
159-
print("-" * 50)
160-
161-
# Restore items back to the original queue
162-
while not temp_queue.empty():
163-
mem_scheduler._web_log_message_queue.put(temp_queue.get())
164-
165-
print(f"\nTotal {log_count} web log entries displayed.")
166-
print("=" * 110 + "\n")
167-
168-
169170
if __name__ == "__main__":
170171
run_with_scheduler_init()

0 commit comments

Comments
 (0)