Skip to content

Commit 943b4cb

Browse files
committed
Merge remote-tracking branch 'refs/remotes/origin/dev' into dev_zdy_1208_optimize
2 parents cb5b99b + 1f3606f commit 943b4cb

File tree

11 files changed

+103
-115
lines changed

11 files changed

+103
-115
lines changed

examples/mem_scheduler/task_stop_rerun.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from pathlib import Path
22
from time import sleep
33

4-
# Note: we skip API handler status/wait utilities in this demo
54
from memos.api.routers.server_router import mem_scheduler
65
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
76

examples/mem_scheduler/try_schedule_modules.py

Lines changed: 69 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import shutil
21
import sys
32

43
from pathlib import Path
@@ -7,16 +6,15 @@
76

87
from tqdm import tqdm
98

10-
from memos.configs.mem_cube import GeneralMemCubeConfig
11-
from memos.configs.mem_os import MOSConfig
12-
from memos.configs.mem_scheduler import AuthConfig
13-
from memos.log import get_logger
14-
from memos.mem_cube.general import GeneralMemCube
15-
from memos.mem_scheduler.analyzer.mos_for_test_scheduler import MOSForTestScheduler
16-
from memos.mem_scheduler.general_scheduler import GeneralScheduler
17-
from memos.mem_scheduler.schemas.task_schemas import (
18-
NOT_APPLICABLE_TYPE,
9+
from memos.api.routers.server_router import (
10+
mem_scheduler,
1911
)
12+
from memos.log import get_logger
13+
from memos.mem_scheduler.analyzer.api_analyzer import DirectSearchMemoriesAnalyzer
14+
from memos.mem_scheduler.base_scheduler import BaseScheduler
15+
from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler
16+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
17+
from memos.mem_scheduler.schemas.task_schemas import MEM_UPDATE_TASK_LABEL
2018

2119

2220
if TYPE_CHECKING:
@@ -95,7 +93,7 @@ def init_task():
9593
return conversations, questions
9694

9795

98-
def show_web_logs(mem_scheduler: GeneralScheduler):
96+
def show_web_logs(mem_scheduler: BaseScheduler):
9997
"""Display all web log entries from the scheduler's log queue.
10098
10199
Args:
@@ -130,78 +128,77 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
130128
print("=" * 110 + "\n")
131129

132130

133-
if __name__ == "__main__":
134-
# set up data
135-
conversations, questions = init_task()
136-
137-
# set configs
138-
mos_config = MOSConfig.from_yaml_file(
139-
f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
140-
)
141-
142-
mem_cube_config = GeneralMemCubeConfig.from_yaml_file(
143-
f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config_neo4j.yaml"
144-
)
131+
class ScheduleModulesRunner(DirectSearchMemoriesAnalyzer):
132+
def __init__(self):
133+
super().__init__()
145134

146-
# default local graphdb uri
147-
if AuthConfig.default_config_exists():
148-
auth_config = AuthConfig.from_local_config()
135+
def start_conversation(self, user_id="test_user", mem_cube_id="test_cube", session_id=None):
136+
self.current_user_id = user_id
137+
self.current_mem_cube_id = mem_cube_id
138+
self.current_session_id = (
139+
session_id or f"session_{hash(user_id + mem_cube_id)}_{len(self.conversation_history)}"
140+
)
141+
self.conversation_history = []
142+
143+
logger.info(f"Started conversation session: {self.current_session_id}")
144+
print(f"🚀 Started new conversation session: {self.current_session_id}")
145+
print(f" User ID: {self.current_user_id}")
146+
print(f" Mem Cube ID: {self.current_mem_cube_id}")
147+
148+
def add_msgs(self, messages: list[dict]):
149+
# Create add request
150+
add_req = self.create_test_add_request(
151+
user_id=self.current_user_id,
152+
mem_cube_id=self.current_mem_cube_id,
153+
messages=messages,
154+
session_id=self.current_session_id,
155+
)
149156

150-
mos_config.mem_reader.config.llm.config.api_key = auth_config.openai.api_key
151-
mos_config.mem_reader.config.llm.config.api_base = auth_config.openai.base_url
157+
# Add to memory
158+
result = self.add_memories(add_req)
159+
print(f" ✅ Added to memory successfully: \n{messages}")
152160

153-
mem_cube_config.text_mem.config.graph_db.config.uri = auth_config.graph_db.uri
154-
mem_cube_config.text_mem.config.graph_db.config.user = auth_config.graph_db.user
155-
mem_cube_config.text_mem.config.graph_db.config.password = auth_config.graph_db.password
156-
mem_cube_config.text_mem.config.graph_db.config.db_name = auth_config.graph_db.db_name
157-
mem_cube_config.text_mem.config.graph_db.config.auto_create = (
158-
auth_config.graph_db.auto_create
159-
)
161+
return result
160162

161-
# Initialization
162-
mos = MOSForTestScheduler(mos_config)
163163

164-
user_id = "user_1"
165-
mos.create_user(user_id)
164+
if __name__ == "__main__":
165+
# set up data
166+
conversations, questions = init_task()
166167

167-
mem_cube_id = "mem_cube_5"
168-
mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}"
168+
trying_modules = ScheduleModulesRunner()
169169

170-
if Path(mem_cube_name_or_path).exists():
171-
shutil.rmtree(mem_cube_name_or_path)
172-
print(f"{mem_cube_name_or_path} is not empty, and has been removed.")
170+
trying_modules.start_conversation(
171+
user_id="try_scheduler_modules",
172+
mem_cube_id="try_scheduler_modules",
173+
)
173174

174-
mem_cube = GeneralMemCube(mem_cube_config)
175-
mem_cube.dump(mem_cube_name_or_path)
176-
mos.register_mem_cube(
177-
mem_cube_name_or_path=mem_cube_name_or_path, mem_cube_id=mem_cube_id, user_id=user_id
175+
trying_modules.add_msgs(
176+
messages=conversations,
178177
)
179-
mos.mem_scheduler.current_mem_cube = mem_cube
180178

181-
mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)
179+
mem_scheduler: OptimizedScheduler = mem_scheduler
180+
# Force retrieval to trigger every turn for the example to be deterministic
181+
try:
182+
mem_scheduler.monitor.query_trigger_interval = 0.0
183+
except Exception:
184+
logger.exception("Failed to set query_trigger_interval; continuing with defaults.")
182185

183-
for item in tqdm(questions, desc="processing queries"):
186+
for item_idx, item in enumerate(tqdm(questions, desc="processing queries")):
184187
query = item["question"]
185-
186-
# test process_session_turn
187-
working_memory, new_candidates = mos.mem_scheduler.process_session_turn(
188-
queries=[query],
189-
user_id=user_id,
190-
mem_cube_id=mem_cube_id,
191-
mem_cube=mem_cube,
192-
top_k=10,
188+
messages_to_send = [
189+
ScheduleMessageItem(
190+
item_id=f"test_item_{item_idx}",
191+
user_id=trying_modules.current_user_id,
192+
mem_cube_id=trying_modules.current_mem_cube_id,
193+
label=MEM_UPDATE_TASK_LABEL,
194+
content=query,
195+
)
196+
]
197+
198+
# Run one session turn manually to get search candidates
199+
mem_scheduler._memory_update_consumer(
200+
messages=messages_to_send,
193201
)
194-
print(f"\nnew_candidates: {[one.memory for one in new_candidates]}")
195-
196-
# test activation memory update
197-
mos.mem_scheduler.update_activation_memory_periodically(
198-
interval_seconds=0,
199-
label=NOT_APPLICABLE_TYPE,
200-
user_id=user_id,
201-
mem_cube_id=mem_cube_id,
202-
mem_cube=mem_cube,
203-
)
204-
205-
show_web_logs(mos.mem_scheduler)
206202

207-
mos.mem_scheduler.stop()
203+
# Show accumulated web logs
204+
show_web_logs(mem_scheduler)

src/memos/api/handlers/chat_handler.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -541,16 +541,7 @@ def generate_chat_response() -> Generator[str, None, None]:
541541
)
542542

543543
# Step 3: Generate streaming response from LLM
544-
if (
545-
chat_req.model_name_or_path
546-
and chat_req.model_name_or_path not in self.chat_llms
547-
):
548-
raise HTTPException(
549-
status_code=400,
550-
detail=f"Model {chat_req.model_name_or_path} not suport, choose from {list(self.chat_llms.keys())}",
551-
)
552-
553-
model = chat_req.model_name_or_path or next(iter(self.chat_llms.keys()))
544+
model = next(iter(self.chat_llms.keys()))
554545
response_stream = self.chat_llms[model].generate_stream(
555546
current_messages, model_name_or_path=model
556547
)

src/memos/mem_os/core.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ def chat(self, query: str, user_id: str | None = None, base_prompt: str | None =
287287
content=query,
288288
timestamp=datetime.utcnow(),
289289
)
290-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
290+
self.mem_scheduler.submit_messages(messages=[message_item])
291291

292292
memories = mem_cube.text_mem.search(
293293
query,
@@ -347,7 +347,7 @@ def chat(self, query: str, user_id: str | None = None, base_prompt: str | None =
347347
content=response,
348348
timestamp=datetime.utcnow(),
349349
)
350-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
350+
self.mem_scheduler.submit_messages(messages=[message_item])
351351

352352
return response
353353

@@ -776,9 +776,7 @@ def process_textual_memory():
776776
timestamp=datetime.utcnow(),
777777
task_id=task_id,
778778
)
779-
self.mem_scheduler.memos_message_queue.submit_messages(
780-
messages=[message_item]
781-
)
779+
self.mem_scheduler.submit_messages(messages=[message_item])
782780
else:
783781
message_item = ScheduleMessageItem(
784782
user_id=target_user_id,
@@ -791,9 +789,7 @@ def process_textual_memory():
791789
logger.info(
792790
f"[DIAGNOSTIC] core.add: Submitting message to scheduler: {message_item.model_dump_json(indent=2)}"
793791
)
794-
self.mem_scheduler.memos_message_queue.submit_messages(
795-
messages=[message_item]
796-
)
792+
self.mem_scheduler.submit_messages(messages=[message_item])
797793

798794
def process_preference_memory():
799795
if (
@@ -828,7 +824,7 @@ def process_preference_memory():
828824
content=json.dumps(messages_list),
829825
timestamp=datetime.utcnow(),
830826
)
831-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
827+
self.mem_scheduler.submit_messages(messages=[message_item])
832828

833829
# Execute both memory processing functions in parallel
834830
with ContextThreadPoolExecutor(max_workers=2) as executor:
@@ -882,9 +878,7 @@ def process_preference_memory():
882878
content=json.dumps(mem_ids),
883879
timestamp=datetime.utcnow(),
884880
)
885-
self.mem_scheduler.memos_message_queue.submit_messages(
886-
messages=[message_item]
887-
)
881+
self.mem_scheduler.submit_messages(messages=[message_item])
888882
else:
889883
message_item = ScheduleMessageItem(
890884
user_id=target_user_id,
@@ -893,9 +887,7 @@ def process_preference_memory():
893887
content=json.dumps(mem_ids),
894888
timestamp=datetime.utcnow(),
895889
)
896-
self.mem_scheduler.memos_message_queue.submit_messages(
897-
messages=[message_item]
898-
)
890+
self.mem_scheduler.submit_messages(messages=[message_item])
899891

900892
# user doc input
901893
if (
@@ -924,7 +916,7 @@ def process_preference_memory():
924916
content=json.dumps(mem_ids),
925917
timestamp=datetime.utcnow(),
926918
)
927-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
919+
self.mem_scheduler.submit_messages(messages=[message_item])
928920

929921
logger.info(f"Add memory to {mem_cube_id} successfully")
930922

src/memos/mem_os/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ def _chat_with_cot_enhancement(
220220
content=enhanced_response,
221221
timestamp=datetime.now().isoformat(),
222222
)
223-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
223+
self.mem_scheduler.submit_messages(messages=[message_item])
224224

225225
return enhanced_response
226226

src/memos/mem_os/product.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ def _send_message_to_scheduler(
641641
content=query,
642642
timestamp=datetime.utcnow(),
643643
)
644-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
644+
self.mem_scheduler.submit_messages(messages=[message_item])
645645

646646
async def _post_chat_processing(
647647
self,

src/memos/mem_scheduler/analyzer/mos_for_test_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ def chat(self, query: str, user_id: str | None = None) -> str:
523523
content=response,
524524
timestamp=datetime.now(),
525525
)
526-
self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
526+
self.mem_scheduler.submit_messages(messages=[message_item])
527527

528528
return response
529529

src/memos/mem_scheduler/general_modules/scheduler_logger.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ def log_working_memory_replacement(
158158
new_text_memories = [m.memory for m in new_memory]
159159
original_set = set(original_text_memories)
160160
new_set = set(new_text_memories)
161-
added_texts = list(new_set - original_set)
161+
added_texts = []
162+
for new_mem in new_set:
163+
if new_mem not in original_set:
164+
added_texts.append(new_mem)
162165
memcube_content = []
163166
meta = []
164167
by_text = {m.memory: m for m in new_memory}

src/memos/mem_scheduler/optimized_scheduler.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -338,19 +338,25 @@ def replace_working_memory(
338338
for one in new_working_memory_monitors:
339339
one.sorting_score = 0
340340

341-
logger.info(
342-
f"[optimized replace_working_memory] update {len(new_working_memory_monitors)} working_memory_monitors"
343-
)
344341
self.monitor.update_working_memory_monitors(
345342
new_working_memory_monitors=new_working_memory_monitors,
346343
user_id=user_id,
347344
mem_cube_id=mem_cube_id,
348345
mem_cube=mem_cube,
349346
)
350-
351-
# Use the filtered and reranked memories directly
352-
text_mem_base.replace_working_memory(memories=memories_with_new_order)
353-
347+
logger.info(
348+
f"[optimized replace_working_memory] update {len(new_working_memory_monitors)} working_memory_monitors"
349+
)
350+
try:
351+
# Use the filtered and reranked memories directly
352+
text_mem_base.replace_working_memory(
353+
memories=memories_with_new_order, user_name=mem_cube_id
354+
)
355+
except Exception:
356+
logger.error(
357+
"[optimized replace_working_memory] text_mem_base.replace_working_memory failed!",
358+
stack_info=True,
359+
)
354360
# Update monitor after replacing working memory
355361
mem_monitors: list[MemoryMonitorItem] = self.monitor.working_memory_monitors[user_id][
356362
mem_cube_id

src/memos/mem_scheduler/task_schedule_modules/orchestrator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ def __init__(self):
4747
# Per-task minimum idle time (ms) before claiming pending messages
4848
# Default fallback handled in `get_task_idle_min`.
4949
self.tasks_min_idle_ms = {
50-
# Preferential add tasks: allow claiming pending sooner (1 minute)
51-
PREF_ADD_TASK_LABEL: 60_000,
50+
# Preferential add tasks: allow claiming pending sooner (10 minute)
51+
PREF_ADD_TASK_LABEL: 600_000,
5252
}
5353

5454
def get_stream_priorities(self) -> None | dict:

0 commit comments

Comments
 (0)