Skip to content

Commit 31d2468

Browse files
committed
new feat & fix bugs: mem_scheduler update the logging feature and examples; some bugs due to refactoring have been fixed
1 parent 91924d7 commit 31d2468

File tree

16 files changed

+300
-297
lines changed

16 files changed

+300
-297
lines changed

examples/mem_scheduler/memos_w_scheduler.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33

44
from datetime import datetime
55
from pathlib import Path
6+
from queue import Queue
7+
from typing import TYPE_CHECKING
68

79
from memos.configs.mem_cube import GeneralMemCubeConfig
810
from memos.configs.mem_os import MOSConfig
911
from memos.configs.mem_scheduler import AuthConfig, SchedulerConfigFactory
1012
from memos.log import get_logger
1113
from memos.mem_cube.general import GeneralMemCube
1214
from memos.mem_os.main import MOS
15+
from memos.mem_scheduler.general_scheduler import GeneralScheduler
1316
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
1417
from memos.mem_scheduler.schemas.general_schemas import (
1518
ANSWER_LABEL,
@@ -19,6 +22,12 @@
1922
from memos.mem_scheduler.utils.misc_utils import parse_yaml
2023

2124

25+
if TYPE_CHECKING:
26+
from memos.mem_scheduler.schemas import (
27+
ScheduleLogForWebItem,
28+
)
29+
30+
2231
FILE_PATH = Path(__file__).absolute()
2332
BASE_DIR = FILE_PATH.parent.parent.parent
2433
sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory
@@ -109,6 +118,8 @@ def run_with_automatic_scheduler_init():
109118
response = mos.chat(query, user_id=user_id)
110119
print(f"Query:\n {query}\n\nAnswer:\n {response}")
111120

121+
show_web_logs(mem_scheduler=mos.mem_scheduler)
122+
112123
mos.mem_scheduler.stop()
113124

114125

@@ -184,9 +195,46 @@ def run_with_manual_scheduler_init():
184195
mos.mem_scheduler.submit_messages(messages=message_item)
185196
print(f"Query:\n {query}\n\nAnswer:\n {response}")
186197

198+
show_web_logs(mem_scheduler=mos.mem_scheduler)
199+
187200
mos.mem_scheduler.stop()
188201

189202

203+
def show_web_logs(mem_scheduler: GeneralScheduler):
204+
"""Display all web log entries from the scheduler's log queue.
205+
206+
Args:
207+
mem_scheduler: The scheduler instance containing web logs to display
208+
"""
209+
if mem_scheduler._web_log_message_queue.empty():
210+
print("Web log queue is currently empty.")
211+
return
212+
213+
print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)
214+
215+
# Create a temporary queue to preserve the original queue contents
216+
temp_queue = Queue()
217+
log_count = 0
218+
219+
while not mem_scheduler._web_log_message_queue.empty():
220+
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
221+
temp_queue.put(log_item)
222+
log_count += 1
223+
224+
# Print log entry details
225+
print(f"\nLog Entry #{log_count}:")
226+
print(f'- "{log_item.label}" log: {log_item}')
227+
228+
print("-" * 50)
229+
230+
# Restore items back to the original queue
231+
while not temp_queue.empty():
232+
mem_scheduler._web_log_message_queue.put(temp_queue.get())
233+
234+
print(f"\nTotal {log_count} web log entries displayed.")
235+
print("=" * 110 + "\n")
236+
237+
190238
if __name__ == "__main__":
191239
run_with_automatic_scheduler_init()
192240

examples/mem_scheduler/memos_w_scheduler_for_test.py

Lines changed: 7 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,17 @@
1+
import json
12
import shutil
23
import sys
34

45
from pathlib import Path
5-
from queue import Queue
6-
from typing import TYPE_CHECKING
76

87
from memos.configs.mem_cube import GeneralMemCubeConfig
98
from memos.configs.mem_os import MOSConfig
109
from memos.configs.mem_scheduler import AuthConfig
1110
from memos.log import get_logger
1211
from memos.mem_cube.general import GeneralMemCube
13-
from memos.mem_scheduler.general_scheduler import GeneralScheduler
1412
from memos.mem_scheduler.mos_for_test_scheduler import MOSForTestScheduler
1513

1614

17-
if TYPE_CHECKING:
18-
from memos.mem_scheduler.schemas import (
19-
ScheduleLogForWebItem,
20-
)
21-
22-
2315
FILE_PATH = Path(__file__).absolute()
2416
BASE_DIR = FILE_PATH.parent.parent.parent
2517
sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory
@@ -90,41 +82,6 @@ def init_task():
9082
return conversations, questions
9183

9284

93-
def show_web_logs(mem_scheduler: GeneralScheduler):
94-
"""Display all web log entries from the scheduler's log queue.
95-
96-
Args:
97-
mem_scheduler: The scheduler instance containing web logs to display
98-
"""
99-
if mem_scheduler._web_log_message_queue.empty():
100-
print("Web log queue is currently empty.")
101-
return
102-
103-
print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)
104-
105-
# Create a temporary queue to preserve the original queue contents
106-
temp_queue = Queue()
107-
log_count = 0
108-
109-
while not mem_scheduler._web_log_message_queue.empty():
110-
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
111-
temp_queue.put(log_item)
112-
log_count += 1
113-
114-
# Print log entry details
115-
print(f"\nLog Entry #{log_count}:")
116-
print(f'- "{log_item.label}" log: {log_item}')
117-
118-
print("-" * 50)
119-
120-
# Restore items back to the original queue
121-
while not temp_queue.empty():
122-
mem_scheduler._web_log_message_queue.put(temp_queue.get())
123-
124-
print(f"\nTotal {log_count} web log entries displayed.")
125-
print("=" * 110 + "\n")
126-
127-
12885
if __name__ == "__main__":
12986
# set up data
13087
conversations, questions = init_task()
@@ -168,13 +125,18 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
168125

169126
mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)
170127

128+
# Add interfering conversations
129+
file_path = Path(f"{BASE_DIR}/examples/data/mem_scheduler/scene_data.json")
130+
scene_data = json.load(file_path.open("r", encoding="utf-8"))
131+
mos.add(scene_data[0], user_id=user_id, mem_cube_id=mem_cube_id)
132+
mos.add(scene_data[1], user_id=user_id, mem_cube_id=mem_cube_id)
133+
171134
for item in questions:
172135
print("===== Chat Start =====")
173136
query = item["question"]
174137
print(f"Query:\n {query}\n")
175138
response = mos.chat(query=query, user_id=user_id)
176139
print(f"Answer:\n {response}")
177140
print("===== Chat End =====")
178-
show_web_logs(mos.mem_scheduler)
179141

180142
mos.mem_scheduler.stop()

examples/mem_scheduler/try_schedule_modules.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import shutil
22
import sys
33

4-
from datetime import datetime
54
from pathlib import Path
65
from queue import Queue
76
from typing import TYPE_CHECKING
@@ -17,9 +16,7 @@
1716
from memos.mem_scheduler.mos_for_test_scheduler import MOSForTestScheduler
1817
from memos.mem_scheduler.schemas.general_schemas import (
1918
NOT_APPLICABLE_TYPE,
20-
QUERY_LABEL,
2119
)
22-
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
2320

2421

2522
if TYPE_CHECKING:
@@ -189,23 +186,6 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
189186
)
190187
print(f"\nnew_candidates: {[one.memory for one in new_candidates]}")
191188

192-
# test query_consume
193-
message_item = ScheduleMessageItem(
194-
user_id=user_id,
195-
mem_cube_id=mem_cube_id,
196-
mem_cube=mem_cube,
197-
label=QUERY_LABEL,
198-
content=query,
199-
timestamp=datetime.now(),
200-
)
201-
print(
202-
f"\nworking memories before: {[one.memory for one in mos.mem_scheduler.mem_cubes[mem_cube_id].text_mem.get_working_memory()]}"
203-
)
204-
mos.mem_scheduler._query_message_consumer(messages=[message_item])
205-
print(
206-
f"\nworking memories after: {[one.memory for one in mos.mem_scheduler.mem_cubes[mem_cube_id].text_mem.get_working_memory()]}"
207-
)
208-
209189
# test activation memory update
210190
mos.mem_scheduler.update_activation_memory_periodically(
211191
interval_seconds=0,

src/memos/configs/mem_scheduler.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,20 @@ class RabbitMQConfig(
106106

107107

108108
class GraphDBAuthConfig(BaseConfig):
109-
uri: str = Field(default="localhost", description="URI for graph database access")
109+
uri: str = Field(
110+
default="bolt://localhost:7687",
111+
description="URI for graph database access (e.g., bolt://host:port)",
112+
)
113+
user: str = Field(default="neo4j", description="Username for graph database authentication")
114+
password: str = Field(
115+
default="",
116+
description="Password for graph database authentication",
117+
min_length=8, # 建议密码最小长度
118+
)
119+
db_name: str = Field(default="neo4j", description="Database name to connect to")
120+
auto_create: bool = Field(
121+
default=True, description="Whether to automatically create the database if it doesn't exist"
122+
)
110123

111124

112125
class OpenAIConfig(BaseConfig):

src/memos/mem_os/core.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ def add(
586586
user_id (str, optional): The identifier of the user to add the memories to.
587587
If None, the default user is used.
588588
"""
589+
# user input messages
589590
assert (messages is not None) or (memory_content is not None) or (doc_path is not None), (
590591
"messages_or_doc_path or memory_content or doc_path must be provided."
591592
)
@@ -625,23 +626,26 @@ def add(
625626
type="chat",
626627
info={"user_id": target_user_id, "session_id": str(uuid.uuid4())},
627628
)
629+
630+
mem_ids = []
628631
for mem in memories:
629-
self.mem_cubes[mem_cube_id].text_mem.add(mem)
632+
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
633+
mem_ids.extend(mem_id_list)
630634

631635
# submit messages for scheduler
632-
mem_cube = self.mem_cubes[mem_cube_id]
633636
if self.enable_mem_scheduler and self.mem_scheduler is not None:
634-
text_messages = [message["content"] for message in messages]
637+
mem_cube = self.mem_cubes[mem_cube_id]
635638
message_item = ScheduleMessageItem(
636639
user_id=target_user_id,
637640
mem_cube_id=mem_cube_id,
638641
mem_cube=mem_cube,
639642
label=ADD_LABEL,
640-
content=json.dumps(text_messages),
643+
content=json.dumps(mem_ids),
641644
timestamp=datetime.now(),
642645
)
643646
self.mem_scheduler.submit_messages(messages=[message_item])
644647

648+
# user profile
645649
if (
646650
(memory_content is not None)
647651
and self.config.enable_textual_memory
@@ -669,21 +673,56 @@ def add(
669673
type="chat",
670674
info={"user_id": target_user_id, "session_id": str(uuid.uuid4())},
671675
)
676+
677+
mem_ids = []
672678
for mem in memories:
673-
self.mem_cubes[mem_cube_id].text_mem.add(mem)
679+
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
680+
mem_ids.extend(mem_id_list)
681+
682+
# submit messages for scheduler
683+
if self.enable_mem_scheduler and self.mem_scheduler is not None:
684+
mem_cube = self.mem_cubes[mem_cube_id]
685+
message_item = ScheduleMessageItem(
686+
user_id=target_user_id,
687+
mem_cube_id=mem_cube_id,
688+
mem_cube=mem_cube,
689+
label=ADD_LABEL,
690+
content=json.dumps(mem_ids),
691+
timestamp=datetime.now(),
692+
)
693+
self.mem_scheduler.submit_messages(messages=[message_item])
694+
695+
# user doc input
674696
if (
675697
(doc_path is not None)
676698
and self.config.enable_textual_memory
677699
and self.mem_cubes[mem_cube_id].text_mem
678700
):
679701
documents = self._get_all_documents(doc_path)
680-
doc_memory = self.mem_reader.get_memory(
702+
doc_memories = self.mem_reader.get_memory(
681703
documents,
682704
type="doc",
683705
info={"user_id": target_user_id, "session_id": str(uuid.uuid4())},
684706
)
685-
for mem in doc_memory:
686-
self.mem_cubes[mem_cube_id].text_mem.add(mem)
707+
708+
mem_ids = []
709+
for mem in doc_memories:
710+
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
711+
mem_ids.extend(mem_id_list)
712+
713+
# submit messages for scheduler
714+
if self.enable_mem_scheduler and self.mem_scheduler is not None:
715+
mem_cube = self.mem_cubes[mem_cube_id]
716+
message_item = ScheduleMessageItem(
717+
user_id=target_user_id,
718+
mem_cube_id=mem_cube_id,
719+
mem_cube=mem_cube,
720+
label=ADD_LABEL,
721+
content=json.dumps(mem_ids),
722+
timestamp=datetime.now(),
723+
)
724+
self.mem_scheduler.submit_messages(messages=[message_item])
725+
687726
logger.info(f"Add memory to {mem_cube_id} successfully")
688727

689728
def get(

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ def update_activation_memory_periodically(
351351
new_activation_memories = []
352352

353353
if self.monitor.timed_trigger(
354-
last_time=self.monitor._last_activation_mem_update_time,
354+
last_time=self.monitor.last_activation_mem_update_time,
355355
interval_seconds=interval_seconds,
356356
):
357357
logger.info(f"Updating activation memory for user {user_id} and mem_cube {mem_cube_id}")
@@ -387,15 +387,15 @@ def update_activation_memory_periodically(
387387
mem_cube=mem_cube,
388388
)
389389

390-
self.monitor._last_activation_mem_update_time = datetime.now()
390+
self.monitor.last_activation_mem_update_time = datetime.now()
391391

392392
logger.debug(
393-
f"Activation memory update completed at {self.monitor._last_activation_mem_update_time}"
393+
f"Activation memory update completed at {self.monitor.last_activation_mem_update_time}"
394394
)
395395
else:
396396
logger.info(
397397
f"Skipping update - {interval_seconds} second interval not yet reached. "
398-
f"Last update time is {self.monitor._last_activation_mem_update_time} and now is"
398+
f"Last update time is {self.monitor.last_activation_mem_update_time} and now is"
399399
f"{datetime.now()}"
400400
)
401401

@@ -421,10 +421,11 @@ def _submit_web_logs(
421421

422422
for message in messages:
423423
self._web_log_message_queue.put(message)
424-
logger.info(f"Submitted Scheduling log for web: {message.log_content}")
424+
message_info = message.debug_info()
425+
logger.debug(f"Submitted Scheduling log for web: {message_info}")
425426

426427
if self.is_rabbitmq_connected():
427-
logger.info("Submitted Scheduling log to rabbitmq")
428+
logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
428429
self.rabbitmq_publish_message(message=message.to_dict())
429430
logger.debug(f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue.")
430431

0 commit comments

Comments
 (0)