Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a7eb168
feat: define mem-read schedular message&consumer; add async mem-reade…
CaralHsi Sep 28, 2025
8a24ec7
feat: add fast/fine mode in mem-reader;
CaralHsi Sep 28, 2025
81915a3
feat: add mem-reader in scheduler
CaralHsi Sep 29, 2025
e875ca0
fix: conflict
CaralHsi Oct 13, 2025
b5086e7
feat: change async remove
CaralHsi Oct 13, 2025
b43a9ff
Merge branch 'dev' of github.com:MemTensor/MemOS into feat/Async-add
CaralHsi Oct 14, 2025
0b2649d
feat: modify async-add in core.py
CaralHsi Oct 14, 2025
9dd632f
feat: add 'remove and refresh memory in schedular'
CaralHsi Oct 14, 2025
8c97058
feat: add naive fast mode in mem-reader
CaralHsi Oct 14, 2025
3e08a82
feat: finish fast mode in mem-reader
CaralHsi Oct 15, 2025
37fcff8
feat: add token-based window splitting and concurrency improvements
CaralHsi Oct 16, 2025
5f7e8e0
feat: add split chunker into mode in simple struct mem reader
CaralHsi Oct 16, 2025
2355527
feat: update async-mode add
CaralHsi Oct 16, 2025
2ee4c4c
chore: update gitignore
CaralHsi Oct 16, 2025
593faa5
feat: improve database note write performance
CaralHsi Oct 17, 2025
8d2263a
feat: fix mem-read scheduler
CaralHsi Oct 20, 2025
e250ab8
fix: nebula group-by bug
CaralHsi Oct 20, 2025
14e986e
fix: bug in adding mem scheduler
CaralHsi Oct 20, 2025
1609703
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 21, 2025
31adec0
fix: nebula index; mem-reader chat-time;
CaralHsi Oct 21, 2025
8628a37
fix: conflict
CaralHsi Oct 21, 2025
18f3cc8
format: searcher
CaralHsi Oct 21, 2025
4e0133e
fix: some bug in shceduler and mem-reader
CaralHsi Oct 21, 2025
6653bea
feat: add mem-organize in scheduler
CaralHsi Oct 21, 2025
af5c940
feat: add tree.mode to config; modify scheduler config
CaralHsi Oct 21, 2025
0c6e68b
Merge branch 'dev' into feat/Async-add
CaralHsi Oct 21, 2025
28a20e9
fix: test bug
CaralHsi Oct 21, 2025
b29aa02
Merge branch 'feat/Async-add' of github.com:CaralHsi/MemOSRealPublic …
CaralHsi Oct 21, 2025
d00a553
feat: add organize handler and submit reorganize scheduler
CaralHsi Oct 22, 2025
1f735c5
feat: move all async organization modules in scheduler; add user_name…
CaralHsi Oct 22, 2025
e0f089e
feat: modify tree_textual_memory example
CaralHsi Oct 22, 2025
2ac8201
feat: modify reorganizer and add passing user_name in relation_reason…
CaralHsi Oct 22, 2025
11d4f00
feat: delete reorganize task switch button in core
CaralHsi Oct 28, 2025
87bc80e
feat: fix get candidate nodes; add get neighbors; [TODO]: neo4j and p…
CaralHsi Oct 28, 2025
afa5bcf
feat: update reorganize
CaralHsi Oct 28, 2025
b09552f
fix: mem-reader
CaralHsi Oct 28, 2025
3ad9c8d
feat: update reorganize scheduler
CaralHsi Oct 28, 2025
73ca83b
fix: conflict
CaralHsi Oct 28, 2025
bd47a49
fix: core.py
CaralHsi Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
)
embedder = EmbedderFactory.from_config(embedder_config)

user_name = "lucy4"

# === Step 2: Initialize Neo4j GraphStore ===
graph_config = GraphDBConfigFactory(
backend="neo4j",
config={
"uri": "bolt://localhost:7687",
"user": "neo4j",
"password": "12345678",
"db_name": "lucy4",
"db_name": user_name,
"auto_create": True,
},
)
Expand Down Expand Up @@ -178,6 +180,7 @@

results = relation_detector.process_node(
node=node,
user_name=user_name,
exclude_ids=[node.id], # Exclude self when searching for neighbors
top_k=5,
)
Expand Down
13 changes: 8 additions & 5 deletions examples/core_memories/tree_textual_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@
added_ids = my_tree_textual_memory.add(m_list)
for i, id in enumerate(added_ids):
print(f"{i}'th added result is:" + my_tree_textual_memory.get(id).memory)
my_tree_textual_memory.memory_manager.wait_reorganizer()
# Wait for the synchronous thread to finish.
# TODO: use the scheduler module to wait.

time.sleep(60)

Expand Down Expand Up @@ -233,7 +234,8 @@

for m_list in doc_memory:
added_ids = my_tree_textual_memory.add(m_list)
my_tree_textual_memory.memory_manager.wait_reorganizer()
# Wait for the synchronous thread to finish.
# TODO: use the scheduler module to wait.

results = my_tree_textual_memory.search(
"Tell me about what memos consist of?",
Expand All @@ -245,9 +247,10 @@
print(f"{i}'th similar result is: " + str(r["memory"]))
print(f"Successfully search {len(results)} memories")

# close the synchronous thread in memory manager
my_tree_textual_memory.memory_manager.close()
# Close the synchronous thread.
# TODO: use the scheduler module to close it.

# my_tree_textual_memory.dump
# Note that you cannot drop this tree when`use_multi_db` ==
# false. my_tree_textual_memory.drop() """
my_tree_textual_memory.dump("tmp/my_tree_textual_memory")
my_tree_textual_memory.drop()
2 changes: 1 addition & 1 deletion src/memos/graph_dbs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def edge_exists(self, source_id: str, target_id: str, type: str) -> bool:

# Graph Query & Reasoning
@abstractmethod
def get_node(self, id: str, include_embedding: bool = False) -> dict[str, Any] | None:
def get_node(self, id: str, include_embedding: bool = False, **kwargs) -> dict[str, Any] | None:
"""
Retrieve the metadata and content of a node.
Args:
Expand Down
129 changes: 108 additions & 21 deletions src/memos/graph_dbs/nebular.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ def get_grouped_counts(
group_by_fields.append(alias)
# Full GQL query construction
gql = f"""
MATCH (n /*+ INDEX(idx_memory_user_name) */)
MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)
{where_clause}
RETURN {", ".join(return_fields)}, COUNT(n) AS count
"""
Expand Down Expand Up @@ -1381,31 +1381,55 @@ def get_structure_optimization_candidates(
where_clause += f' AND n.user_name = "{user_name}"'

return_fields = self._build_return_fields(include_embedding)
return_fields += f", n.{self.dim_field} AS {self.dim_field}"

query = f"""
gql = f"""
MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)
WHERE {where_clause}
OPTIONAL MATCH (n)-[@PARENT]->(c@Memory)
OPTIONAL MATCH (p@Memory)-[@PARENT]->(n)
WHERE c IS NULL AND p IS NULL
RETURN {return_fields}
OPTIONAL MATCH (n)-[@PARENT]->(c@Memory {{user_name: "{user_name}"}})
OPTIONAL MATCH (p@Memory {{user_name: "{user_name}"}})-[@PARENT]->(n)
RETURN {return_fields},
c.id AS child_id,
p.id AS parent_id
"""

candidates = []
node_ids = set()
per_node_seen_has_child_or_parent: dict[str, bool] = {}
per_node_payload: dict[str, dict] = {}

try:
results = self.execute_query(query)
for row in results:
props = {k: v.value for k, v in row.items()}
node = self._parse_node(props)
node_id = node["id"]
if node_id not in node_ids:
candidates.append(node)
node_ids.add(node_id)
results = self.execute_query(gql)
except Exception as e:
logger.error(f"Failed : {e}, traceback: {traceback.format_exc()}")
return candidates
logger.error(
f"[get_structure_optimization_candidates] Query failed: {e}, "
f"traceback: {traceback.format_exc()}"
)
return []

for row in results:
props = {k: v.value for k, v in row.items() if k not in ("child_id", "parent_id")}
node = self._parse_node(props)
nid = node["id"]

if nid not in per_node_payload:
per_node_payload[nid] = node
per_node_seen_has_child_or_parent[nid] = False

child_val = row.get("child_id")
parent_val = row.get("parent_id")

child_unwrapped = self._parse_value(child_val) if (child_val is not None) else None
parent_unwrapped = self._parse_value(parent_val) if (parent_val is not None) else None

if child_unwrapped:
per_node_seen_has_child_or_parent[nid] = True
if parent_unwrapped:
per_node_seen_has_child_or_parent[nid] = True

isolated_nodes: list[dict] = []
for nid, node_obj in per_node_payload.items():
if not per_node_seen_has_child_or_parent[nid]:
isolated_nodes.append(node_obj)

return isolated_nodes

@timed
def drop_database(self) -> None:
Expand Down Expand Up @@ -1450,7 +1474,7 @@ def get_context_chain(self, id: str, type: str = "FOLLOWS") -> list[str]:

@timed
def get_neighbors(
self, id: str, type: str, direction: Literal["in", "out", "both"] = "out"
self, id: str, type: str, direction: Literal["in", "out", "both"] = "both"
) -> list[str]:
"""
Get connected node IDs in a specific direction and relationship type.
Expand All @@ -1461,7 +1485,70 @@ def get_neighbors(
Returns:
List of neighboring node IDs.
"""
raise NotImplementedError
if direction not in ("in", "out", "both"):
raise ValueError(f"Unsupported direction: {direction}")

user_name = self.config.user_name
id_val = self._format_value(id) # e.g. '"5225-uuid..."'
user_val = self._format_value(user_name) # e.g. '"lme_user_1"'
edge_type = type # assume caller passes valid edge tag

# Closure over `id_val`, `user_val` and `edge_type` computed by the enclosing call.
# Returns the ids of Memory nodes that this node points to through an `edge_type` edge;
# both endpoints are constrained to the same user_name.
def _run_out_query() -> list[str]:
# out: (this)-[edge_type]->(dst)
gql = f"""
MATCH (src@Memory {{id: {id_val}, user_name: {user_val}}})
-[r@{edge_type}]->
(dst@Memory {{user_name: {user_val}}})
RETURN DISTINCT dst.id AS neighbor
""".strip()
try:
result = self.execute_query(gql)
except Exception as e:
# Best effort: a failed query is logged and reported as "no neighbors".
logger.error(f"[get_neighbors][out] Query failed: {e}, gql={gql}")
return []

out_ids = []
try:
for row in result:
# Read the `neighbor` column (the RETURN alias above) via its `.value` attribute.
out_ids.append(row["neighbor"].value)
except Exception as e:
# A failure while reading rows is only logged; ids collected so far are still returned.
logger.error(f"[get_neighbors][out] Parse failed: {e}")
return out_ids

# Closure over `id_val`, `user_val` and `edge_type` computed by the enclosing call.
# Returns the ids of Memory nodes that point to this node through an `edge_type` edge;
# both endpoints are constrained to the same user_name.
def _run_in_query() -> list[str]:
# in: (src)-[edge_type]->(this)
gql = f"""
MATCH (src@Memory {{user_name: {user_val}}})
-[r@{edge_type}]->
(dst@Memory {{id: {id_val}, user_name: {user_val}}})
RETURN DISTINCT src.id AS neighbor
""".strip()
try:
result = self.execute_query(gql)
except Exception as e:
# Best effort: a failed query is logged and reported as "no neighbors".
logger.error(f"[get_neighbors][in] Query failed: {e}, gql={gql}")
return []

in_ids = []
try:
for row in result:
# Read the `neighbor` column (the RETURN alias above) via its `.value` attribute.
in_ids.append(row["neighbor"].value)
except Exception as e:
# A failure while reading rows is only logged; ids collected so far are still returned.
logger.error(f"[get_neighbors][in] Parse failed: {e}")
return in_ids

if direction == "out":
return list(set(_run_out_query()))
elif direction == "in":
return list(set(_run_in_query()))
else: # direction == "both"
out_ids = _run_out_query()
in_ids = _run_in_query()
merged = set(out_ids)
merged.update(in_ids)
if id in merged:
merged.remove(id)
return list(merged)

@timed
def get_path(self, source_id: str, target_id: str, max_depth: int = 3) -> list[str]:
Expand Down
54 changes: 24 additions & 30 deletions src/memos/mem_os/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from memos.mem_scheduler.schemas.general_schemas import (
ADD_LABEL,
ANSWER_LABEL,
MEM_ORGANIZE_LABEL,
MEM_READ_LABEL,
PREF_ADD_LABEL,
QUERY_LABEL,
Expand Down Expand Up @@ -166,25 +167,6 @@ def mem_scheduler_off(self) -> bool:
logger.error(f"Failed to stop scheduler: {e!s}")
return False

def mem_reorganizer_on(self) -> bool:
pass

def mem_reorganizer_off(self) -> bool:
"""temporally implement"""
for mem_cube in self.mem_cubes.values():
logger.info(f"try to close reorganizer for {mem_cube.text_mem.config.cube_id}")
if mem_cube.text_mem and mem_cube.text_mem.is_reorganize:
logger.info(f"close reorganizer for {mem_cube.text_mem.config.cube_id}")
mem_cube.text_mem.memory_manager.close()
mem_cube.text_mem.memory_manager.wait_reorganizer()

def mem_reorganizer_wait(self) -> bool:
for mem_cube in self.mem_cubes.values():
logger.info(f"try to close reorganizer for {mem_cube.text_mem.config.cube_id}")
if mem_cube.text_mem and mem_cube.text_mem.is_reorganize:
logger.info(f"close reorganizer for {mem_cube.text_mem.config.cube_id}")
mem_cube.text_mem.memory_manager.wait_reorganizer()

def _register_chat_history(
self, user_id: str | None = None, session_id: str | None = None
) -> None:
Expand Down Expand Up @@ -727,9 +709,12 @@ def add(
f"time add: get mem_cube_id time user_id: {target_user_id} time is: {time.time() - time_start}"
)

time_start_0 = time.time()
if mem_cube_id not in self.mem_cubes:
raise ValueError(f"MemCube '{mem_cube_id}' is not loaded. Please register.")

logger.info(
f"time add: get mem_cube_id check in mem_cubes time user_id: {target_user_id} time is: {time.time() - time_start_0}"
)
sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode
if sync_mode == "async":
assert self.mem_scheduler is not None, (
Expand Down Expand Up @@ -779,16 +764,25 @@ def process_textual_memory():
timestamp=datetime.utcnow(),
)
self.mem_scheduler.submit_messages(messages=[message_item])

message_item = ScheduleMessageItem(
user_id=target_user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
label=ADD_LABEL,
content=json.dumps(mem_ids),
timestamp=datetime.utcnow(),
)
self.mem_scheduler.submit_messages(messages=[message_item])
elif sync_mode == "sync":
# Submit a MEM_ORGANIZE message carrying the ids of the memories that were just added.
# Use `target_user_id`, the same user id the ADD_LABEL message and the timing logs in
# this function use, instead of the raw `user_id` argument.
# NOTE(review): presumably `user_id` can be None when the caller omits it - confirm.
message_item = ScheduleMessageItem(
    user_id=target_user_id,
    mem_cube_id=mem_cube_id,
    mem_cube=mem_cube,
    label=MEM_ORGANIZE_LABEL,
    content=json.dumps(mem_ids),
    timestamp=datetime.utcnow(),
)
self.mem_scheduler.submit_messages(messages=[message_item])
message_item = ScheduleMessageItem(
user_id=target_user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
label=ADD_LABEL,
content=json.dumps(mem_ids),
timestamp=datetime.utcnow(),
)
self.mem_scheduler.submit_messages(messages=[message_item])

def process_preference_memory():
if (
Expand Down
3 changes: 2 additions & 1 deletion src/memos/mem_os/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import random
import time
import traceback

from collections.abc import Generator
from datetime import datetime
Expand Down Expand Up @@ -215,7 +216,7 @@ def _restore_user_instances(
logger.error(f"Failed to restore user configuration for {user_id}: {e}")

except Exception as e:
logger.error(f"Error during user instance restoration: {e}")
# traceback.format_exc() returns the traceback as a string. traceback.print_exc() writes it
# to stderr and returns None, which would put the literal text "None" in the log message.
logger.error(f"Error during user instance restoration: {e}: {traceback.format_exc()}")

def _initialize_cube_from_default_config(
self, cube_id: str, user_id: str, default_config: GeneralMemCubeConfig
Expand Down
3 changes: 2 additions & 1 deletion src/memos/mem_reader/simple_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
_ENC = tiktoken.get_encoding("cl100k_base")

def _count_tokens_text(s: str) -> int:
return len(_ENC.encode(s or ""))
# allow special tokens like <|endoftext|> instead of raising ValueError
return len(_ENC.encode(s or "", disallowed_special=()))
except Exception:
# Heuristic fallback: zh chars ~1 token, others ~1 token per ~4 chars
def _count_tokens_text(s: str) -> int:
Expand Down
3 changes: 3 additions & 0 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def __init__(self, config: BaseSchedulerConfig):
self.max_internal_message_queue_size = self.config.get(
"max_internal_message_queue_size", DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE
)
self.memos_message_queue: Queue[ScheduleMessageItem] = Queue(
maxsize=self.max_internal_message_queue_size
)

# Initialize message queue based on configuration
if self.use_redis_queue:
Expand Down
Loading