Skip to content

Commit a4ed66d

Browse files
Feat: reorg playground MemCube logs\n\n* feat: normalize show_web_logs title/content (HH:mm:ss, truncate, zh types, desc sort)\n* feat: add addMessage web logs (query/answer)\n* feat: emit merge logs (merged keys + merged result)\n* feat: add mem_update/mem_archive labels and logger methods\n* feat: add non-breaking add/update log split by metadata.key\n* chore: keep schema unchanged; no frontend coupling
1 parent 7541827 commit a4ed66d

File tree

4 files changed

+278
-15
lines changed

4 files changed

+278
-15
lines changed

examples/mem_scheduler/memos_w_scheduler.py

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@
1111
from memos.log import get_logger
1212
from memos.mem_cube.general import GeneralMemCube
1313
from memos.mem_os.main import MOS
14+
from datetime import datetime
15+
import re
16+
from memos.mem_scheduler.schemas.general_schemas import (
17+
QUERY_LABEL,
18+
ANSWER_LABEL,
19+
ADD_LABEL,
20+
MEM_ORGANIZE_LABEL,
21+
MEM_UPDATE_LABEL,
22+
MEM_ARCHIVE_LABEL,
23+
NOT_APPLICABLE_TYPE,
24+
)
25+
from memos.mem_scheduler.utils.filter_utils import transform_name_to_key
1426
from memos.mem_scheduler.general_scheduler import GeneralScheduler
1527

1628

@@ -70,6 +82,89 @@ def init_task():
7082
return conversations, questions
7183

7284

85+
def _truncate_with_rules(text: str) -> str:
86+
has_cjk = bool(re.search(r"[\u4e00-\u9fff]", text))
87+
limit = 32 if has_cjk else 64
88+
s = text.strip().replace("\n", " ")
89+
if len(s) <= limit:
90+
return s
91+
return s[: limit - 1] + "… 查看详情"
92+
93+
94+
def _format_title(ts: datetime, title_text: str) -> str:
95+
return f"{ts.strftime('%H:%M:%S')} {title_text}"
96+
97+
98+
def _cube_display_from(mem_cube_id: str) -> str:
99+
if "public" in (mem_cube_id or "").lower():
100+
return "公共MemCube"
101+
return "用户MemCube"
102+
103+
_TYPE_ZH = {
104+
"LongTermMemory": "长期",
105+
"UserMemory": "用户",
106+
"WorkingMemory": "工作",
107+
"ActivationMemory": "激活",
108+
"ParameterMemory": "参数",
109+
"TextMemory": "明文",
110+
"UserInput": "消息",
111+
"NotApplicable": "NA",
112+
}
113+
114+
115+
def _format_entry(item) -> tuple[str, str]:
116+
local_ts = item.timestamp.astimezone()
117+
cube_display = _cube_display_from(item.mem_cube_id)
118+
label = item.label
119+
from_t = item.from_memory_type
120+
to_t = item.to_memory_type
121+
content = item.log_content or ""
122+
123+
if label == QUERY_LABEL:
124+
xx = cube_display.replace("MemCube", "")
125+
title = _format_title(local_ts, f"addMessages至{xx} MemCube")
126+
return title, _truncate_with_rules(content)
127+
if label == ANSWER_LABEL:
128+
xx = cube_display.replace("MemCube", "")
129+
title = _format_title(local_ts, f"addMessages至{xx} MemCube")
130+
return title, _truncate_with_rules(content)
131+
132+
if label == ADD_LABEL:
133+
key = transform_name_to_key(content)
134+
title = _format_title(local_ts, f"{cube_display}新增了1条记忆")
135+
return title, _truncate_with_rules(f"{key}{content}")
136+
137+
if label == MEM_UPDATE_LABEL:
138+
key = transform_name_to_key(content)
139+
title = _format_title(local_ts, f"{cube_display}更新了1条记忆")
140+
return title, _truncate_with_rules(f"{key}{content}")
141+
142+
if label == MEM_ARCHIVE_LABEL:
143+
key = transform_name_to_key(content)
144+
title = _format_title(local_ts, f"{cube_display}归档了1条记忆")
145+
return title, _truncate_with_rules(f"{key}{content}")
146+
147+
if label == MEM_ORGANIZE_LABEL:
148+
m = re.search(r"被合并的记忆:(.+)", content)
149+
if m:
150+
keys_str = m.group(1)
151+
n = len([x for x in keys_str.split("|") if x.strip()])
152+
title = _format_title(local_ts, f"{cube_display}合并了{n}条记忆")
153+
return title, _truncate_with_rules(content)
154+
title = _format_title(local_ts, f"{cube_display}合并了1条记忆")
155+
return title, _truncate_with_rules(content)
156+
157+
if from_t != to_t and to_t != NOT_APPLICABLE_TYPE:
158+
key = transform_name_to_key(content)
159+
title = _format_title(local_ts, f"{cube_display}调度了1条记忆")
160+
from_zh = _TYPE_ZH.get(from_t, from_t)
161+
to_zh = _TYPE_ZH.get(to_t, to_t)
162+
return title, _truncate_with_rules(f"[{from_zh}{to_zh}] {key}{content}")
163+
164+
title = _format_title(local_ts, f"{cube_display}事件")
165+
return title, _truncate_with_rules(content)
166+
167+
73168
def show_web_logs(mem_scheduler: GeneralScheduler):
74169
"""Display all web log entries from the scheduler's log queue.
75170
@@ -86,16 +181,20 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
86181
temp_queue = Queue()
87182
log_count = 0
88183

184+
# drain all items for sorting by time desc
185+
items = []
89186
while not mem_scheduler._web_log_message_queue.empty():
90187
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
188+
items.append(log_item)
91189
temp_queue.put(log_item)
92-
log_count += 1
93-
94-
# Print log entry details
95-
print(f"\nLog Entry #{log_count}:")
96-
print(f'- "{log_item.label}" log: {log_item}')
97-
98-
print("-" * 50)
190+
# sort by timestamp desc
191+
items_sorted = sorted(items, key=lambda x: x.timestamp, reverse=True)
192+
for idx, log_item in enumerate(items_sorted, 1):
193+
title, content = _format_entry(log_item)
194+
print(f"\nLog Entry #{idx}:")
195+
print(title)
196+
print(content)
197+
log_count = len(items_sorted)
99198

100199
# Restore items back to the original queue
101200
while not temp_queue.empty():

src/memos/mem_scheduler/general_modules/scheduler_logger.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
TEXT_MEMORY_TYPE,
1414
USER_INPUT_TYPE,
1515
WORKING_MEMORY_TYPE,
16+
MEM_UPDATE_LABEL,
17+
MEM_ARCHIVE_LABEL,
1618
)
1719
from memos.mem_scheduler.schemas.message_schemas import (
1820
ScheduleLogForWebItem,
@@ -233,6 +235,48 @@ def log_adding_memory(
233235
f"converted to {memory_type} memory in mem_cube {mem_cube_id}: {memory}"
234236
)
235237

238+
@log_exceptions(logger=logger)
239+
def log_updating_memory(
240+
self,
241+
memory: str,
242+
memory_type: str,
243+
user_id: str,
244+
mem_cube_id: str,
245+
mem_cube: GeneralMemCube,
246+
log_func_callback: Callable[[list[ScheduleLogForWebItem]], None],
247+
):
248+
log_message = self.create_autofilled_log_item(
249+
log_content=memory,
250+
label=MEM_UPDATE_LABEL,
251+
from_memory_type=memory_type,
252+
to_memory_type=memory_type,
253+
user_id=user_id,
254+
mem_cube_id=mem_cube_id,
255+
mem_cube=mem_cube,
256+
)
257+
log_func_callback([log_message])
258+
259+
@log_exceptions(logger=logger)
260+
def log_archiving_memory(
261+
self,
262+
memory: str,
263+
memory_type: str,
264+
user_id: str,
265+
mem_cube_id: str,
266+
mem_cube: GeneralMemCube,
267+
log_func_callback: Callable[[list[ScheduleLogForWebItem]], None],
268+
):
269+
log_message = self.create_autofilled_log_item(
270+
log_content=memory,
271+
label=MEM_ARCHIVE_LABEL,
272+
from_memory_type=memory_type,
273+
to_memory_type=memory_type,
274+
user_id=user_id,
275+
mem_cube_id=mem_cube_id,
276+
mem_cube=mem_cube,
277+
)
278+
log_func_callback([log_message])
279+
236280
@log_exceptions(logger=logger)
237281
def validate_schedule_message(self, message: ScheduleMessageItem, label: str):
238282
"""Validate if the message matches the expected label.

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
PREF_ADD_LABEL,
1717
QUERY_LABEL,
1818
WORKING_MEMORY_TYPE,
19+
USER_INPUT_TYPE,
20+
NOT_APPLICABLE_TYPE,
21+
LONG_TERM_MEMORY_TYPE,
1922
MemCubeID,
2023
UserID,
2124
)
@@ -159,6 +162,24 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
159162
messages = grouped_messages[user_id][mem_cube_id]
160163
if len(messages) == 0:
161164
return
165+
try:
166+
web_logs = []
167+
for msg in messages:
168+
web_logs.append(
169+
self.create_autofilled_log_item(
170+
log_content=f"[User] {msg.content}",
171+
label=QUERY_LABEL,
172+
from_memory_type=USER_INPUT_TYPE,
173+
to_memory_type=NOT_APPLICABLE_TYPE,
174+
user_id=msg.user_id,
175+
mem_cube_id=msg.mem_cube_id,
176+
mem_cube=self.current_mem_cube,
177+
)
178+
)
179+
if web_logs:
180+
self._submit_web_logs(web_logs)
181+
except Exception:
182+
pass
162183
self.long_memory_update_process(
163184
user_id=user_id, mem_cube_id=mem_cube_id, messages=messages
164185
)
@@ -181,6 +202,24 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
181202
messages = grouped_messages[user_id][mem_cube_id]
182203
if len(messages) == 0:
183204
return
205+
try:
206+
web_logs = []
207+
for msg in messages:
208+
web_logs.append(
209+
self.create_autofilled_log_item(
210+
log_content=f"[Assistant] {msg.content}",
211+
label=ANSWER_LABEL,
212+
from_memory_type=USER_INPUT_TYPE,
213+
to_memory_type=NOT_APPLICABLE_TYPE,
214+
user_id=msg.user_id,
215+
mem_cube_id=msg.mem_cube_id,
216+
mem_cube=self.current_mem_cube,
217+
)
218+
)
219+
if web_logs:
220+
self._submit_web_logs(web_logs)
221+
except Exception:
222+
pass
184223

185224
def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
186225
logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")
@@ -216,18 +255,57 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
216255
continue
217256
mem_type = mem_item.metadata.memory_type
218257
mem_content = mem_item.memory
258+
key = getattr(mem_item.metadata, "key", None)
219259

220260
if mem_type == WORKING_MEMORY_TYPE:
221261
continue
222262

223-
self.log_adding_memory(
224-
memory=mem_content,
225-
memory_type=mem_type,
226-
user_id=msg.user_id,
227-
mem_cube_id=msg.mem_cube_id,
228-
mem_cube=self.current_mem_cube,
229-
log_func_callback=self._submit_web_logs,
230-
)
263+
# 判断新增 vs 更新:同类型内存在同 key 视为更新,否则视为新增
264+
try:
265+
text_mem = mem_cube.text_mem
266+
existing_found = False
267+
if key and hasattr(text_mem, "graph_store"):
268+
try:
269+
candidates = text_mem.graph_store.get_by_metadata(
270+
[
271+
{"field": "memory", "op": "=", "value": key},
272+
{"field": "memory_type", "op": "=", "value": mem_type},
273+
]
274+
)
275+
existing_found = bool(candidates)
276+
except Exception:
277+
existing_found = False
278+
279+
if existing_found:
280+
# 更新记忆日志(同类型内更新)
281+
self.log_updating_memory(
282+
memory=mem_content,
283+
memory_type=mem_type,
284+
user_id=msg.user_id,
285+
mem_cube_id=msg.mem_cube_id,
286+
mem_cube=self.current_mem_cube,
287+
log_func_callback=self._submit_web_logs,
288+
)
289+
else:
290+
# 新增记忆日志
291+
self.log_adding_memory(
292+
memory=mem_content,
293+
memory_type=mem_type,
294+
user_id=msg.user_id,
295+
mem_cube_id=msg.mem_cube_id,
296+
mem_cube=self.current_mem_cube,
297+
log_func_callback=self._submit_web_logs,
298+
)
299+
except Exception:
300+
# 兜底:仍记录为新增
301+
self.log_adding_memory(
302+
memory=mem_content,
303+
memory_type=mem_type,
304+
user_id=msg.user_id,
305+
mem_cube_id=msg.mem_cube_id,
306+
mem_cube=self.current_mem_cube,
307+
log_func_callback=self._submit_web_logs,
308+
)
231309

232310
except Exception as e:
233311
logger.error(f"Error: {e}", exc_info=True)
@@ -432,6 +510,46 @@ def process_message(message: ScheduleMessageItem):
432510
user_name=user_name,
433511
)
434512

513+
try:
514+
mem_items = []
515+
for mid in mem_ids:
516+
try:
517+
mem_items.append(text_mem.get(mid))
518+
except Exception:
519+
pass
520+
if len(mem_items) > 1:
521+
keys = []
522+
for it in mem_items:
523+
key = getattr(getattr(it, "metadata", {}), "key", None) or ""
524+
if key:
525+
keys.append(key)
526+
merged_info = "被合并的记忆:" + " | ".join([str(k) for k in keys])
527+
log_item_a = self.create_autofilled_log_item(
528+
log_content=merged_info,
529+
label=MEM_ORGANIZE_LABEL,
530+
from_memory_type=LONG_TERM_MEMORY_TYPE,
531+
to_memory_type=LONG_TERM_MEMORY_TYPE,
532+
user_id=user_id,
533+
mem_cube_id=mem_cube_id,
534+
mem_cube=mem_cube,
535+
)
536+
combined_key = keys[0] if keys else ""
537+
merged_after = (
538+
"合并后的记忆单元:" + (combined_key if combined_key else "(无键)")
539+
)
540+
log_item_b = self.create_autofilled_log_item(
541+
log_content=merged_after,
542+
label=MEM_ORGANIZE_LABEL,
543+
from_memory_type=LONG_TERM_MEMORY_TYPE,
544+
to_memory_type=LONG_TERM_MEMORY_TYPE,
545+
user_id=user_id,
546+
mem_cube_id=mem_cube_id,
547+
mem_cube=mem_cube,
548+
)
549+
self._submit_web_logs([log_item_a, log_item_b])
550+
except Exception:
551+
pass
552+
435553
logger.info(
436554
f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}"
437555
)

src/memos/mem_scheduler/schemas/general_schemas.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ class SearchMode(str, Enum):
2020
ADD_LABEL = "add"
2121
MEM_READ_LABEL = "mem_read"
2222
MEM_ORGANIZE_LABEL = "mem_organize"
23+
MEM_UPDATE_LABEL = "mem_update"
24+
MEM_ARCHIVE_LABEL = "mem_archive"
2325
API_MIX_SEARCH_LABEL = "api_mix_search"
2426
PREF_ADD_LABEL = "pref_add"
2527

0 commit comments

Comments
 (0)