diff --git a/.gitignore b/.gitignore index 184181828..792869e14 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,8 @@ tmp/ **/tmp_data/ # evaluation data +*.csv +*.jsonl evaluation/*tmp/ evaluation/results evaluation/.env @@ -13,6 +15,7 @@ evaluation/.env evaluation/configs/* **tree_textual_memory_locomo** .env +evaluation/scripts/personamem # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/evaluation/data/personamem/.gitkeep b/evaluation/data/personamem/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/evaluation/scripts/locomo/locomo_eval.py b/evaluation/scripts/locomo/locomo_eval.py index c6adbd61c..25d2a847e 100644 --- a/evaluation/scripts/locomo/locomo_eval.py +++ b/evaluation/scripts/locomo/locomo_eval.py @@ -32,7 +32,6 @@ except Exception as e: print(f"Warning: Failed to download NLTK resources: {e}") - try: sentence_model_name = "Qwen/Qwen3-Embedding-0.6B" sentence_model = SentenceTransformer(sentence_model_name) @@ -363,8 +362,7 @@ async def limited_task(task): parser.add_argument( "--lib", type=str, - choices=["zep", "memos", "mem0", "mem0_graph", "langmem", "openai"], - help="Specify the memory framework (zep or memos or mem0 or mem0_graph)", + choices=["zep", "memos", "mem0", "mem0_graph", "openai", "memos-api", "memobase"], ) parser.add_argument( "--version", diff --git a/evaluation/scripts/locomo/locomo_ingestion.py b/evaluation/scripts/locomo/locomo_ingestion.py index f3837002d..ae5e57c87 100644 --- a/evaluation/scripts/locomo/locomo_ingestion.py +++ b/evaluation/scripts/locomo/locomo_ingestion.py @@ -1,7 +1,26 @@ +import os +import sys +import uuid + + +sys.path.insert( + 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +) +sys.path.insert( + 0, + os.path.join( + os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + ), + "evaluation", + "scripts", + ), +) + import argparse import concurrent.futures import json -import os +import threading import time from datetime import datetime, timezone @@ -10,7 +29,9 @@ from dotenv import load_dotenv from mem0 import MemoryClient +from memobase import ChatBlob from tqdm import tqdm +from utils.client import memobase_client, memos_client from zep_cloud.client import Zep from memos.configs.mem_cube import GeneralMemCubeConfig @@ -93,7 +114,34 @@ def get_client(frame: str, user_id: str | None = None, version: str = "default") return mos -def ingest_session(client, session, frame, metadata, revised_client=None): +def string_to_uuid(s: str, salt="memobase_client") -> str: + return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt)) + + +def memobase_add_memory(user, message, retries=3): + for attempt in range(retries): + try: + _ = user.insert(ChatBlob(messages=message), sync=True) + return + except Exception as e: + if attempt < retries - 1: + time.sleep(1) + continue + else: + raise e + + +def memobase_add_memories_for_speaker(client, speaker, messages): + real_uid = string_to_uuid(speaker) + u = client.get_or_create_user(real_uid) + for i in range(0, len(messages), 2): + batch_messages = messages[i : i + 2] + memobase_add_memory(u, batch_messages) + print(f"[{i + 1}/{len(messages)}] Added messages for {speaker} successfully.") + u.flush(sync=True) + + +def ingest_session(client, session, frame, version, metadata, revised_client=None): session_date = metadata["session_date"] date_format = "%I:%M %p on %d %B, %Y UTC" date_string = datetime.strptime(session_date, date_format).replace(tzinfo=timezone.utc) @@ -125,7 +173,7 @@ def ingest_session(client, session, frame, metadata, revised_client=None): group_id=conv_id, ) - elif frame == "memos": + elif frame == "memos" or frame == "memos-api": messages = [] messages_reverse = [] @@ -149,16 +197,22 @@ def ingest_session(client, session, frame, metadata, revised_client=None): speaker_a_user_id = conv_id + "_speaker_a" speaker_b_user_id = conv_id + "_speaker_b" + if frame == "memos-api": + client.add(messages=messages, user_id=f"{speaker_a_user_id.replace('_', '')}{version}") - client.add( - messages=messages, - user_id=speaker_a_user_id, - ) + revised_client.add( + messages=messages_reverse, user_id=f"{speaker_b_user_id.replace('_', '')}{version}" + ) + elif frame == "memos": + client.add( + messages=messages, + user_id=speaker_a_user_id, + ) - revised_client.add( - messages=messages_reverse, - user_id=speaker_b_user_id, - ) + revised_client.add( + messages=messages_reverse, + user_id=speaker_b_user_id, + ) print(f"Added messages for {speaker_a_user_id} and {speaker_b_user_id} successfully.") elif frame == "mem0" or frame == "mem0_graph": @@ -217,6 +271,77 @@ def ingest_session(client, session, frame, metadata, revised_client=None): version="v2", enable_graph=True, ) + elif frame == "memobase": + print(f"Processing abc for {metadata['session_key']}") + messages = [] + messages_reverse = [] + + for chat in tqdm(session, desc=f"{metadata['session_key']}"): + data = chat.get("speaker") + ": " + chat.get("text") + + if chat.get("speaker") == metadata["speaker_a"]: + messages.append( + { + "role": "user", + "content": chat.get("text"), + "alias": metadata["speaker_a"], + "created_at": iso_date, + } + ) + messages_reverse.append( + { + "role": "assistant", + "content": chat.get("text"), + "alias": metadata["speaker_b"], + "created_at": iso_date, + } + ) + elif chat.get("speaker") == metadata["speaker_b"]: + messages.append( + { + "role": "assistant", + "content": chat.get("text"), + "alias": metadata["speaker_b"], + "created_at": iso_date, + } + ) + messages_reverse.append( + { + "role": "user", + "content": chat.get("text"), + "alias": metadata["speaker_a"], + "created_at": iso_date, + } + ) + else: + raise ValueError( + f"Unknown speaker {chat.get('speaker')} in session {metadata['session_key']}" + ) + + print({"context": data, "conv_id": conv_id, "created_at": iso_date}) + + thread_a = threading.Thread( + target=memobase_add_memories_for_speaker, + args=( + client, + metadata["speaker_a_user_id"], + messages, + ), + ) + + thread_b = threading.Thread( + target=memobase_add_memories_for_speaker, + args=( + client, + metadata["speaker_b_user_id"], + messages_reverse, + ), + ) + + thread_a.start() + thread_b.start() + thread_a.join() + thread_b.join() end_time = time.time() elapsed_time = round(end_time - start_time, 2) @@ -246,7 +371,19 @@ def process_user(conv_idx, frame, locomo_df, version, num_workers=1): speaker_b_user_id = conv_id + "_speaker_b" client = get_client("memos", speaker_a_user_id, version) revised_client = get_client("memos", speaker_b_user_id, version) - + elif frame == "memos-api": + conv_id = "locomo_exp_user_" + str(conv_idx) + speaker_a_user_id = conv_id + "_speaker_a" + speaker_b_user_id = conv_id + "_speaker_b" + client = memos_client(mode="api") + revised_client = memos_client(mode="api") + elif frame == "memobase": + client = memobase_client() + conv_id = "locomo_exp_user_" + str(conv_idx) + speaker_a_user_id = conv_id + "_speaker_a" + speaker_b_user_id = conv_id + "_speaker_b" + client.delete_user(string_to_uuid(speaker_a_user_id)) + client.delete_user(string_to_uuid(speaker_b_user_id)) sessions_to_process = [] for session_idx in range(max_session_count): session_key = f"session_{session_idx}" @@ -272,7 +409,7 @@ def process_user(conv_idx, frame, locomo_df, version, num_workers=1): with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: futures = { executor.submit( - ingest_session, client, session, frame, metadata, revised_client + ingest_session, client, session, frame, version, metadata, revised_client ): metadata["session_key"] for session, metadata in sessions_to_process } @@ -340,8 +477,7 @@ def main(frame, version="default", num_workers=4): parser.add_argument( "--lib", type=str, - choices=["zep", "memos", "mem0", "mem0_graph"], - help="Specify the memory framework (zep or memos or mem0 or mem0_graph)", + choices=["zep", "memos", "mem0", "mem0_graph", "memos-api", "memobase"], ) parser.add_argument( "--version", diff --git a/evaluation/scripts/locomo/locomo_metric.py b/evaluation/scripts/locomo/locomo_metric.py index 9335ec5ba..8ee18faaf 100644 --- a/evaluation/scripts/locomo/locomo_metric.py +++ b/evaluation/scripts/locomo/locomo_metric.py @@ -9,8 +9,7 @@ parser.add_argument( "--lib", type=str, - choices=["zep", "memos", "mem0", "mem0_graph", "langmem", "openai"], - help="Specify the memory framework (zep or memos or mem0 or mem0_graph)", + choices=["zep", "memos", "mem0", "mem0_graph", "openai", "memos-api", "memobase"], ) parser.add_argument( "--version", diff --git a/evaluation/scripts/locomo/locomo_responses.py b/evaluation/scripts/locomo/locomo_responses.py index 5d0374c2b..056b17163 100644 --- a/evaluation/scripts/locomo/locomo_responses.py +++ b/evaluation/scripts/locomo/locomo_responses.py @@ -124,8 +124,7 @@ async def main(frame, version="default"): parser.add_argument( "--lib", type=str, - choices=["zep", "memos", "mem0", "mem0_graph", "openai"], - help="Specify the memory framework (zep or memos or mem0 or mem0_graph)", + choices=["zep", "memos", "mem0", "mem0_graph", "openai", "memos-api", "memobase"], ) parser.add_argument( "--version", diff --git a/evaluation/scripts/locomo/locomo_search.py b/evaluation/scripts/locomo/locomo_search.py index 26e7dd467..e72f4594b 100644 --- a/evaluation/scripts/locomo/locomo_search.py +++ b/evaluation/scripts/locomo/locomo_search.py @@ -1,6 +1,24 @@ +import os +import sys +import uuid + + +sys.path.insert( + 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +) +sys.path.insert( + 0, + os.path.join( + os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + ), + "evaluation", + "scripts", + ), +) + import argparse import json -import os from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed @@ -11,7 +29,8 @@ from dotenv import load_dotenv from mem0 import MemoryClient from tqdm import tqdm -from utils import filter_memory_data +from utils.client import memobase_client, memos_client +from utils.memos_filters import filter_memory_data from zep_cloud.client import Zep from memos.configs.mem_os import MOSConfig @@ -101,6 +120,15 @@ def get_client(frame: str, user_id: str | None = None, version: str = "default", {speaker_2_memories} """ +TEMPLATE_MEMOBASE = """Memories for user {speaker_1_user_id}: + + {speaker_1_memories} + + Memories for user {speaker_2_user_id}: + + {speaker_2_memories} +""" + def mem0_search(client, query, speaker_a_user_id, speaker_b_user_id, top_k=20): start = time() @@ -191,6 +219,38 @@ def memos_search(client, query, conv_id, speaker_a, speaker_b, reversed_client=N return context, duration_ms +def memos_api_search( + client, query, conv_id, speaker_a, speaker_b, top_k, version, reversed_client=None +): + start = time() + speaker_a_user_id = conv_id + "_speaker_a" + search_a_results = client.search( + query=query, user_id=f"{speaker_a_user_id.replace('_', '')}{version}", top_k=top_k + ) + speaker_a_context = "" + for item in search_a_results: + speaker_a_context += f"{item}\n" + + speaker_b_user_id = conv_id + "_speaker_b" + search_b_results = reversed_client.search( + query=query, user_id=f"{speaker_b_user_id.replace('_', '')}{version}", top_k=top_k + ) + speaker_b_context = "" + for item in search_b_results: + speaker_b_context += f"{item}\n" + + context = TEMPLATE_MEMOS.format( + speaker_1=speaker_a, + speaker_1_memories=speaker_a_context, + speaker_2=speaker_b, + speaker_2_memories=speaker_b_context, + ) + + print(query, context) + duration_ms = (time() - start) * 1000 + return context, duration_ms + + def mem0_graph_search(client, query, speaker_a_user_id, speaker_b_user_id, top_k=20): start = time() search_speaker_a_results = client.search( @@ -297,7 +357,58 @@ def zep_search(client, query, group_id, top_k=20): return context, duration_ms -def search_query(client, query, metadata, frame, reversed_client=None, top_k=20): +def memobase_search( + client, query, speaker_a, speaker_b, speaker_a_user_id, speaker_b_user_id, top_k=20 +): + start = time() + speaker_a_memories = memobase_search_memory( + client, speaker_a_user_id, query, max_memory_context_size=top_k * 100 + ) + speaker_b_memories = memobase_search_memory( + client, speaker_b_user_id, query, max_memory_context_size=top_k * 100 + ) + context = TEMPLATE_MEMOBASE.format( + speaker_1_user_id=speaker_a, + speaker_1_memories=speaker_a_memories, + indent=4, + speaker_2_user_id=speaker_b, + speaker_2_memories=speaker_b_memories, + ) + print(query, context) + duration_ms = (time() - start) * 1000 + return (context, duration_ms) + + +def string_to_uuid(s: str, salt="memobase_client") -> str: + return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt)) + + +def memobase_search_memory( + client, user_id, query, max_memory_context_size, max_retries=3, retry_delay=1 +): + retries = 0 + real_uid = string_to_uuid(user_id) + u = client.get_user(real_uid, no_get=True) + + while retries < max_retries: + try: + memories = u.context( + max_token_size=max_memory_context_size, + chats=[{"role": "user", "content": query}], + event_similarity_threshold=0.2, + fill_window_with_events=True, + ) + return memories + except Exception as e: + print(f"Error during memory search: {e}") + print("Retrying...") + retries += 1 + if retries >= max_retries: + raise e + time.sleep(retry_delay) + + +def search_query(client, query, metadata, frame, version, reversed_client=None, top_k=20): conv_id = metadata.get("conv_id") speaker_a = metadata.get("speaker_a") speaker_b = metadata.get("speaker_b") @@ -316,7 +427,15 @@ def search_query(client, query, metadata, frame, reversed_client=None, top_k=20) ) elif frame == "memos": context, duration_ms = memos_search( - client, query, conv_id, speaker_a, speaker_b, reversed_client + client, query, conv_id, speaker_a, speaker_b, version, reversed_client + ) + elif frame == "memos-api": + context, duration_ms = memos_api_search( + client, query, conv_id, speaker_a, speaker_b, top_k, version, reversed_client + ) + elif frame == "memobase": + context, duration_ms = memobase_search( + client, query, speaker_a, speaker_b, speaker_a_user_id, speaker_b_user_id, top_k ) return context, duration_ms @@ -364,6 +483,15 @@ def process_user(group_idx, locomo_df, frame, version, top_k=20, num_workers=1): speaker_b_user_id = conv_id + "_speaker_b" client = get_client(frame, speaker_a_user_id, version, top_k=top_k) reversed_client = get_client(frame, speaker_b_user_id, version, top_k=top_k) + elif frame == "memos-api": + speaker_a_user_id = conv_id + "_speaker_a" + speaker_b_user_id = conv_id + "_speaker_b" + client = memos_client(mode="api") + reversed_client = memos_client(mode="api") + client.user_register(user_id=f"{speaker_a_user_id.replace('_', '')}{version}") + reversed_client.user_register(user_id=f"{speaker_b_user_id.replace('_', '')}{version}") + elif frame == "memobase": + client = memobase_client() else: client = get_client(frame, conv_id, version) @@ -372,7 +500,7 @@ def process_qa(qa): if qa.get("category") == 5: return None context, duration_ms = search_query( - client, query, metadata, frame, reversed_client=reversed_client, top_k=top_k + client, query, metadata, frame, version, reversed_client=reversed_client, top_k=top_k ) if not context: @@ -439,8 +567,7 @@ def main(frame, version="default", num_workers=1, top_k=20): parser.add_argument( "--lib", type=str, - choices=["zep", "memos", "mem0", "mem0_graph", "langmem"], - help="Specify the memory framework (zep or memos or mem0 or mem0_graph)", + choices=["zep", "memos", "mem0", "mem0_graph", "memos-api", "memobase"], ) parser.add_argument( "--version", diff --git a/evaluation/scripts/longmemeval/lme_eval.py b/evaluation/scripts/longmemeval/lme_eval.py index 2d54a5acf..384f595be 100644 --- a/evaluation/scripts/longmemeval/lme_eval.py +++ b/evaluation/scripts/longmemeval/lme_eval.py @@ -346,7 +346,7 @@ async def main(frame, version, nlp_options, num_runs=3, num_workers=5): parser.add_argument( "--lib", type=str, - choices=["mem0-local", "mem0-api"], + choices=["mem0-local", "mem0-api", "memos-local", "zep", "memos-api", "zep", "memobase"], ) parser.add_argument( "--version", type=str, default="v1", help="Version of the evaluation framework." diff --git a/evaluation/scripts/longmemeval/lme_ingestion.py b/evaluation/scripts/longmemeval/lme_ingestion.py index aef8e076d..f2df0bd30 100644 --- a/evaluation/scripts/longmemeval/lme_ingestion.py +++ b/evaluation/scripts/longmemeval/lme_ingestion.py @@ -10,7 +10,8 @@ import pandas as pd from tqdm import tqdm -from utils.client import mem0_client, memos_client, zep_client +from utils.client import mem0_client, memobase_client, memos_client, zep_client +from utils.memobase_utils import memobase_add_memory, string_to_uuid from zep_cloud.types import Message @@ -19,7 +20,7 @@ def ingest_session(session, date, user_id, session_id, frame, client): if frame == "zep": for idx, msg in enumerate(session): print( - f"\033[90m[{frame}]\033[0m 💬 Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Ingesting message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" + f"\033[90m[{frame}]\033[0m 📝 User \033[1;94m{user_id}\033[0m 💬 Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Ingesting message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" ) client.memory.add( session_id=session_id, @@ -53,32 +54,49 @@ def ingest_session(session, date, user_id, session_id, frame, client): print( f"\033[90m[{frame}]\033[0m ✅ Session \033[1;94m{session_id}\033[0m: Ingested \033[93m{len(messages)}\033[0m messages at \033[92m{date.isoformat()}\033[0m" ) - elif frame == "memos-local": + elif frame == "memobase": for idx, msg in enumerate(session): messages.append( { "role": msg["role"], "content": msg["content"][:8000], - "chat_time": date.isoformat(), + "created_at": date.isoformat(), } ) print( - f"\033[90m[{frame}]\033[0m 📝 Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Reading message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" + f"\033[90m[{frame}]\033[0m 📝 User \033[1;94m{user_id}\033[0m 💬 Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Ingesting message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" + ) + + real_uid = string_to_uuid(user_id) + user = client.get_user(real_uid) + memobase_add_memory(user, messages) + user.flash(sync=True) + print( + f"\033[90m[{frame}]\033[0m ✅ Session \033[1;94m{session_id}\033[0m: Ingested \033[93m{len(messages)}\033[0m messages at \033[92m{date.isoformat()}\033[0m" + ) + elif frame == "memos-local" or frame == "memos-api": + for _idx, msg in enumerate(session): + messages.append( + { + "role": msg["role"], + "content": msg["content"][:8000], + "chat_time": date.isoformat(), + } ) client.add(messages=messages, user_id=user_id) print( f"\033[90m[{frame}]\033[0m ✅ Session \033[1;94m{session_id}\033[0m: Ingested \033[93m{len(messages)}\033[0m messages at \033[92m{date.isoformat()}\033[0m" ) + client.mem_reorganizer_wait() -def ingest_conv(lme_df, version, conv_idx, frame, num_workers=2): +def ingest_conv(lme_df, version, conv_idx, frame): conversation = lme_df.iloc[conv_idx] sessions = conversation["haystack_sessions"] dates = conversation["haystack_dates"] user_id = "lme_exper_user_" + str(conv_idx) - session_id = "lme_exper_session_" + str(conv_idx) print("\n" + "=" * 80) print(f"🔄 \033[1;36mINGESTING CONVERSATION {conv_idx}\033[0m".center(80)) @@ -89,19 +107,10 @@ def ingest_conv(lme_df, version, conv_idx, frame, num_workers=2): print("🔌 \033[1mUsing \033[94mZep client\033[0m \033[1mfor ingestion...\033[0m") # Delete existing user and session if they exist client.user.delete(user_id) - client.memory.delete(session_id) - print( - f"🗑️ Deleted existing user \033[93m{user_id}\033[0m and session \033[93m{session_id}\033[0m from Zep memory..." - ) - # Add user and session to Zep memory + print(f"🗑️ Deleted existing user \033[93m{user_id}\033[0m from Zep memory...") + # Add user to Zep memory client.user.add(user_id=user_id) - client.memory.add_session( - user_id=user_id, - session_id=session_id, - ) - print( - f"➕ Added user \033[93m{user_id}\033[0m and session \033[93m{session_id}\033[0m to Zep memory..." - ) + print(f"➕ Added user \033[93m{user_id}\033[0m to Zep memory...") elif frame == "mem0-local": client = mem0_client(mode="local") print("🔌 \033[1mUsing \033[94mMem0 Local client\033[0m \033[1mfor ingestion...\033[0m") @@ -117,41 +126,49 @@ def ingest_conv(lme_df, version, conv_idx, frame, num_workers=2): elif frame == "memos-local": client = memos_client( mode="local", - db_name=f"lme_{frame}-{version}-{user_id.replace('_', '')}", + db_name=f"lme_{frame}-{version}", user_id=user_id, top_k=20, mem_cube_path=f"results/lme/{frame}-{version}/storages/{user_id}", - mem_cube_config_path="configs/mem_cube_config.json", + mem_cube_config_path="configs/mu_mem_cube_config.json", mem_os_config_path="configs/mos_memos_config.json", addorsearch="add", ) print("🔌 \033[1mUsing \033[94mMemos Local client\033[0m \033[1mfor ingestion...\033[0m") - - with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [] - - for idx, session in enumerate(sessions): - date = dates[idx] + " UTC" - date_format = "%Y/%m/%d (%a) %H:%M UTC" - date_string = datetime.strptime(date, date_format).replace(tzinfo=timezone.utc) - - future = executor.submit( - ingest_session, session, date_string, user_id, session_id, frame, client + elif frame == "memos-api": + client = memos_client(mode="api") + elif frame == "memobase": + client = memobase_client() + print("🔌 \033[1mUsing \033[94mMemobase client\033[0m \033[1mfor ingestion...\033[0m") + client.delete_user(string_to_uuid(user_id)) + print(f"🗑️ Deleted existing user \033[93m{user_id}\033[0m from Memobase memory...") + + for idx, session in enumerate(sessions): + session_id = user_id + "_lme_exper_session_" + str(idx) + if frame == "zep": + client.memory.add_session( + user_id=user_id, + session_id=session_id, + ) + print( + f"➕ Added session \033[93m{session_id}\033[0m for user \033[93m{user_id}\033[0m to Zep memory..." ) - futures.append(future) - if len(session) == 0: - print(f"\033[93m⚠️ Skipping empty session {idx} in conversation {conv_idx}\033[0m") - continue + if len(session) == 0: + print(f"\033[93m⚠️ Skipping empty session {idx} in conversation {conv_idx}\033[0m") + continue - for future in tqdm( - as_completed(futures), total=len(futures), desc=f"📊 Ingesting user {conv_idx}" - ): - try: - future.result() - except Exception as e: - print(f"\033[91m❌ Error ingesting session: {e}\033[0m") + date = dates[idx] + " UTC" + date_format = "%Y/%m/%d (%a) %H:%M UTC" + date_string = datetime.strptime(date, date_format).replace(tzinfo=timezone.utc) + + try: + ingest_session(session, date_string, user_id, session_id, frame, client) + except Exception as e: + print(f"\033[91m❌ Error ingesting session: {e}\033[0m") + if frame == "memos-local": + client.mem_reorganizer_off() print("=" * 80) @@ -170,8 +187,21 @@ def main(frame, version, num_workers=2): print("-" * 80) start_time = datetime.now() - for session_idx in range(num_multi_sessions): - ingest_conv(lme_df, version, session_idx, frame, num_workers=num_workers) + + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [] + for session_idx in range(num_multi_sessions): + future = executor.submit(ingest_conv, lme_df, version, session_idx, frame) + futures.append(future) + + for future in tqdm( + as_completed(futures), total=len(futures), desc="📊 Processing conversations" + ): + try: + future.result() + except Exception as e: + print(f"\033[91m❌ Error processing conversation: {e}\033[0m") + end_time = datetime.now() elapsed_time = end_time - start_time elapsed_time_str = str(elapsed_time).split(".")[0] @@ -193,7 +223,7 @@ def main(frame, version, num_workers=2): parser.add_argument( "--lib", type=str, - choices=["mem0-local", "mem0-api", "memos-local"], + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], ) parser.add_argument( "--version", type=str, default="v1", help="Version of the evaluation framework." diff --git a/evaluation/scripts/longmemeval/lme_metric.py b/evaluation/scripts/longmemeval/lme_metric.py index be285123f..69f7748e0 100644 --- a/evaluation/scripts/longmemeval/lme_metric.py +++ b/evaluation/scripts/longmemeval/lme_metric.py @@ -258,7 +258,7 @@ def calculate_scores(data, grade_path, output_path): parser.add_argument( "--lib", type=str, - choices=["mem0-local", "mem0-api"], + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], ) parser.add_argument( "--version", type=str, default="v1", help="Version of the evaluation framework." diff --git a/evaluation/scripts/longmemeval/lme_responses.py b/evaluation/scripts/longmemeval/lme_responses.py index 9d5f8c1ab..e1e341826 100644 --- a/evaluation/scripts/longmemeval/lme_responses.py +++ b/evaluation/scripts/longmemeval/lme_responses.py @@ -145,7 +145,7 @@ def main(frame, version, num_workers=4): parser.add_argument( "--lib", type=str, - choices=["mem0-local", "mem0-api"], + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], ) parser.add_argument( "--version", type=str, default="v1", help="Version of the evaluation framework." diff --git a/evaluation/scripts/longmemeval/lme_search.py b/evaluation/scripts/longmemeval/lme_search.py index 0643c07ff..898ab7e27 100644 --- a/evaluation/scripts/longmemeval/lme_search.py +++ b/evaluation/scripts/longmemeval/lme_search.py @@ -13,11 +13,13 @@ import pandas as pd from tqdm import tqdm -from utils.client import mem0_client, memos_client, zep_client +from utils.client import mem0_client, memobase_client, memos_client, zep_client +from utils.memobase_utils import memobase_search_memory from utils.memos_filters import filter_memory_data from utils.prompts import ( MEM0_CONTEXT_TEMPLATE, MEM0_GRAPH_CONTEXT_TEMPLATE, + MEMOBASE_CONTEXT_TEMPLATE, MEMOS_CONTEXT_TEMPLATE, ZEP_CONTEXT_TEMPLATE, ) @@ -111,21 +113,37 @@ def mem0_search(client, user_id, query, top_k=20, enable_graph=False, frame="mem return context, duration_ms -def memos_search(client, user_id, query, frame="memos-local"): +def memos_search(client, user_id, query, top_k, frame="memos-local"): start = time() + if frame == "memos-local": + results = client.search( + query=query, + user_id=user_id, + ) - results = client.search( - query=query, - user_id=user_id, - ) + results = filter_memory_data(results)["text_mem"][0]["memories"] + search_memories = "\n".join([f" - {item['memory']}" for item in results]) - search_memories = filter_memory_data(results)["text_mem"][0]["memories"] + elif frame == "memos-api": + results = client.search(query=query, user_id=user_id, top_k=top_k) + search_memories = "\n".join([f" - {item}" for item in results]) context = MEMOS_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) duration_ms = (time() - start) * 1000 return context, duration_ms +def memobase_search(client, user_id, query, top_k=20): + start = time() + memories = memobase_search_memory(client, user_id, query, max_memory_context_size=top_k * 100) + context = MEMOBASE_CONTEXT_TEMPLATE.format( + user_id=user_id, + memories=memories, + ) + duration_ms = (time() - start) * 1000 + return context, duration_ms + + def process_user(lme_df, conv_idx, frame, version, top_k=20): row = lme_df.iloc[conv_idx] question = row["question"] @@ -175,17 +193,27 @@ def process_user(lme_df, conv_idx, frame, version, top_k=20): elif frame == "memos-local": client = memos_client( mode="local", - db_name=f"lme_{frame}-{version}-{user_id.replace('_', '')}", + db_name=f"lme_{frame}-{version}", user_id=user_id, - top_k=20, + top_k=top_k, mem_cube_path=f"results/lme/{frame}-{version}/storages/{user_id}", - mem_cube_config_path="configs/mem_cube_config.json", + mem_cube_config_path="configs/mu_mem_cube_config.json", mem_os_config_path="configs/mos_memos_config.json", addorsearch="search", ) print("🔌 \033[1mUsing \033[94mMemos Local client\033[0m \033[1mfor search...\033[0m") context, duration_ms = memos_search(client, user_id, question, frame=frame) + elif frame == "memobase": + client = memobase_client() + print("🔌 \033[1mUsing \033[94mMemobase client\033[0m \033[1mfor search...\033[0m") + context, duration_ms = memobase_search_memory(client, user_id, question, top_k=top_k) + elif frame == "memos-api": + client = memos_client( + mode="api", + ) + print("🔌 \033[1mUsing \033[94mMemos API client\033[0m \033[1mfor search...\033[0m") + context, duration_ms = memos_search(client, user_id, question, top_k=top_k, frame=frame) search_results[user_id].append( { "question": question, @@ -282,7 +310,11 @@ def main(frame, version, top_k=20, num_workers=2): if __name__ == "__main__": parser = argparse.ArgumentParser(description="LongMemeval Search Script") - parser.add_argument("--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local"]) + parser.add_argument( + "--lib", + type=str, + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], + ) parser.add_argument( "--version", type=str, default="v1", help="Version of the evaluation framework." ) diff --git a/evaluation/scripts/run_lme_eval.sh b/evaluation/scripts/run_lme_eval.sh index 96e430fd6..fc9031ea0 100755 --- a/evaluation/scripts/run_lme_eval.sh +++ b/evaluation/scripts/run_lme_eval.sh @@ -2,8 +2,8 @@ # Common parameters for all scripts LIB="memos-local" -VERSION="071503" -WORKERS=10 +VERSION="072202" +WORKERS=50 TOPK=20 echo "Running lme_ingestion.py..." @@ -20,4 +20,25 @@ if [ $? -ne 0 ]; then exit 1 fi +echo "Running lme_responses.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/longmemeval/lme_responses.py --lib $LIB --version $VERSION --workers $WORKERS +if [ $? -ne 0 ]; then + echo "Error running lme_responses.py" + exit 1 +fi + +echo "Running lme_eval.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/longmemeval/lme_eval.py --lib $LIB --version $VERSION --workers $WORKERS +if [ $? -ne 0 ]; then + echo "Error running lme_eval.py" + exit 1 +fi + +echo "Running lme_metric.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/longmemeval/lme_metric.py --lib $LIB --version $VERSION +if [ $? -ne 0 ]; then + echo "Error running lme_metric.py" + exit 1 +fi + echo "All scripts completed successfully!" diff --git a/evaluation/scripts/run_locomo_eval.sh b/evaluation/scripts/run_locomo_eval.sh index df1a865f2..89a09729c 100755 --- a/evaluation/scripts/run_locomo_eval.sh +++ b/evaluation/scripts/run_locomo_eval.sh @@ -1,17 +1,17 @@ #!/bin/bash # Common parameters for all scripts -LIB="memos" -VERSION="063001" +LIB="memos-api" +VERSION="072001" WORKERS=10 TOPK=20 -echo "Running locomo_ingestion.py..." -CUDA_VISIBLE_DEVICES=0 python scripts/locomo/locomo_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS -if [ $? -ne 0 ]; then - echo "Error running locomo_ingestion.py" - exit 1 -fi +# echo "Running locomo_ingestion.py..." +# CUDA_VISIBLE_DEVICES=0 python scripts/locomo/locomo_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS +# if [ $? -ne 0 ]; then +# echo "Error running locomo_ingestion.py" +# exit 1 +# fi echo "Running locomo_search.py..." CUDA_VISIBLE_DEVICES=0 python scripts/locomo/locomo_search.py --lib $LIB --version $VERSION --top_k $TOPK --workers $WORKERS diff --git a/evaluation/scripts/run_pm_eval.sh b/evaluation/scripts/run_pm_eval.sh new file mode 100755 index 000000000..a3321cdf9 --- /dev/null +++ b/evaluation/scripts/run_pm_eval.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# Common parameters for all scripts +LIB="memos-local" +VERSION="072201" +WORKERS=10 +TOPK=20 + +echo "Running pm_ingestion.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS +if [ $? -ne 0 ]; then + echo "Error running pm_ingestion.py" + exit 1 +fi + +echo "Running pm_search.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_search.py --lib $LIB --version $VERSION --top_k $TOPK --workers $WORKERS +if [ $? -ne 0 ]; then + echo "Error running pm_search.py" + exit 1 +fi + +echo "Running pm_responses.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_responses.py --lib $LIB --version $VERSION --workers $WORKERS +if [ $? -ne 0 ]; then + echo "Error running pm_responses.py" + exit 1 +fi + +echo "Running pm_metric.py..." +CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_metric.py --lib $LIB --version $VERSION +if [ $? -ne 0 ]; then + echo "Error running pm_metric.py" + exit 1 +fi + +echo "All scripts completed successfully!" diff --git a/evaluation/scripts/utils/client.py b/evaluation/scripts/utils/client.py index ddb144f6f..33aea7497 100644 --- a/evaluation/scripts/utils/client.py +++ b/evaluation/scripts/utils/client.py @@ -4,16 +4,20 @@ from dotenv import load_dotenv from mem0 import MemoryClient +from memobase import MemoBaseClient from zep_cloud.client import Zep from zep_cloud.types import Message sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from memobase import ChatBlob + from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig from memos.mem_cube.general import GeneralMemCube -from memos.mem_os.main import MOS +from memos.mem_os.product import MOSProduct from utils.mem0_local import Mem0Client +from utils.memos_api import MemOSAPI from utils.memos_filters import filter_memory_data @@ -57,7 +61,7 @@ def memos_client( mos_config_data = json.load(f) mos_config_data["top_k"] = top_k mos_config = MOSConfig(**mos_config_data) - memos = MOS(mos_config) + memos = MOSProduct(mos_config) memos.create_user(user_id=user_id) if addorsearch == "add": @@ -68,24 +72,35 @@ def memos_client( mem_cube_config_data["text_mem"]["config"]["graph_db"]["config"]["db_name"] = ( f"{db_name.replace('_', '')}" ) + mem_cube_config_data["text_mem"]["config"]["graph_db"]["config"]["user_name"] = user_id + mem_cube_config_data["text_mem"]["config"]["reorganize"] = True mem_cube_config = GeneralMemCubeConfig.model_validate(mem_cube_config_data) mem_cube = GeneralMemCube(mem_cube_config) if not os.path.exists(mem_cube_path): mem_cube.dump(mem_cube_path) - memos.register_mem_cube( - mem_cube_name_or_path=mem_cube_path, - mem_cube_id=user_id, + memos.user_register( user_id=user_id, + user_name=user_id, + interests=f"I'm {user_id}", + default_mem_cube=mem_cube, ) elif mode == "api": - pass + memos = MemOSAPI(base_url=os.getenv("MEMOS_BASE_URL")) return memos +def memobase_client(): + client = MemoBaseClient( + project_url=os.getenv("MEMOBASE_PROJECT_URL"), + api_key=os.getenv("MEMOBASE_API_KEY"), + ) + return client + + if __name__ == "__main__": # Example usage of the Zep client zep = zep_client() @@ -196,3 +211,25 @@ def memos_client( search_result_b = memos_b.search(query="football", user_id="alice") filtered_search_result_b = filter_memory_data(search_result_b)["text_mem"][0]["memories"] print("Search results in Memos B:", filtered_search_result_b) + + # Example usage of MemoBase client + client = memobase_client() + print("MemoBase client initialized successfully.") + + # Example of adding a user and retrieving user information + user_id = client.add_user() + user = client.get_user(user_id) + + # Example of adding a chat blob to the user + print(f"Adding chat blob for user {user_id}...") + b = ChatBlob( + messages=[ + {"role": "user", "content": "Hi, I'm here again"}, + {"role": "assistant", "content": "Hi, Gus! How can I help you?"}, + ] + ) + bid = user.insert(b) + + # Example of retrieving the context of the user + context = user.context() + print(context) diff --git a/evaluation/scripts/utils/memobase_utils.py b/evaluation/scripts/utils/memobase_utils.py new file mode 100644 index 000000000..dcf06ea31 --- /dev/null +++ b/evaluation/scripts/utils/memobase_utils.py @@ -0,0 +1,46 @@ +import time +import uuid + +from memobase import ChatBlob + + +def string_to_uuid(s: str, salt="memobase_client") -> str: + return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt)) + + +def memobase_add_memory(user, message, retries=3): + for attempt in range(retries): + try: + _ = user.insert(ChatBlob(messages=message), sync=True) + return + except Exception as e: + if attempt < retries - 1: + time.sleep(1) + continue + else: + raise e + + +def memobase_search_memory( + client, user_id, query, max_memory_context_size, max_retries=3, retry_delay=1 +): + retries = 0 + real_uid = string_to_uuid(user_id) + u = client.get_user(real_uid, no_get=True) + + while retries < max_retries: + try: + memories = u.context( + max_token_size=max_memory_context_size, + chats=[{"role": "user", "content": query}], + event_similarity_threshold=0.2, + fill_window_with_events=True, + ) + return memories + except Exception as e: + print(f"Error during memory search: {e}") + print("Retrying...") + retries += 1 + if retries >= max_retries: + raise e + time.sleep(retry_delay) diff --git a/evaluation/scripts/utils/memos_api.py b/evaluation/scripts/utils/memos_api.py new file mode 100644 index 000000000..7b7f2a061 --- /dev/null +++ b/evaluation/scripts/utils/memos_api.py @@ -0,0 +1,63 @@ +import json + +import requests + + +class MemOSAPI: + def __init__(self, base_url: str = "http://localhost:8000"): + self.base_url = base_url + self.headers = {"Content-Type": "application/json"} + + def user_register(self, user_id: str): + """Register a user.""" + url = f"{self.base_url}/users/register" + payload = json.dumps({"user_id": user_id}) + response = requests.request("POST", url, data=payload, headers=self.headers) + return response.text + + def add(self, messages: list[dict], user_id: str | None = None): + """Create memories.""" + register_res = json.loads(self.user_register(user_id)) + cube_id = register_res["data"]["mem_cube_id"] + url = f"{self.base_url}/add" + payload = json.dumps({"messages": messages, "user_id": user_id, "mem_cube_id": cube_id}) + + response = requests.request("POST", url, data=payload, headers=self.headers) + return response.text + + def search(self, query: str, user_id: str | None = None, top_k: int = 10): + """Search memories.""" + url = f"{self.base_url}/search" + payload = json.dumps( + { + "query": query, + "user_id": user_id, + } + ) + + response = requests.request("POST", url, data=payload, headers=self.headers) + if response.status_code != 200: + response.raise_for_status() + else: + result = json.loads(response.text)["data"]["text_mem"][0]["memories"] + text_memories = [item["memory"] for item in result][:top_k] + return text_memories + + +if __name__ == "__main__": + client = MemOSAPI(base_url="http://localhost:8000") + # Example usage + try: + messages = [ + { + "role": "user", + "content": "I went to the store and bought a red apple.", + "chat_time": "2023-10-01T12:00:00Z", + } + ] + add_response = client.add(messages, user_id="user789") + print("Add memory response:", add_response) + search_response = client.search("red apple", user_id="user789", top_k=1) + print("Search memory response:", search_response) + except requests.RequestException as e: + print("An error occurred:", e) diff --git a/evaluation/scripts/utils/prompts.py b/evaluation/scripts/utils/prompts.py index 6515619ec..dd83acdc2 100644 --- a/evaluation/scripts/utils/prompts.py +++ b/evaluation/scripts/utils/prompts.py @@ -28,6 +28,37 @@ Answer: """ +PM_ANSWER_PROMPT = """ + You are a helpful assistant tasked with selecting the best answer to a user question, based solely on summarized conversation memories. + + # CONTEXT: + The following are summarized facts and preferences extracted from prior user conversations. Use only these memories to answer the question. + + {context} + + # INSTRUCTIONS: + 1. Carefully read and reason over the memory summary. + 2. Evaluate each of the four answer choices (a) through (d). + 3. Choose the single best-supported answer based on the information in memory. + 4. Output ONLY the final choice in the format (a), (b), (c), or (d), placed directly after the token . + + # IMPORTANT RULES: + - Your final answer **must appear after** the token . + - Your final answer **must use parentheses**, like (a) or (b). + - Do NOT list multiple choices. Choose only one. + - Do NOT include extra text after . Just output the answer. + + # QUESTION: + {question} + + # OPTIONS: + {options} + + Final Answer: + +""" + + ZEP_CONTEXT_TEMPLATE = """ FACTS and ENTITIES represent relevant context to the current conversation. @@ -53,6 +84,12 @@ {memories} """ +MEMOBASE_CONTEXT_TEMPLATE = """ + Memories for user {user_id}: + + {memories} +""" + MEM0_GRAPH_CONTEXT_TEMPLATE = """ Memories for user {user_id}: diff --git a/examples/core_memories/general_textual_memory.py b/examples/core_memories/general_textual_memory.py index 2ecbc7826..f71e2ef2e 100644 --- a/examples/core_memories/general_textual_memory.py +++ b/examples/core_memories/general_textual_memory.py @@ -1,6 +1,7 @@ from memos.configs.memory import MemoryConfigFactory from memos.memories.factory import MemoryFactory + config = MemoryConfigFactory( backend="general_text", config={ diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index 14b85d5ef..015d3774c 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -168,6 +168,14 @@ def mem_reorganizer_off(self) -> bool: if mem_cube.text_mem and mem_cube.text_mem.is_reorganize: logger.info(f"close reorganizer for {mem_cube.text_mem.config.cube_id}") mem_cube.text_mem.memory_manager.close() + mem_cube.text_mem.memory_manager.wait_reorganizer() + + def mem_reorganizer_wait(self) -> bool: + for mem_cube in self.mem_cubes.values(): + logger.info(f"try to close reorganizer for {mem_cube.text_mem.config.cube_id}") + if mem_cube.text_mem and mem_cube.text_mem.is_reorganize: + logger.info(f"close reorganizer for {mem_cube.text_mem.config.cube_id}") + mem_cube.text_mem.memory_manager.wait_reorganizer() def _register_chat_history(self, user_id: str | None = None) -> None: """Initialize chat history with user ID.""" diff --git a/src/memos/memories/textual/general.py b/src/memos/memories/textual/general.py index 2754fc1ae..4a1d90cb0 100644 --- a/src/memos/memories/textual/general.py +++ b/src/memos/memories/textual/general.py @@ -17,6 +17,7 @@ from memos.vec_dbs.factory import QdrantVecDB, VecDBFactory from memos.vec_dbs.item import VecDBItem + logger = get_logger(__name__) @@ -36,11 +37,7 @@ def __init__(self, config: GeneralTextMemoryConfig): stop=stop_after_attempt(3), retry=retry_if_exception_type(json.JSONDecodeError), before_sleep=lambda retry_state: logger.warning( - "Extracting memory failed due to JSON decode error: {error}, Attempt retry: {attempt_number} / {max_attempt_number}".format( - error=retry_state.outcome.exception(), - attempt_number=retry_state.attempt_number, - max_attempt_number=3, - ) + f"Extracting memory failed due to JSON decode error: {retry_state.outcome.exception()}, Attempt retry: {retry_state.attempt_number} / {3}" ), ) def extract(self, messages: MessageList) -> list[TextualMemoryItem]: diff --git a/src/memos/memories/textual/tree_text_memory/organize/conflict.py b/src/memos/memories/textual/tree_text_memory/organize/conflict.py index ccb86e321..2ea16ed2f 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/conflict.py +++ b/src/memos/memories/textual/tree_text_memory/organize/conflict.py @@ -2,7 +2,9 @@ import re from datetime import datetime + from dateutil import parser + from memos.embedders.base import BaseEmbedder from memos.graph_dbs.neo4j import Neo4jGraphDB from memos.llms.base import BaseLLM diff --git a/tests/memories/textual/test_general.py b/tests/memories/textual/test_general.py index 8f5bf7966..94dcd5cd3 100644 --- a/tests/memories/textual/test_general.py +++ b/tests/memories/textual/test_general.py @@ -1,10 +1,8 @@ # TODO: Overcomplex. Use pytest fixtures instead of setUp/tearDown. -import json -import os import unittest import uuid -from unittest.mock import MagicMock, mock_open, patch +from unittest.mock import MagicMock, patch from memos.configs.embedder import EmbedderConfigFactory from memos.configs.llm import LLMConfigFactory