Skip to content

Commit c133eff

Browse files
authored
refactor: improve architecture and configurations for memory scheduler (#142)
* fix bugs: modify mos_for_test_scheduler.py and fix bugs of scheduler dispatch * feat & fix bugs & refactor: mem scheduler add more functions of monitors and web logs, and refactor the code structure * new feat & fix bugs: mem_scheduler update the logging feature and examples; some bugs due to refactoring have been fixed * fix ruff problem * fix UTC problem
1 parent e2d34d8 commit c133eff

37 files changed

+1763
-1169
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
backend: general_scheduler
22
config:
33
top_k: 10
4-
top_n: 5
4+
top_n: 10
55
act_mem_update_interval: 30
6-
context_window_size: 5
6+
context_window_size: 10
77
thread_pool_max_workers: 5
8-
consume_interval_seconds: 3
8+
consume_interval_seconds: 1
99
enable_parallel_dispatch: true

examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ mem_scheduler:
3333
backend: "general_scheduler"
3434
config:
3535
top_k: 10
36-
top_n: 5
36+
top_n: 10
3737
act_mem_update_interval: 30
38-
context_window_size: 5
38+
context_window_size: 10
3939
thread_pool_max_workers: 10
40-
consume_interval_seconds: 3
40+
consume_interval_seconds: 1
4141
enable_parallel_dispatch: true
4242
max_turns_window: 20
4343
top_k: 5

examples/data/config/mem_scheduler/memos_config_w_scheduler_and_openai.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ mem_reader:
3434
mem_scheduler:
3535
backend: "general_scheduler"
3636
config:
37-
top_k: 2
38-
top_n: 5
37+
top_k: 10
38+
top_n: 10
3939
act_mem_update_interval: 30
40-
context_window_size: 5
40+
context_window_size: 10
4141
thread_pool_max_workers: 10
42-
consume_interval_seconds: 3
42+
consume_interval_seconds: 1
4343
enable_parallel_dispatch: true
4444
max_turns_window: 20
4545
top_k: 5

examples/mem_os/chat_w_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from memos.configs.mem_os import MOSConfig
88
from memos.mem_cube.general import GeneralMemCube
99
from memos.mem_os.main import MOS
10-
from memos.mem_scheduler.utils import parse_yaml
10+
from memos.mem_scheduler.utils.misc_utils import parse_yaml
1111

1212

1313
# init MOS

examples/mem_scheduler/schedule_w_memos.py renamed to examples/mem_scheduler/memos_w_scheduler.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,29 @@
33

44
from datetime import datetime
55
from pathlib import Path
6+
from queue import Queue
7+
from typing import TYPE_CHECKING
68

79
from memos.configs.mem_cube import GeneralMemCubeConfig
810
from memos.configs.mem_os import MOSConfig
911
from memos.configs.mem_scheduler import AuthConfig, SchedulerConfigFactory
1012
from memos.log import get_logger
1113
from memos.mem_cube.general import GeneralMemCube
1214
from memos.mem_os.main import MOS
13-
from memos.mem_scheduler.modules.schemas import (
15+
from memos.mem_scheduler.general_scheduler import GeneralScheduler
16+
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
17+
from memos.mem_scheduler.schemas.general_schemas import (
1418
ANSWER_LABEL,
1519
QUERY_LABEL,
16-
ScheduleMessageItem,
1720
)
18-
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
19-
from memos.mem_scheduler.utils import parse_yaml
21+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
22+
from memos.mem_scheduler.utils.misc_utils import parse_yaml
23+
24+
25+
if TYPE_CHECKING:
26+
from memos.mem_scheduler.schemas import (
27+
ScheduleLogForWebItem,
28+
)
2029

2130

2231
FILE_PATH = Path(__file__).absolute()
@@ -109,6 +118,8 @@ def run_with_automatic_scheduler_init():
109118
response = mos.chat(query, user_id=user_id)
110119
print(f"Query:\n {query}\n\nAnswer:\n {response}")
111120

121+
show_web_logs(mem_scheduler=mos.mem_scheduler)
122+
112123
mos.mem_scheduler.stop()
113124

114125

@@ -184,9 +195,46 @@ def run_with_manual_scheduler_init():
184195
mos.mem_scheduler.submit_messages(messages=message_item)
185196
print(f"Query:\n {query}\n\nAnswer:\n {response}")
186197

198+
show_web_logs(mem_scheduler=mos.mem_scheduler)
199+
187200
mos.mem_scheduler.stop()
188201

189202

203+
def show_web_logs(mem_scheduler: GeneralScheduler):
204+
"""Display all web log entries from the scheduler's log queue.
205+
206+
Args:
207+
mem_scheduler: The scheduler instance containing web logs to display
208+
"""
209+
if mem_scheduler._web_log_message_queue.empty():
210+
print("Web log queue is currently empty.")
211+
return
212+
213+
print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)
214+
215+
# Create a temporary queue to preserve the original queue contents
216+
temp_queue = Queue()
217+
log_count = 0
218+
219+
while not mem_scheduler._web_log_message_queue.empty():
220+
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
221+
temp_queue.put(log_item)
222+
log_count += 1
223+
224+
# Print log entry details
225+
print(f"\nLog Entry #{log_count}:")
226+
print(f'- "{log_item.label}" log: {log_item}')
227+
228+
print("-" * 50)
229+
230+
# Restore items back to the original queue
231+
while not temp_queue.empty():
232+
mem_scheduler._web_log_message_queue.put(temp_queue.get())
233+
234+
print(f"\nTotal {log_count} web log entries displayed.")
235+
print("=" * 110 + "\n")
236+
237+
190238
if __name__ == "__main__":
191239
run_with_automatic_scheduler_init()
192240

examples/mem_scheduler/schedule_chat_and_web.py renamed to examples/mem_scheduler/memos_w_scheduler_for_test.py

Lines changed: 11 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,17 @@
1+
import json
12
import shutil
23
import sys
34

45
from pathlib import Path
5-
from queue import Queue
6-
from typing import TYPE_CHECKING
76

87
from memos.configs.mem_cube import GeneralMemCubeConfig
98
from memos.configs.mem_os import MOSConfig
109
from memos.configs.mem_scheduler import AuthConfig
1110
from memos.log import get_logger
1211
from memos.mem_cube.general import GeneralMemCube
13-
from memos.mem_scheduler.general_scheduler import GeneralScheduler
1412
from memos.mem_scheduler.mos_for_test_scheduler import MOSForTestScheduler
1513

1614

17-
if TYPE_CHECKING:
18-
from memos.mem_scheduler.modules.schemas import (
19-
ScheduleLogForWebItem,
20-
)
21-
22-
2315
FILE_PATH = Path(__file__).absolute()
2416
BASE_DIR = FILE_PATH.parent.parent.parent
2517
sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory
@@ -90,41 +82,6 @@ def init_task():
9082
return conversations, questions
9183

9284

93-
def show_web_logs(mem_scheduler: GeneralScheduler):
94-
"""Display all web log entries from the scheduler's log queue.
95-
96-
Args:
97-
mem_scheduler: The scheduler instance containing web logs to display
98-
"""
99-
if mem_scheduler._web_log_message_queue.empty():
100-
print("Web log queue is currently empty.")
101-
return
102-
103-
print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)
104-
105-
# Create a temporary queue to preserve the original queue contents
106-
temp_queue = Queue()
107-
log_count = 0
108-
109-
while not mem_scheduler._web_log_message_queue.empty():
110-
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
111-
temp_queue.put(log_item)
112-
log_count += 1
113-
114-
# Print log entry details
115-
print(f"\nLog Entry #{log_count}:")
116-
print(f'- "{log_item.label}" log: {log_item}')
117-
118-
print("-" * 50)
119-
120-
# Restore items back to the original queue
121-
while not temp_queue.empty():
122-
mem_scheduler._web_log_message_queue.put(temp_queue.get())
123-
124-
print(f"\nTotal {log_count} web log entries displayed.")
125-
print("=" * 110 + "\n")
126-
127-
12885
if __name__ == "__main__":
12986
# set up data
13087
conversations, questions = init_task()
@@ -168,12 +125,18 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
168125

169126
mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)
170127

128+
# Add interfering conversations
129+
file_path = Path(f"{BASE_DIR}/examples/data/mem_scheduler/scene_data.json")
130+
scene_data = json.load(file_path.open("r", encoding="utf-8"))
131+
mos.add(scene_data[0], user_id=user_id, mem_cube_id=mem_cube_id)
132+
mos.add(scene_data[1], user_id=user_id, mem_cube_id=mem_cube_id)
133+
171134
for item in questions:
135+
print("===== Chat Start =====")
172136
query = item["question"]
173-
137+
print(f"Query:\n {query}\n")
174138
response = mos.chat(query=query, user_id=user_id)
175-
print(f"Query:\n {query}\n\nAnswer:\n {response}")
176-
177-
show_web_logs(mos.mem_scheduler)
139+
print(f"Answer:\n {response}")
140+
print("===== Chat End =====")
178141

179142
mos.mem_scheduler.stop()

examples/mem_scheduler/rabbitmq_example.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
def publish_message(rabbitmq_module, message):
99
"""Function to publish a message."""
1010
rabbitmq_module.rabbitmq_publish_message(message)
11-
print(f"Published message: {message}")
11+
print(f"Published message: {message}\n")
1212

1313

1414
def main():
@@ -24,8 +24,7 @@ def main():
2424
rabbitmq_module.initialize_rabbitmq(config=AuthConfig.from_local_yaml().rabbitmq)
2525

2626
try:
27-
# Start consumer
28-
rabbitmq_module.rabbitmq_start_consuming()
27+
rabbitmq_module.wait_for_connection_ready()
2928

3029
# === Publish some test messages ===
3130
# List to hold thread references
@@ -38,6 +37,9 @@ def main():
3837
thread.start()
3938
threads.append(thread)
4039

40+
# Start consumer
41+
rabbitmq_module.rabbitmq_start_consuming()
42+
4143
# Join threads to ensure all messages are published before proceeding
4244
for thread in threads:
4345
thread.join()
@@ -47,7 +49,7 @@ def main():
4749

4850
finally:
4951
# Give some time for cleanup
50-
time.sleep(5)
52+
time.sleep(3)
5153

5254
# Close connections
5355
rabbitmq_module.rabbitmq_close()

examples/mem_scheduler/redis_example.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88

99
from memos.configs.mem_scheduler import SchedulerConfigFactory
1010
from memos.mem_cube.general import GeneralMemCube
11-
from memos.mem_scheduler.modules.schemas import QUERY_LABEL, ScheduleMessageItem
1211
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
12+
from memos.mem_scheduler.schemas.general_schemas import QUERY_LABEL
13+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
1314

1415

1516
if TYPE_CHECKING:

examples/mem_scheduler/try_schedule_modules.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
from memos.log import get_logger
1414
from memos.mem_cube.general import GeneralMemCube
1515
from memos.mem_scheduler.general_scheduler import GeneralScheduler
16-
from memos.mem_scheduler.modules.schemas import NOT_APPLICABLE_TYPE
1716
from memos.mem_scheduler.mos_for_test_scheduler import MOSForTestScheduler
17+
from memos.mem_scheduler.schemas.general_schemas import (
18+
NOT_APPLICABLE_TYPE,
19+
)
1820

1921

2022
if TYPE_CHECKING:
21-
from memos.mem_scheduler.modules.schemas import (
23+
from memos.mem_scheduler.schemas import (
2224
ScheduleLogForWebItem,
2325
)
2426

@@ -175,14 +177,14 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
175177
query = item["question"]
176178

177179
# test process_session_turn
178-
mos.mem_scheduler.process_session_turn(
180+
working_memory, new_candidates = mos.mem_scheduler.process_session_turn(
179181
queries=[query],
180182
user_id=user_id,
181183
mem_cube_id=mem_cube_id,
182184
mem_cube=mem_cube,
183185
top_k=10,
184-
query_history=None,
185186
)
187+
print(f"\nnew_candidates: {[one.memory for one in new_candidates]}")
186188

187189
# test activation memory update
188190
mos.mem_scheduler.update_activation_memory_periodically(

src/memos/configs/mem_scheduler.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
from pydantic import ConfigDict, Field, field_validator, model_validator
77

88
from memos.configs.base import BaseConfig
9-
from memos.mem_scheduler.modules.schemas import (
9+
from memos.mem_scheduler.modules.misc import DictConversionMixin
10+
from memos.mem_scheduler.schemas.general_schemas import (
1011
BASE_DIR,
1112
DEFAULT_ACT_MEM_DUMP_PATH,
1213
DEFAULT_CONSUME_INTERVAL_SECONDS,
1314
DEFAULT_THREAD__POOL_MAX_WORKERS,
14-
DictConversionMixin,
1515
)
1616

1717

@@ -21,6 +21,7 @@ class BaseSchedulerConfig(BaseConfig):
2121
top_k: int = Field(
2222
default=10, description="Number of top candidates to consider in initial retrieval"
2323
)
24+
# TODO: The 'top_n' field is deprecated and will be removed in future versions.
2425
top_n: int = Field(default=5, description="Number of final results to return after processing")
2526
enable_parallel_dispatch: bool = Field(
2627
default=True, description="Whether to enable parallel message processing using thread pool"
@@ -48,7 +49,7 @@ class GeneralSchedulerConfig(BaseSchedulerConfig):
4849
default=300, description="Interval in seconds for updating activation memory"
4950
)
5051
context_window_size: int | None = Field(
51-
default=5, description="Size of the context window for conversation history"
52+
default=10, description="Size of the context window for conversation history"
5253
)
5354
act_mem_dump_path: str | None = Field(
5455
default=DEFAULT_ACT_MEM_DUMP_PATH, # Replace with DEFAULT_ACT_MEM_DUMP_PATH
@@ -105,7 +106,20 @@ class RabbitMQConfig(
105106

106107

107108
class GraphDBAuthConfig(BaseConfig):
108-
uri: str = Field(default="localhost", description="URI for graph database access")
109+
uri: str = Field(
110+
default="bolt://localhost:7687",
111+
description="URI for graph database access (e.g., bolt://host:port)",
112+
)
113+
user: str = Field(default="neo4j", description="Username for graph database authentication")
114+
password: str = Field(
115+
default="",
116+
description="Password for graph database authentication",
117+
min_length=8, # Recommended minimum password length
118+
)
119+
db_name: str = Field(default="neo4j", description="Database name to connect to")
120+
auto_create: bool = Field(
121+
default=True, description="Whether to automatically create the database if it doesn't exist"
122+
)
109123

110124

111125
class OpenAIConfig(BaseConfig):

0 commit comments

Comments
 (0)