Skip to content

Commit aa342b8

Browse files
author
yuan.wang
committed
merge dev
2 parents b664d55 + 150202b commit aa342b8

File tree

10 files changed

+1801
-128
lines changed

10 files changed

+1801
-128
lines changed

src/memos/graph_dbs/neo4j.py

Lines changed: 321 additions & 19 deletions
Large diffs are not rendered by default.

src/memos/graph_dbs/neo4j_community.py

Lines changed: 494 additions & 12 deletions
Large diffs are not rendered by default.

src/memos/graph_dbs/polardb.py

Lines changed: 756 additions & 16 deletions
Large diffs are not rendered by default.

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,8 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
367367
if kb_log_content:
368368
event = self.create_event_log(
369369
label="knowledgeBaseUpdate",
370-
log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.",
370+
from_memory_type=USER_INPUT_TYPE,
371+
to_memory_type=LONG_TERM_MEMORY_TYPE,
371372
user_id=msg.user_id,
372373
mem_cube_id=msg.mem_cube_id,
373374
mem_cube=self.current_mem_cube,
@@ -376,6 +377,9 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
376377
memory_len=len(kb_log_content),
377378
memcube_name=self._map_memcube_name(msg.mem_cube_id),
378379
)
380+
event.log_content = (
381+
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
382+
)
379383
event.task_id = msg.task_id
380384
self._submit_web_logs([event])
381385
else:
@@ -504,6 +508,8 @@ def process_message(message: ScheduleMessageItem):
504508
text_mem=text_mem,
505509
user_name=user_name,
506510
custom_tags=info.get("custom_tags", None),
511+
task_id=message.task_id,
512+
info=info,
507513
)
508514

509515
logger.info(
@@ -529,6 +535,8 @@ def _process_memories_with_reader(
529535
text_mem: TreeTextMemory,
530536
user_name: str,
531537
custom_tags: list[str] | None = None,
538+
task_id: str | None = None,
539+
info: dict | None = None,
532540
) -> None:
533541
"""
534542
Process memories using mem_reader for enhanced memory processing.
@@ -540,6 +548,7 @@ def _process_memories_with_reader(
540548
text_mem: Text memory instance
541549
custom_tags: Optional list of custom tags for memory processing
542550
"""
551+
kb_log_content: list[dict] = []
543552
try:
544553
# Get the mem_reader from the parent MOSCore
545554
if not hasattr(self, "mem_reader") or self.mem_reader is None:
@@ -602,6 +611,90 @@ def _process_memories_with_reader(
602611
logger.info(
603612
f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}"
604613
)
614+
615+
# LOGGING BLOCK START
616+
# This block is replicated from _add_message_consumer to ensure consistent logging
617+
is_cloud_env = (
618+
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
619+
)
620+
if is_cloud_env:
621+
# New: Knowledge Base Logging (Cloud Service)
622+
kb_log_content = []
623+
for item in flattened_memories:
624+
kb_log_content.append(
625+
{
626+
"log_source": "KNOWLEDGE_BASE_LOG",
627+
"trigger_source": info.get("trigger_source", "Messages")
628+
if info
629+
else "Messages",
630+
"operation": "ADD",
631+
"memory_id": item.id,
632+
"content": item.memory,
633+
"original_content": None,
634+
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
635+
}
636+
)
637+
if kb_log_content:
638+
event = self.create_event_log(
639+
label="knowledgeBaseUpdate",
640+
from_memory_type=USER_INPUT_TYPE,
641+
to_memory_type=LONG_TERM_MEMORY_TYPE,
642+
user_id=user_id,
643+
mem_cube_id=mem_cube_id,
644+
mem_cube=self.current_mem_cube,
645+
memcube_log_content=kb_log_content,
646+
metadata=None,
647+
memory_len=len(kb_log_content),
648+
memcube_name=self._map_memcube_name(mem_cube_id),
649+
)
650+
event.log_content = (
651+
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
652+
)
653+
event.task_id = task_id
654+
self._submit_web_logs([event])
655+
else:
656+
# Existing: Playground/Default Logging
657+
add_content_legacy: list[dict] = []
658+
add_meta_legacy: list[dict] = []
659+
for item_id, item in zip(
660+
enhanced_mem_ids, flattened_memories, strict=False
661+
):
662+
key = getattr(item.metadata, "key", None) or transform_name_to_key(
663+
name=item.memory
664+
)
665+
add_content_legacy.append(
666+
{"content": f"{key}: {item.memory}", "ref_id": item_id}
667+
)
668+
add_meta_legacy.append(
669+
{
670+
"ref_id": item_id,
671+
"id": item_id,
672+
"key": item.metadata.key,
673+
"memory": item.memory,
674+
"memory_type": item.metadata.memory_type,
675+
"status": item.metadata.status,
676+
"confidence": item.metadata.confidence,
677+
"tags": item.metadata.tags,
678+
"updated_at": getattr(item.metadata, "updated_at", None)
679+
or getattr(item.metadata, "update_at", None),
680+
}
681+
)
682+
if add_content_legacy:
683+
event = self.create_event_log(
684+
label="addMemory",
685+
from_memory_type=USER_INPUT_TYPE,
686+
to_memory_type=LONG_TERM_MEMORY_TYPE,
687+
user_id=user_id,
688+
mem_cube_id=mem_cube_id,
689+
mem_cube=self.current_mem_cube,
690+
memcube_log_content=add_content_legacy,
691+
metadata=add_meta_legacy,
692+
memory_len=len(add_content_legacy),
693+
memcube_name=self._map_memcube_name(mem_cube_id),
694+
)
695+
event.task_id = task_id
696+
self._submit_web_logs([event])
697+
# LOGGING BLOCK END
605698
else:
606699
logger.info("No enhanced memories generated by mem_reader")
607700
else:
@@ -630,10 +723,47 @@ def _process_memories_with_reader(
630723
logger.info("Remove and Refresh Memories")
631724
logger.debug(f"Finished add {user_id} memory: {mem_ids}")
632725

633-
except Exception:
726+
except Exception as exc:
634727
logger.error(
635728
f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True
636729
)
730+
with contextlib.suppress(Exception):
731+
is_cloud_env = (
732+
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
733+
)
734+
if is_cloud_env:
735+
if not kb_log_content:
736+
trigger_source = (
737+
info.get("trigger_source", "Messages") if info else "Messages"
738+
)
739+
kb_log_content = [
740+
{
741+
"log_source": "KNOWLEDGE_BASE_LOG",
742+
"trigger_source": trigger_source,
743+
"operation": "ADD",
744+
"memory_id": mem_id,
745+
"content": None,
746+
"original_content": None,
747+
"source_doc_id": None,
748+
}
749+
for mem_id in mem_ids
750+
]
751+
event = self.create_event_log(
752+
label="knowledgeBaseUpdate",
753+
from_memory_type=USER_INPUT_TYPE,
754+
to_memory_type=LONG_TERM_MEMORY_TYPE,
755+
user_id=user_id,
756+
mem_cube_id=mem_cube_id,
757+
mem_cube=self.current_mem_cube,
758+
memcube_log_content=kb_log_content,
759+
metadata=None,
760+
memory_len=len(kb_log_content),
761+
memcube_name=self._map_memcube_name(mem_cube_id),
762+
)
763+
event.log_content = f"Knowledge Base Memory Update failed: {exc!s}"
764+
event.task_id = task_id
765+
event.status = "failed"
766+
self._submit_web_logs([event])
637767

638768
def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
639769
logger.info(f"Messages {messages} assigned to {MEM_ORGANIZE_LABEL} handler.")

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import concurrent
2-
import os
32
import threading
43
import time
54

@@ -15,7 +14,7 @@
1514
from memos.mem_scheduler.schemas.general_schemas import (
1615
DEFAULT_STOP_WAIT,
1716
)
18-
from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem
17+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
1918
from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
2019
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
2120
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
@@ -159,20 +158,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
159158
)
160159
self.metrics.task_completed(user_id=m.user_id, task_type=m.label)
161160

162-
is_cloud_env = (
163-
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
164-
)
165-
if self.submit_web_logs and is_cloud_env:
166-
status_log = ScheduleLogForWebItem(
167-
user_id=task_item.user_id,
168-
mem_cube_id=task_item.mem_cube_id,
169-
item_id=task_item.item_id,
170-
label=m.label,
171-
log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.",
172-
status="completed",
173-
)
174-
self.submit_web_logs([status_log])
175-
176161
# acknowledge redis messages
177162
if self.use_redis_queue and self.memos_message_queue is not None:
178163
for msg in messages:
@@ -211,20 +196,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
211196
self._completed_tasks.pop(0)
212197
logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}")
213198

214-
is_cloud_env = (
215-
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
216-
)
217-
if self.submit_web_logs and is_cloud_env:
218-
status_log = ScheduleLogForWebItem(
219-
user_id=task_item.user_id,
220-
mem_cube_id=task_item.mem_cube_id,
221-
item_id=task_item.item_id,
222-
label=m.label,
223-
log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.",
224-
status="failed",
225-
exception=str(e),
226-
)
227-
self.submit_web_logs([status_log])
228199
raise
229200

230201
return wrapped_handler

src/memos/memories/textual/item.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class PreferenceTextualMemoryMetadata(TextualMemoryMetadata):
203203
preference: str | None = Field(default=None, description="Preference.")
204204
created_at: str | None = Field(default=None, description="Timestamp of the dialog.")
205205
mem_cube_id: str | None = Field(default=None, description="ID of the MemCube.")
206+
score: float | None = Field(default=None, description="Score of the retrieval result.")
206207

207208

208209
class TextualMemoryItem(BaseModel):

src/memos/memories/textual/prefer_text_memory/extractor.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ def extract_implicit_preference(self, qa_pair: MessageList | str) -> dict[str, A
9090
response = self.llm_provider.generate([{"role": "user", "content": prompt}])
9191
response = response.strip().replace("```json", "").replace("```", "").strip()
9292
result = json.loads(response)
93-
result["preference"] = result.pop("implicit_preference")
93+
for d in result:
94+
d["preference"] = d.pop("implicit_preference")
9495
return result
9596
except Exception as e:
9697
logger.error(f"Error extracting implicit preferences: {e}, return None")
@@ -137,20 +138,24 @@ def _process_single_chunk_implicit(
137138
if not implicit_pref:
138139
return None
139140

140-
vector_info = {
141-
"embedding": self.embedder.embed([implicit_pref["context_summary"]])[0],
142-
}
141+
memories = []
142+
for pref in implicit_pref:
143+
vector_info = {
144+
"embedding": self.embedder.embed([pref["context_summary"]])[0],
145+
}
143146

144-
extract_info = {**basic_info, **implicit_pref, **vector_info, **info}
147+
extract_info = {**basic_info, **pref, **vector_info, **info}
145148

146-
metadata = PreferenceTextualMemoryMetadata(
147-
type=msg_type, preference_type="implicit_preference", **extract_info
148-
)
149-
memory = TextualMemoryItem(
150-
id=extract_info["dialog_id"], memory=implicit_pref["context_summary"], metadata=metadata
151-
)
149+
metadata = PreferenceTextualMemoryMetadata(
150+
type=msg_type, preference_type="implicit_preference", **extract_info
151+
)
152+
memory = TextualMemoryItem(
153+
id=str(uuid.uuid4()), memory=pref["context_summary"], metadata=metadata
154+
)
152155

153-
return memory
156+
memories.append(memory)
157+
158+
return memories
154159

155160
def extract(
156161
self,

src/memos/memories/textual/prefer_text_memory/retrievers.py

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import os
2+
13
from abc import ABC, abstractmethod
24
from typing import Any
35

@@ -34,9 +36,12 @@ def _naive_reranker(
3436
self, query: str, prefs_mem: list[TextualMemoryItem], top_k: int, **kwargs: Any
3537
) -> list[TextualMemoryItem]:
3638
if self.reranker:
37-
prefs_mem = self.reranker.rerank(query, prefs_mem, top_k)
38-
return [item for item, _ in prefs_mem]
39-
return prefs_mem
39+
prefs_mem_reranked = []
40+
prefs_mem_tuple = self.reranker.rerank(query, prefs_mem, top_k)
41+
for item, score in prefs_mem_tuple:
42+
item.metadata.score = score
43+
prefs_mem_reranked.append(item)
44+
return prefs_mem_reranked
4045

4146
def _original_text_reranker(
4247
self,
@@ -52,11 +57,22 @@ def _original_text_reranker(
5257
prefs_mem_for_reranker = deepcopy(prefs_mem)
5358
for pref_mem, pref in zip(prefs_mem_for_reranker, prefs, strict=False):
5459
pref_mem.memory = pref_mem.memory + "\n" + pref.original_text
55-
prefs_mem_for_reranker = self.reranker.rerank(query, prefs_mem_for_reranker, top_k)
56-
prefs_mem_for_reranker = [item for item, _ in prefs_mem_for_reranker]
60+
reranked_results = self.reranker.rerank(query, prefs_mem_for_reranker, top_k)
61+
prefs_mem_for_reranker = [item for item, _ in reranked_results]
5762
prefs_ids = [item.id for item in prefs_mem_for_reranker]
5863
prefs_dict = {item.id: item for item in prefs_mem}
59-
return [prefs_dict[item_id] for item_id in prefs_ids if item_id in prefs_dict]
64+
65+
# Create mapping from id to score from reranked results
66+
reranked_scores = {item.id: score for item, score in reranked_results}
67+
68+
# Assign scores to the original items
69+
result_items = []
70+
for item_id in prefs_ids:
71+
if item_id in prefs_dict:
72+
original_item = prefs_dict[item_id]
73+
original_item.metadata.score = reranked_scores.get(item_id)
74+
result_items.append(original_item)
75+
return result_items
6076
return prefs_mem
6177

6278
def retrieve(
@@ -119,24 +135,34 @@ def retrieve(
119135
if pref.payload.get("preference", None)
120136
]
121137

122-
# store explicit id and score, use it after reranker
123-
explicit_id_scores = {item.id: item.score for item in explicit_prefs}
124-
125138
reranker_map = {
126139
"naive": self._naive_reranker,
127140
"original_text": self._original_text_reranker,
128141
}
129142
reranker_func = reranker_map["naive"]
130-
explicit_prefs_mem = reranker_func(
131-
query=query, prefs_mem=explicit_prefs_mem, prefs=explicit_prefs, top_k=top_k
143+
prefs_mem_explicit = reranker_func(
144+
query=query,
145+
prefs_mem=explicit_prefs_mem,
146+
prefs=explicit_prefs,
147+
top_k=top_k,
132148
)
133-
implicit_prefs_mem = reranker_func(
134-
query=query, prefs_mem=implicit_prefs_mem, prefs=implicit_prefs, top_k=top_k
149+
prefs_mem_implicit = reranker_func(
150+
query=query,
151+
prefs_mem=implicit_prefs_mem,
152+
prefs=implicit_prefs,
153+
top_k=top_k,
135154
)
136155

137156
# filter explicit mem by score bigger than threshold
138-
explicit_prefs_mem = [
139-
item for item in explicit_prefs_mem if explicit_id_scores.get(item.id, 0) >= 0.0
157+
prefs_mem_explicit = [
158+
item
159+
for item in prefs_mem_explicit
160+
if item.metadata.score >= float(os.getenv("PREFERENCE_SEARCH_THRESHOLD", 0.0))
161+
]
162+
prefs_mem_implicit = [
163+
item
164+
for item in prefs_mem_implicit
165+
if item.metadata.score >= float(os.getenv("PREFERENCE_SEARCH_THRESHOLD", 0.0))
140166
]
141167

142-
return explicit_prefs_mem + implicit_prefs_mem
168+
return prefs_mem_explicit + prefs_mem_implicit

0 commit comments

Comments
 (0)