diff --git a/examples/api/__init__.py b/examples/api/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/api/product_api.py b/examples/api/product_api.py
new file mode 100644
index 000000000..b98f3b8e5
--- /dev/null
+++ b/examples/api/product_api.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python3
+"""
+Simulate the full MemOS Product API workflow:
+1. Register user
+2. Add memory
+3. Search memory
+4. Chat (stream)
+"""
+
+import json
+
+import requests
+
+
+BASE_URL = "http://0.0.0.0:8001/product"
+HEADERS = {"Content-Type": "application/json"}
+
+index = "24"
+USER_ID = f"memos_user_id_{index}"
+USER_NAME = f"memos_user_alice_{index}"
+MEM_CUBE_ID = f"memos_cube_id_{index}"
+SESSION_ID = f"memos_session_id_{index}"
+SESSION_ID2 = f"memos_session_id_{index}_s2"
+
+
+def register_user():
+    url = f"{BASE_URL}/users/register"
+    data = {
+        "user_id": USER_ID,
+        "user_name": USER_NAME,
+        "interests": "memory,retrieval,test",
+        "mem_cube_id": MEM_CUBE_ID,
+    }
+    print(f"[*] Registering user {USER_ID} ...")
+    resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
+    print(resp.status_code, resp.text)
+    return resp.json()
+
+
+def add_memory():
+    url = f"{BASE_URL}/add"
+    data = {
+        "user_id": USER_ID,
+        "memory_content": "Today I am testing the MemOS memory add and retrieval workflow.",
+        "messages": [{"role": "user", "content": "I am doing system testing today"}],
+        "doc_path": None,
+        "mem_cube_id": MEM_CUBE_ID,
+        "source": "test_script",
+        "user_profile": False,
+        "session_id": SESSION_ID,
+    }
+    print("[*] Adding memory ...")
+    resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
+    print(resp.status_code, resp.text)
+    return resp.json()
+
+
+def search_memory(query="system testing"):
+    url = f"{BASE_URL}/search"
+    data = {
+        "user_id": USER_ID,
+        "query": query,
+        "mem_cube_id": MEM_CUBE_ID,
+        "top_k": 5,
+        "session_id": SESSION_ID,
+    }
+    print("[*] Searching memory ...")
+    resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
+    print(resp.status_code, resp.text)
+    return resp.json()
+
+
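+# NOTE: /chat streams Server-Sent Events: each non-empty line is expected to
+# look like "data: {json}", and the stream ends with "data: [DONE]". The
+# message types handled below (text / reference / status / suggestion / end)
+# are the ones this script has observed; anything else is printed verbatim.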
+def chat_stream(query: str, session_id: str, history: list | None = None):
+    url = f"{BASE_URL}/chat"
+    data = {
+        "user_id": USER_ID,
+        "query": query,
+        "mem_cube_id": MEM_CUBE_ID,
+        "history": history,
+        "internet_search": False,
+        "moscube": False,
+        "session_id": session_id,
+    }
+
+    print("[*] Starting streaming chat ...")
+
+    with requests.post(url, headers=HEADERS, data=json.dumps(data), stream=True) as resp:
+        for raw_line in resp.iter_lines():
+            if not raw_line:
+                continue
+            line = raw_line.decode("utf-8", errors="ignore")
+
+            payload = line.removeprefix("data: ").strip()
+            if payload == "[DONE]":
+                print("[done]")
+                break
+
+            try:
+                msg = json.loads(payload)
+                msg_type = msg.get("type")
+                msg_data = msg.get("data") or msg.get("content")
+
+                if msg_type == "text":
+                    print(msg_data, end="", flush=True)
+                elif msg_type == "reference":
+                    print(f"\n[Reference memory] {msg_data}")
+                elif msg_type == "status":
+                    pass
+                elif msg_type == "suggestion":
+                    print(f"\n[Suggestion] {msg_data}")
+                elif msg_type == "end":
+                    print("\n[✅ Chat End]")
+                else:
+                    print(f"\n[{msg_type}] {msg_data}")
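+            # Non-JSON payloads are printed as-is; the latin-1 -> utf-8 round
+            # trip below is a best-effort repair for mojibake in raw chunks.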
+            except Exception:
+                try:
+                    print(payload.encode("latin-1").decode("utf-8"), end="")
+                except Exception:
+                    print(payload)
+
+
+if __name__ == "__main__":
+    print("===== STEP 1: Register User =====")
+    register_user()
+
+    print("\n===== STEP 2: Add Memory =====")
+    add_memory()
+
+    print("\n===== STEP 3: Search Memory =====")
+    search_memory()
+
+    print("\n===== STEP 4: Stream Chat =====")
+    chat_stream("I am so happy, I had delicious ramen today", SESSION_ID, history=[])
+    chat_stream(
+        "What did I just tell you?",
+        SESSION_ID,
+        history=[
+            {"role": "user", "content": "I am so happy, I had delicious ramen today"},
+            {"role": "assistant", "content": "🉑"},
+        ],
+    )
+
+    print("\n===== STEP 5: Stream Chat (new session) =====")
+    chat_stream("What did I just say to you?", SESSION_ID2, history=[])
diff --git a/src/memos/mem_os/product.py b/src/memos/mem_os/product.py
index 89e468bd7..9ddb77b52 100644
--- a/src/memos/mem_os/product.py
+++ b/src/memos/mem_os/product.py
@@ -563,6 +563,34 @@ def _extract_references_from_response(self, response: str) -> tuple[str, list[di
             logger.error(f"Error extracting references from response: {e}", exc_info=True)
             return response, []
 
+    def _extract_struct_data_from_history(self, chat_data: list[dict]) -> dict:
+        """
+        Extract the system prompt, memory block, and chat history from raw chat messages.
+        # TODO: @xcy make this more general
+        """
+        system_content = ""
+        memory_content = ""
+        chat_history = []
+
+        for item in chat_data:
+            role = item.get("role")
+            content = item.get("content", "")
+            if role == "system":
+                parts = content.split("# Memories", 1)
+                system_content = parts[0].strip()
+                if len(parts) > 1:
+                    memory_content = "# Memories" + parts[1].strip()
+            elif role in ("user", "assistant"):
+                chat_history.append({"role": role, "content": content})
+
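+        # If the transcript ends with a completed assistant reply, drop that
+        # final user/assistant exchange; only earlier turns are kept as history.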
+        if chat_history and chat_history[-1]["role"] == "assistant":
+            if len(chat_history) >= 2 and chat_history[-2]["role"] == "user":
+                chat_history = chat_history[:-2]
+            else:
+                chat_history = chat_history[:-1]
+
+        return {"system": system_content, "memory": memory_content, "chat_history": chat_history}
+
     def _chunk_response_with_tiktoken(
         self, response: str, chunk_size: int = 5
     ) -> Generator[str, None, None]:
@@ -640,23 +668,26 @@ async def _post_chat_processing(
             clean_response, extracted_references = self._extract_references_from_response(
                 full_response
             )
+            struct_message = self._extract_struct_data_from_history(current_messages)
             logger.info(f"Extracted {len(extracted_references)} references from response")
 
             # Send chat report notifications asynchronously
             if self.online_bot:
+                logger.info("Online bot notifications enabled")
                 try:
                     from memos.memos_tools.notification_utils import (
                         send_online_bot_notification_async,
                     )
 
                     # Prepare notification data
-                    chat_data = {
-                        "query": query,
-                        "user_id": user_id,
-                        "cube_id": cube_id,
-                        "system_prompt": system_prompt,
-                        "full_response": full_response,
-                    }
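+                    # Report the memory block and trimmed history parsed from
+                    # the actual prompt instead of the raw system_prompt string.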
- """ - if (self.internet_retriever is not None) and manual_close_internet: - searcher = Searcher( - self.dispatcher_llm, - self.graph_store, - self.embedder, - self.reranker, - bm25_retriever=self.bm25_retriever, - internet_retriever=None, - moscube=moscube, - search_strategy=self.search_strategy, - ) - else: - searcher = Searcher( - self.dispatcher_llm, - self.graph_store, - self.embedder, - self.reranker, - bm25_retriever=self.bm25_retriever, - internet_retriever=self.internet_retriever, - moscube=moscube, - search_strategy=self.search_strategy, - ) - return searcher.search( - query, top_k, info, mode, memory_type, search_filter, user_name=user_name - ) - - def get_relevant_subgraph( - self, query: str, top_k: int = 5, depth: int = 2, center_status: str = "activated" - ) -> dict[str, Any]: - """ - Find and merge the local neighborhood sub-graphs of the top-k - nodes most relevant to the query. - Process: - 1. Embed the user query into a vector representation. - 2. Use vector similarity search to find the top-k similar nodes. - 3. For each similar node: - - Ensure its status matches `center_status` (e.g., 'active'). - - Retrieve its local subgraph up to `depth` hops. - - Collect the center node, its neighbors, and connecting edges. - 4. Merge all retrieved subgraphs into a single unified subgraph. - 5. Return the merged subgraph structure. - - Args: - query (str): The user input or concept to find relevant memories for. - top_k (int, optional): How many top similar nodes to retrieve. Default is 5. - depth (int, optional): The neighborhood depth (number of hops). Default is 2. - center_status (str, optional): Status condition the center node must satisfy (e.g., 'active'). - - Returns: - dict[str, Any]: A subgraph dict with: - - 'core_id': ID of the top matching core node, or None if none found. - - 'nodes': List of unique nodes (core + neighbors) in the merged subgraph. - - 'edges': List of unique edges (as dicts with 'from', 'to', 'type') in the merged subgraph. 
- """ - # Step 1: Embed query - query_embedding = self.embedder.embed([query])[0] - - # Step 2: Get top-1 similar node - similar_nodes = self.graph_store.search_by_embedding(query_embedding, top_k=top_k) - if not similar_nodes: - logger.info("No similar nodes found for query embedding.") - return {"core_id": None, "nodes": [], "edges": []} - - # Step 3: Fetch neighborhood - all_nodes = {} - all_edges = set() - cores = [] - - for node in similar_nodes: - core_id = node["id"] - score = node["score"] - - subgraph = self.graph_store.get_subgraph( - center_id=core_id, depth=depth, center_status=center_status - ) - - if not subgraph["core_node"]: - logger.info(f"Skipping node {core_id} (inactive or not found).") - continue - - core_node = subgraph["core_node"] - neighbors = subgraph["neighbors"] - edges = subgraph["edges"] - - # Collect nodes - all_nodes[core_node["id"]] = core_node - for n in neighbors: - all_nodes[n["id"]] = n - - # Collect edges - for e in edges: - all_edges.add((e["source"], e["target"], e["type"])) - - cores.append( - {"id": core_id, "score": score, "core_node": core_node, "neighbors": neighbors} - ) - - top_core = cores[0] - return { - "core_id": top_core["id"], - "nodes": list(all_nodes.values()), - "edges": [{"source": f, "target": t, "type": ty} for (f, t, ty) in all_edges], - } - - def extract(self, messages: MessageList) -> list[TextualMemoryItem]: - raise NotImplementedError - - def update(self, memory_id: str, new_memory: TextualMemoryItem | dict[str, Any]) -> None: - raise NotImplementedError - - def get(self, memory_id: str) -> TextualMemoryItem: - """Get a memory by its ID.""" - result = self.graph_store.get_node(memory_id) - if result is None: - raise ValueError(f"Memory with ID {memory_id} not found") - metadata_dict = result.get("metadata", {}) - return TextualMemoryItem( - id=result["id"], - memory=result["memory"], - metadata=TreeNodeTextualMemoryMetadata(**metadata_dict), - ) - - def get_by_ids(self, memory_ids: list[str]) -> list[TextualMemoryItem]: - raise NotImplementedError - - def delete_all(self) -> None: - """Delete all memories and their relationships from the graph store.""" - try: - self.graph_store.clear() - logger.info("All memories and edges have been deleted from the graph.") - except Exception as e: - logger.error(f"An error occurred while deleting all memories: {e}") - raise diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index dea3cc1ab..e2e0be69c 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -103,11 +103,15 @@ def add( """ return self.memory_manager.add(memories, user_name=user_name, mode=self.mode) - def replace_working_memory(self, memories: list[TextualMemoryItem]) -> None: - self.memory_manager.replace_working_memory(memories) - - def get_working_memory(self) -> list[TextualMemoryItem]: - working_memories = self.graph_store.get_all_memory_items(scope="WorkingMemory") + def replace_working_memory( + self, memories: list[TextualMemoryItem], user_name: str | None = None + ) -> None: + self.memory_manager.replace_working_memory(memories, user_name=user_name) + + def get_working_memory(self, user_name: str | None = None) -> list[TextualMemoryItem]: + working_memories = self.graph_store.get_all_memory_items( + scope="WorkingMemory", user_name=user_name + ) items = [TextualMemoryItem.from_dict(record) for record in (working_memories)] # Sort by updated_at in descending order sorted_items = sorted( @@ -115,12 +119,12 @@ def get_working_memory(self) -> 
+theme_map = {
+    "ONLINE": {
+        "color": "#2196F3",
+        "grad": ("#E3F2FD", "#BBDEFB"),
+        "emoji": "🩵",
+    },
+    "OFFLINE": {
+        "color": "#FFC107",
+        "grad": ("#FFF8E1", "#FFECB3"),
+        "emoji": "🤍",
+    },
+}
 
 
 # Get access_token
@@ -311,7 +331,7 @@ def error_bot(
     )
 
     # ---------- Markdown ----------
-    colored_title = f"{title}"
+    colored_title = f"{ENV_NAME}"
     at_suffix = ""
     if user_ids:
         at_suffix = "\n\n" + " ".join([f"@{m}" for m in user_ids])
@@ -367,41 +387,52 @@ def online_bot(
     other_data2: dict,
     emoji: dict,
 ):
-    heading_color = "#00956D"  # Green for subtitle
-
-    # 0) Banner
-    banner_bytes = make_header(header_name, sub_title_name)
-    banner_url = upload_bytes_to_oss(banner_bytes, filename="online_report.png")
-
-    # 1) Colored main title
-    colored_title = f"{header_name}"
-
-    # 3) Markdown
-    md = "\n\n".join(
-        filter(
-            None,
-            [
-                f"![banner]({banner_url})",
-                f"### 🙄 {colored_title}\n\n",
-                _kv_lines(
-                    other_data1,
-                    next(iter(emoji.keys())),
-                    next(iter(emoji.values())),
-                    heading_color=heading_color,
-                ),
-                _kv_lines(
-                    other_data2,
-                    list(emoji.keys())[1],
-                    list(emoji.values())[1],
-                    heading_color=heading_color,
-                ),
-                f"Time: "
-                f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
-            ],
-        )
-    )
-
-    _send_md(colored_title, md, type="user")
+    try:
+        logger.info("online_bot invoked")
+        theme = "OFFLINE" if "OFFLINE" in ENV_NAME or "TEST" in ENV_NAME else "ONLINE"
+        style = theme_map.get(theme, theme_map["OFFLINE"])
+        heading_color = style["color"]  # Use theme color for subtitle
+
+        # 0) Banner
+        banner_bytes = make_header(
+            header_name,
+            sub_title_name,
+            colors=style["grad"],
+            fg=style["color"],
+        )
+        banner_url = upload_bytes_to_oss(banner_bytes, filename=f"{ENV_NAME}_online_report.png")
+
+        # 1) Colored main title
+        colored_title = f"{ENV_NAME}"
+
+        # 3) Markdown
+        md = "\n\n".join(
+            filter(
+                None,
+                [
+                    f"![banner]({banner_url})",
+                    f"### {style['emoji']} {colored_title}\n\n",
+                    _kv_lines(
+                        other_data1,
+                        next(iter(emoji.keys())),
+                        next(iter(emoji.values())),
+                        heading_color=heading_color,
+                    ),
+                    _kv_lines(
+                        other_data2,
+                        list(emoji.keys())[1],
+                        list(emoji.values())[1],
+                        heading_color=heading_color,
+                    ),
+                    f"Time: "
+                    f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
+                ],
+            )
+        )
+
+        _send_md(colored_title, md, type="user")
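+    # Never let a reporting failure break the caller; log the traceback instead.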
+    except Exception:
+        logger.error(traceback.format_exc())
 
 
 if __name__ == "__main__":
diff --git a/src/memos/utils.py b/src/memos/utils.py
index 08934ed34..9ae27bb81 100644
--- a/src/memos/utils.py
+++ b/src/memos/utils.py
@@ -6,7 +6,7 @@
 logger = get_logger(__name__)
 
 
-def timed(func=None, *, log=False, log_prefix=""):
+def timed(func=None, *, log=True, log_prefix=""):
     """Decorator to measure and optionally log time of retrieval steps.
 
     Can be used as @timed or @timed(log=True)