From 729b440b35494c9a613b260ff38f778be9bf53e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Wed, 5 Nov 2025 20:18:12 +0800 Subject: [PATCH 1/3] feat: simplify simple tree --- src/memos/memories/textual/simple_tree.py | 249 +--------------------- src/memos/memories/textual/tree.py | 27 ++- 2 files changed, 19 insertions(+), 257 deletions(-) diff --git a/src/memos/memories/textual/simple_tree.py b/src/memos/memories/textual/simple_tree.py index 313989cd2..05e62e3ee 100644 --- a/src/memos/memories/textual/simple_tree.py +++ b/src/memos/memories/textual/simple_tree.py @@ -1,7 +1,4 @@ -import time - -from datetime import datetime -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from memos.configs.memory import TreeTextMemoryConfig from memos.embedders.base import BaseEmbedder @@ -9,13 +6,10 @@ from memos.llms.base import BaseLLM from memos.log import get_logger from memos.mem_reader.base import BaseMemReader -from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata from memos.memories.textual.tree import TreeTextMemory from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager from memos.memories.textual.tree_text_memory.retrieve.bm25_util import EnhancedBM25 -from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher from memos.reranker.base import BaseReranker -from memos.types import MessageList if TYPE_CHECKING: @@ -43,43 +37,22 @@ def __init__( is_reorganize: bool = False, ): """Initialize memory with the given configuration.""" - time_start = time.time() self.config: TreeTextMemoryConfig = config self.mode = self.config.mode logger.info(f"Tree mode is {self.mode}") self.extractor_llm: OpenAILLM | OllamaLLM | AzureLLM = llm - logger.info(f"time init: extractor_llm time is: {time.time() - time_start}") - - time_start_ex = time.time() self.dispatcher_llm: OpenAILLM | OllamaLLM | AzureLLM = llm - logger.info(f"time init: dispatcher_llm time is: {time.time() - time_start_ex}") - - time_start_em = time.time() self.embedder: OllamaEmbedder = embedder - logger.info(f"time init: embedder time is: {time.time() - time_start_em}") - - time_start_gs = time.time() self.graph_store: Neo4jGraphDB = graph_db - logger.info(f"time init: graph_store time is: {time.time() - time_start_gs}") - - time_start_bm = time.time() self.search_strategy = config.search_strategy self.bm25_retriever = ( EnhancedBM25() if self.search_strategy and self.search_strategy.get("bm25", False) else None ) - logger.info(f"time init: bm25_retriever time is: {time.time() - time_start_bm}") - - time_start_rr = time.time() self.reranker = reranker - logger.info(f"time init: reranker time is: {time.time() - time_start_rr}") - - time_start_mm = time.time() self.memory_manager: MemoryManager = memory_manager - logger.info(f"time init: memory_manager time is: {time.time() - time_start_mm}") - time_start_ir = time.time() # Create internet retriever if configured self.internet_retriever = None if config.internet_retriever is not None: @@ -89,223 +62,3 @@ def __init__( ) else: logger.info("No internet retriever configured") - logger.info(f"time init: internet_retriever time is: {time.time() - time_start_ir}") - - def replace_working_memory( - self, memories: list[TextualMemoryItem], user_name: str | None = None - ) -> None: - self.memory_manager.replace_working_memory(memories, user_name=user_name) - - def get_working_memory(self, user_name: str | None = None) -> list[TextualMemoryItem]: - working_memories = 
self.graph_store.get_all_memory_items( - scope="WorkingMemory", user_name=user_name - ) - items = [TextualMemoryItem.from_dict(record) for record in (working_memories)] - # Sort by updated_at in descending order - sorted_items = sorted( - items, key=lambda x: x.metadata.updated_at or datetime.min, reverse=True - ) - return sorted_items - - def get_current_memory_size(self, user_name: str | None = None) -> dict[str, int]: - """ - Get the current size of each memory type. - This delegates to the MemoryManager. - """ - return self.memory_manager.get_current_memory_size(user_name=user_name) - - def get_searcher( - self, - manual_close_internet: bool = False, - moscube: bool = False, - ): - if (self.internet_retriever is not None) and manual_close_internet: - logger.warning( - "Internet retriever is init by config , but this search set manual_close_internet is True and will close it" - ) - searcher = Searcher( - self.dispatcher_llm, - self.graph_store, - self.embedder, - self.reranker, - internet_retriever=None, - moscube=moscube, - ) - else: - searcher = Searcher( - self.dispatcher_llm, - self.graph_store, - self.embedder, - self.reranker, - internet_retriever=self.internet_retriever, - moscube=moscube, - ) - return searcher - - def search( - self, - query: str, - top_k: int, - info=None, - mode: str = "fast", - memory_type: str = "All", - manual_close_internet: bool = False, - moscube: bool = False, - search_filter: dict | None = None, - user_name: str | None = None, - ) -> list[TextualMemoryItem]: - """Search for memories based on a query. - User query -> TaskGoalParser -> MemoryPathResolver -> - GraphMemoryRetriever -> MemoryReranker -> MemoryReasoner -> Final output - Args: - query (str): The query to search for. - top_k (int): The number of top results to return. - info (dict): Leave a record of memory consumption. - mode (str, optional): The mode of the search. - - 'fast': Uses a faster search process, sacrificing some precision for speed. - - 'fine': Uses a more detailed search process, invoking large models for higher precision, but slower performance. - memory_type (str): Type restriction for search. - ['All', 'WorkingMemory', 'LongTermMemory', 'UserMemory'] - manual_close_internet (bool): If True, the internet retriever will be closed by this search, it high priority than config. - moscube (bool): whether you use moscube to answer questions - search_filter (dict, optional): Optional metadata filters for search results. - - Keys correspond to memory metadata fields (e.g., "user_id", "session_id"). - - Values are exact-match conditions. - Example: {"user_id": "123", "session_id": "abc"} - If None, no additional filtering is applied. - Returns: - list[TextualMemoryItem]: List of matching memories. 
- """ - if (self.internet_retriever is not None) and manual_close_internet: - searcher = Searcher( - self.dispatcher_llm, - self.graph_store, - self.embedder, - self.reranker, - bm25_retriever=self.bm25_retriever, - internet_retriever=None, - moscube=moscube, - search_strategy=self.search_strategy, - ) - else: - searcher = Searcher( - self.dispatcher_llm, - self.graph_store, - self.embedder, - self.reranker, - bm25_retriever=self.bm25_retriever, - internet_retriever=self.internet_retriever, - moscube=moscube, - search_strategy=self.search_strategy, - ) - return searcher.search( - query, top_k, info, mode, memory_type, search_filter, user_name=user_name - ) - - def get_relevant_subgraph( - self, query: str, top_k: int = 5, depth: int = 2, center_status: str = "activated" - ) -> dict[str, Any]: - """ - Find and merge the local neighborhood sub-graphs of the top-k - nodes most relevant to the query. - Process: - 1. Embed the user query into a vector representation. - 2. Use vector similarity search to find the top-k similar nodes. - 3. For each similar node: - - Ensure its status matches `center_status` (e.g., 'active'). - - Retrieve its local subgraph up to `depth` hops. - - Collect the center node, its neighbors, and connecting edges. - 4. Merge all retrieved subgraphs into a single unified subgraph. - 5. Return the merged subgraph structure. - - Args: - query (str): The user input or concept to find relevant memories for. - top_k (int, optional): How many top similar nodes to retrieve. Default is 5. - depth (int, optional): The neighborhood depth (number of hops). Default is 2. - center_status (str, optional): Status condition the center node must satisfy (e.g., 'active'). - - Returns: - dict[str, Any]: A subgraph dict with: - - 'core_id': ID of the top matching core node, or None if none found. - - 'nodes': List of unique nodes (core + neighbors) in the merged subgraph. - - 'edges': List of unique edges (as dicts with 'from', 'to', 'type') in the merged subgraph. 
- """ - # Step 1: Embed query - query_embedding = self.embedder.embed([query])[0] - - # Step 2: Get top-1 similar node - similar_nodes = self.graph_store.search_by_embedding(query_embedding, top_k=top_k) - if not similar_nodes: - logger.info("No similar nodes found for query embedding.") - return {"core_id": None, "nodes": [], "edges": []} - - # Step 3: Fetch neighborhood - all_nodes = {} - all_edges = set() - cores = [] - - for node in similar_nodes: - core_id = node["id"] - score = node["score"] - - subgraph = self.graph_store.get_subgraph( - center_id=core_id, depth=depth, center_status=center_status - ) - - if not subgraph["core_node"]: - logger.info(f"Skipping node {core_id} (inactive or not found).") - continue - - core_node = subgraph["core_node"] - neighbors = subgraph["neighbors"] - edges = subgraph["edges"] - - # Collect nodes - all_nodes[core_node["id"]] = core_node - for n in neighbors: - all_nodes[n["id"]] = n - - # Collect edges - for e in edges: - all_edges.add((e["source"], e["target"], e["type"])) - - cores.append( - {"id": core_id, "score": score, "core_node": core_node, "neighbors": neighbors} - ) - - top_core = cores[0] - return { - "core_id": top_core["id"], - "nodes": list(all_nodes.values()), - "edges": [{"source": f, "target": t, "type": ty} for (f, t, ty) in all_edges], - } - - def extract(self, messages: MessageList) -> list[TextualMemoryItem]: - raise NotImplementedError - - def update(self, memory_id: str, new_memory: TextualMemoryItem | dict[str, Any]) -> None: - raise NotImplementedError - - def get(self, memory_id: str) -> TextualMemoryItem: - """Get a memory by its ID.""" - result = self.graph_store.get_node(memory_id) - if result is None: - raise ValueError(f"Memory with ID {memory_id} not found") - metadata_dict = result.get("metadata", {}) - return TextualMemoryItem( - id=result["id"], - memory=result["memory"], - metadata=TreeNodeTextualMemoryMetadata(**metadata_dict), - ) - - def get_by_ids(self, memory_ids: list[str]) -> list[TextualMemoryItem]: - raise NotImplementedError - - def delete_all(self) -> None: - """Delete all memories and their relationships from the graph store.""" - try: - self.graph_store.clear() - logger.info("All memories and edges have been deleted from the graph.") - except Exception as e: - logger.error(f"An error occurred while deleting all memories: {e}") - raise diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index dea3cc1ab..e2e0be69c 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -103,11 +103,15 @@ def add( """ return self.memory_manager.add(memories, user_name=user_name, mode=self.mode) - def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: - self.memory_manager.replace_working_memory(memories) - - def get_working_memory(self) -> list[TextualMemoryItem]: - working_memories = self.graph_store.get_all_memory_items(scope="WorkingMemory") + def replace_working_memory( + self, memories: list[TextualMemoryItem], user_name: str | None = None + ) -> None: + self.memory_manager.replace_working_memory(memories, user_name=user_name) + + def get_working_memory(self, user_name: str | None = None) -> list[TextualMemoryItem]: + working_memories = self.graph_store.get_all_memory_items( + scope="WorkingMemory", user_name=user_name + ) items = [TextualMemoryItem.from_dict(record) for record in (working_memories)] # Sort by updated_at in descending order sorted_items = sorted( @@ -115,12 +119,12 @@ def get_working_memory(self) -> 
list[TextualMemoryItem]: ) return sorted_items - def get_current_memory_size(self) -> dict[str, int]: + def get_current_memory_size(self, user_name: str | None = None) -> dict[str, int]: """ Get the current size of each memory type. This delegates to the MemoryManager. """ - return self.memory_manager.get_current_memory_size() + return self.memory_manager.get_current_memory_size(user_name=user_name) def get_searcher( self, @@ -160,6 +164,7 @@ def search( manual_close_internet: bool = False, moscube: bool = False, search_filter: dict | None = None, + user_name: str | None = None, ) -> list[TextualMemoryItem]: """Search for memories based on a query. User query -> TaskGoalParser -> MemoryPathResolver -> @@ -208,7 +213,9 @@ def search( moscube=moscube, search_strategy=self.search_strategy, ) - return searcher.search(query, top_k, info, mode, memory_type, search_filter) + return searcher.search( + query, top_k, info, mode, memory_type, search_filter, user_name=user_name + ) def get_relevant_subgraph( self, query: str, top_k: int = 5, depth: int = 2, center_status: str = "activated" @@ -306,7 +313,9 @@ def get(self, memory_id: str) -> TextualMemoryItem: metadata=TreeNodeTextualMemoryMetadata(**metadata_dict), ) - def get_by_ids(self, memory_ids: list[str]) -> list[TextualMemoryItem]: + def get_by_ids( + self, memory_ids: list[str], user_name: str | None = None + ) -> list[TextualMemoryItem]: raise NotImplementedError def get_all(self, user_name: str | None = None) -> dict: From 0ca7b67becad607e049a98f7e9e3c7204d6f882e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 6 Nov 2025 16:01:27 +0800 Subject: [PATCH 2/3] feat: add product_api examples --- examples/api/__init__.py | 0 examples/api/product_api.py | 131 ++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 examples/api/__init__.py create mode 100644 examples/api/product_api.py diff --git a/examples/api/__init__.py b/examples/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/api/product_api.py b/examples/api/product_api.py new file mode 100644 index 000000000..e53bd8ad8 --- /dev/null +++ b/examples/api/product_api.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +Simulate full MemOS Product API workflow: +1. Register user +2. Add memory +3. Search memory +4. 
Chat (stream)
+"""
+
+import json
+
+import requests
+
+
+BASE_URL = "http://0.0.0.0:8004/product"
+HEADERS = {"Content-Type": "application/json"}
+
+USER_ID = "memos_user_id"
+USER_NAME = "memos_user_alice"
+MEM_CUBE_ID = "memos_cube_id_01"
+SESSION_ID = "memos_session_id_01"
+
+
+def register_user():
+    url = f"{BASE_URL}/users/register"
+    data = {
+        "user_id": USER_ID,
+        "user_name": USER_NAME,
+        "interests": "memory,retrieval,test",
+        "mem_cube_id": MEM_CUBE_ID,
+    }
+    print(f"[*] Registering user {USER_ID} ...")
+    resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
+    print(resp.status_code, resp.text)
+    return resp.json()
+
+
+def add_memory():
+    url = f"{BASE_URL}/add"
+    data = {
+        "user_id": USER_ID,
+        "memory_content": "Today I am testing the MemOS memory add and retrieval workflow.",
+        "messages": [{"role": "user", "content": "I am running system tests today"}],
+        "doc_path": None,
+        "mem_cube_id": MEM_CUBE_ID,
+        "source": "test_script",
+        "user_profile": False,
+        "session_id": SESSION_ID,
+    }
+    print("[*] Adding memory ...")
+    resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
+    print(resp.status_code, resp.text)
+    return resp.json()
+
+
+def search_memory(query="system tests"):
+    url = f"{BASE_URL}/search"
+    data = {
+        "user_id": USER_ID,
+        "query": query,
+        "mem_cube_id": MEM_CUBE_ID,
+        "top_k": 5,
+        "session_id": SESSION_ID,
+    }
+    print("[*] Searching memory ...")
+    resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
+    print(resp.status_code, resp.text)
+    return resp.json()
+
+
+def chat_stream(query="Summarize what I did today"):
+    url = f"{BASE_URL}/chat"
+    data = {
+        "user_id": USER_ID,
+        "query": query,
+        "mem_cube_id": MEM_CUBE_ID,
+        "history": [],
+        "internet_search": False,
+        "moscube": False,
+        "session_id": SESSION_ID,
+    }
+
+    print("[*] Starting streaming chat ...")
+
+    with requests.post(url, headers=HEADERS, data=json.dumps(data), stream=True) as resp:
+        for raw_line in resp.iter_lines():
+            if not raw_line:
+                continue
+            line = raw_line.decode("utf-8", errors="ignore")
+
+            payload = line.removeprefix("data: ").strip()
+            if payload == "[DONE]":
+                print("[done]")
+                break
+
+            try:
+                msg = json.loads(payload)
+                msg_type = msg.get("type")
+                msg_data = msg.get("data") or msg.get("content")
+
+                if msg_type == "text":
+                    print(msg_data, end="", flush=True)
+                elif msg_type == "reference":
+                    print(f"\n[reference memories] {msg_data}")
+                elif msg_type == "status":
+                    pass
+                elif msg_type == "suggestion":
+                    print(f"\n[suggestions] {msg_data}")
+                elif msg_type == "end":
+                    print("\n[✅ Chat End]")
+                else:
+                    print(f"\n[{msg_type}] {msg_data}")
+            except Exception:  # not JSON: payload may be a text chunk mis-decoded as latin-1
+                try:
+                    print(payload.encode("latin-1").decode("utf-8"), end="")
+                except Exception:
+                    print(payload)
+
+
+if __name__ == "__main__":
+    print("===== STEP 1: Register User =====")
+    register_user()
+
+    print("\n===== STEP 2: Add Memory =====")
+    add_memory()
+
+    print("\n===== STEP 3: Search Memory =====")
+    search_memory()
+
+    print("\n===== STEP 4: Stream Chat =====")
+    chat_stream()

From 4bb21da231a5c1e946a16f857fa9f1fa0e3dbbf8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 6 Nov 2025 21:50:58 +0800
Subject: [PATCH 3/3] feat: modify online bot

---
 src/memos/memos_tools/dinding_report_bot.py | 99 ++++++++++++++-------
 1 file changed, 65 insertions(+), 34 deletions(-)

diff --git a/src/memos/memos_tools/dinding_report_bot.py b/src/memos/memos_tools/dinding_report_bot.py
index 9791cf65a..7fbd87e55 100644
--- a/src/memos/memos_tools/dinding_report_bot.py
+++ b/src/memos/memos_tools/dinding_report_bot.py
@@ -7,6 +7,7 @@
 import json
 import os
 import time
+import traceback
 import urllib.parse
 
 from datetime import datetime
@@ -14,6 +15,11 @@
 
 from dotenv import load_dotenv
 
+from memos.log import get_logger
+
+
+logger = get_logger(__name__)
+
 load_dotenv()
 
 
@@ -57,6 +63,20 @@
 ROBOT_CODE = os.getenv("DINGDING_ROBOT_CODE")
 DING_APP_KEY = os.getenv("DINGDING_APP_KEY")
 DING_APP_SECRET = os.getenv("DINGDING_APP_SECRET")
+ENV_NAME = os.getenv("ENV_NAME", "PLAYGROUND_OFFLINE")
+
+theme_map = {
+    "ONLINE": {
+        "color": "#2196F3",  # sky blue
+        "grad": ("#E3F2FD", "#BBDEFB"),
+        "emoji": "🩵",  # calm, bright
+    },
+    "OFFLINE": {
+        "color": "#FFC107",  # soft yellow
+        "grad": ("#FFF8E1", "#FFECB3"),
+        "emoji": "🤍",  # gentle reminder
+    },
+}
 
 
 # Get access_token
@@ -311,7 +331,7 @@ def error_bot(
     )
 
     # ---------- Markdown ----------
-    colored_title = f"{title}"
+    colored_title = f"{ENV_NAME}"
     at_suffix = ""
     if user_ids:
         at_suffix = "\n\n" + " ".join([f"@{m}" for m in user_ids])
@@ -367,41 +387,52 @@ def online_bot(
     other_data2: dict,
     emoji: dict,
 ):
-    heading_color = "#00956D"  # Green for subtitle
-
-    # 0) Banner
-    banner_bytes = make_header(header_name, sub_title_name)
-    banner_url = upload_bytes_to_oss(banner_bytes, filename="online_report.png")
-
-    # 1) Colored main title
-    colored_title = f"{header_name}"
-
-    # 3) Markdown
-    md = "\n\n".join(
-        filter(
-            None,
-            [
-                f"![banner]({banner_url})",
-                f"### 🙄 {colored_title}\n\n",
-                _kv_lines(
-                    other_data1,
-                    next(iter(emoji.keys())),
-                    next(iter(emoji.values())),
-                    heading_color=heading_color,
-                ),
-                _kv_lines(
-                    other_data2,
-                    list(emoji.keys())[1],
-                    list(emoji.values())[1],
-                    heading_color=heading_color,
-                ),
-                f"Time: "
-                f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
-            ],
+    try:
+        logger.info("online_bot: composing themed report")
+        theme = "OFFLINE" if "OFFLINE" in ENV_NAME or "TEST" in ENV_NAME else "ONLINE"
+        style = theme_map.get(theme, theme_map["OFFLINE"])
+        heading_color = style["color"]  # Use theme color for subtitle
+
+        # 0) Banner
+        banner_bytes = make_header(
+            header_name,
+            sub_title_name,
+            colors=style["grad"],
+            fg=style["color"],
+        )
+        banner_url = upload_bytes_to_oss(banner_bytes, filename=f"{ENV_NAME}_online_report.png")
+
+        # 1) Colored main title
+        colored_title = f"{ENV_NAME}"
+
+        # 3) Markdown
+        md = "\n\n".join(
+            filter(
+                None,
+                [
+                    f"![banner]({banner_url})",
+                    f"### {style['emoji']} {colored_title}\n\n",
+                    _kv_lines(
+                        other_data1,
+                        next(iter(emoji.keys())),
+                        next(iter(emoji.values())),
+                        heading_color=heading_color,
+                    ),
+                    _kv_lines(
+                        other_data2,
+                        list(emoji.keys())[1],
+                        list(emoji.values())[1],
+                        heading_color=heading_color,
+                    ),
+                    f"Time: "
+                    f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
+                ],
+            )
         )
-    )
 
-    _send_md(colored_title, md, type="user")
+        _send_md(colored_title, md, type="user")
+    except Exception:
+        logger.error(traceback.format_exc())
 
 
 if __name__ == "__main__":
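
PATCH 1/3 threads an optional user_name through the tree-memory API so that
working-memory reads, writes, size queries, and search are all scoped per
user. A minimal usage sketch, assuming `tree_memory` is an already-configured
TreeTextMemory instance and "alice" is an existing user (both names are
illustrative):

    # Every call below accepts an optional user_name; omitting it keeps the
    # previous unscoped behavior.
    results = tree_memory.search(
        query="system tests",
        top_k=5,
        mode="fast",              # "fine" trades speed for LLM-assisted precision
        memory_type="WorkingMemory",
        user_name="alice",
    )

    working = tree_memory.get_working_memory(user_name="alice")  # newest first
    tree_memory.replace_working_memory(working[:10], user_name="alice")
    sizes = tree_memory.get_current_memory_size(user_name="alice")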
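
PATCH 2/3 is runnable end to end with `python examples/api/product_api.py`
against a Product API server listening on port 8004. The streaming client
expects "data: "-prefixed JSON events of type text, reference, status,
suggestion, or end, terminated by a literal [DONE] sentinel. The steps can
also be driven one at a time; a sketch, assuming the repository root is on
PYTHONPATH so the examples package resolves:

    from examples.api.product_api import (
        add_memory,
        chat_stream,
        register_user,
        search_memory,
    )

    register_user()                            # 1. create the demo user and cube
    add_memory()                               # 2. write one memory for that user
    hits = search_memory("system tests")       # 3. should surface the memory above
    chat_stream("Summarize what I did today")  # 4. stream the answer to stdout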
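
PATCH 3/3 makes online_bot() pick its palette from ENV_NAME (names containing
"OFFLINE" or "TEST" select the soft-yellow OFFLINE theme, everything else the
sky-blue ONLINE theme) and log any failure with a traceback instead of raising.
An invocation sketch; the argument values are made up, and the two-entry emoji
mapping (section title -> icon) is an assumption inferred from how _kv_lines()
consumes it:

    from memos.memos_tools.dinding_report_bot import online_bot

    # Requires the DINGDING_* credentials in the environment for delivery.
    online_bot(
        header_name="MemOS Daily Report",
        sub_title_name="playground traffic summary",
        other_data1={"requests": 120, "avg_latency_ms": 85},
        other_data2={"errors": 0, "timeouts": 1},
        emoji={"Traffic": "📈", "Errors": "🚨"},
    )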