Skip to content

Commit 2e6c0aa

Browse files
fridayL and CaralHsi authored
Feat: set add memory batch (#678)
* feat: update memos headers * feat: headers add * feat: update search agent * feat: upadte mem story * feat: update mem scehduler * feat: update deepsearch mem code * feat: update deepsearch agent * feat: update test code * fix: remove dup config * feat: dock search pipeline * fix: code test * feat: add test scripts * feat: add test * feat: update need_raw process * fix: add initter * fix: change agent search func name * feat: update logs and defined * feat: update full text mem search * feat: cp plugin to dev * feat: add one recall for fulltext retrieval * fix: set default for fulltext search * feat: add langchain chunk * feat: fix playground for query * feat: update file content memory extract * feat: update code * feat: update import * code: reformat suffix * feat: update file_id * remove langchain-text-splitters==1.0.0 * feat: add reqiuement * feat: make test * feat: fix markdown * feat: fix simple chunker * feat: add file sources * feat: add concat doc source * add: file_info * remove:macos-13 * feat: fix ffideids * fix: fix filed ids data * feat: add set batch insert memory --------- Co-authored-by: CaralHsi <[email protected]>
1 parent 321f843 commit 2e6c0aa

File tree

1 file changed

+123
-12
lines changed
  • src/memos/memories/textual/tree_text_memory/organize

1 file changed

+123
-12
lines changed

src/memos/memories/textual/tree_text_memory/organize/manager.py

Lines changed: 123 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,44 @@ def __init__(
8585
self._merged_threshold = merged_threshold
8686

8787
def add(
    self,
    memories: list[TextualMemoryItem],
    user_name: str | None = None,
    mode: str = "sync",
    use_batch: bool = True,
) -> list[str]:
    """
    Add new memories to different memory types.

    Args:
        memories: List of memory items to add.
        user_name: Optional user name for the memories.
        mode: "sync" to cleanup and refresh after adding, "async" to skip.
        use_batch: If True, use batch database operations (more efficient
            for large batches). If False, use parallel single-node
            operations (original behavior).

    Returns:
        List of added memory IDs.
    """
    # Dispatch to the chosen insertion strategy. (The previous version
    # pre-initialized added_ids to [] and immediately overwrote it in both
    # branches; that dead assignment is removed.)
    if use_batch:
        added_ids = self._add_memories_batch(memories, user_name)
    else:
        added_ids = self._add_memories_parallel(memories, user_name)

    # "sync" mode performs maintenance inline: trim WorkingMemory to its
    # size limit and refresh the cached memory-size counters. "async"
    # callers are expected to defer this work.
    if mode == "sync":
        self._cleanup_working_memory(user_name)
        self._refresh_memory_size(user_name=user_name)

    return added_ids

119+
def _add_memories_parallel(
    self, memories: list[TextualMemoryItem], user_name: str | None = None
) -> list[str]:
    """Insert memories one node at a time, fanned out over a thread pool.

    Each item is handed to ``self._process_memory`` on its own worker;
    per-item failures are logged and skipped so one bad memory cannot
    sink the rest of the batch. Returns the IDs gathered from the
    successful items.
    """
    collected: list[str] = []
    with ContextThreadPoolExecutor(max_workers=10) as executor:
        pending = {
            executor.submit(self._process_memory, item, user_name): item
            for item in memories
        }
        for done in as_completed(pending, timeout=500):
            try:
                # result() re-raises any worker exception here.
                collected.extend(done.result())
            except Exception as e:
                logger.exception("Memory processing error: ", exc_info=e)
    return collected
103135

104-
if mode == "sync":
105-
for mem_type in ["WorkingMemory"]:
106-
try:
107-
self.graph_store.remove_oldest_memory(
108-
memory_type="WorkingMemory",
109-
keep_latest=self.memory_size[mem_type],
110-
user_name=user_name,
136+
def _add_memories_batch(
137+
self, memories: list[TextualMemoryItem], user_name: str | None = None
138+
) -> list[str]:
139+
"""
140+
Add memories using batch database operations (more efficient for large batches).
141+
"""
142+
if not memories:
143+
return []
144+
145+
added_ids: list[str] = []
146+
working_nodes: list[dict] = []
147+
graph_nodes: list[dict] = []
148+
graph_node_ids: list[str] = []
149+
150+
for memory in memories:
151+
working_id = str(uuid.uuid4())
152+
153+
# Prepare WorkingMemory node (skip for ToolSchemaMemory and ToolTrajectoryMemory)
154+
if memory.metadata.memory_type not in ("ToolSchemaMemory", "ToolTrajectoryMemory"):
155+
working_metadata = memory.metadata.model_copy(
156+
update={"memory_type": "WorkingMemory"}
157+
).model_dump(exclude_none=True)
158+
working_metadata["updated_at"] = datetime.now().isoformat()
159+
working_nodes.append(
160+
{
161+
"id": working_id,
162+
"memory": memory.memory,
163+
"metadata": working_metadata,
164+
}
165+
)
166+
167+
# Prepare graph memory node (LongTermMemory/UserMemory/ToolSchemaMemory/ToolTrajectoryMemory)
168+
if memory.metadata.memory_type in (
169+
"LongTermMemory",
170+
"UserMemory",
171+
"ToolSchemaMemory",
172+
"ToolTrajectoryMemory",
173+
):
174+
graph_node_id = str(uuid.uuid4())
175+
metadata_dict = memory.metadata.model_dump(exclude_none=True)
176+
metadata_dict["updated_at"] = datetime.now().isoformat()
177+
178+
# Add working_binding for fast mode
179+
tags = metadata_dict.get("tags") or []
180+
if "mode:fast" in tags:
181+
prev_bg = metadata_dict.get("background", "") or ""
182+
binding_line = f"[working_binding:{working_id}] direct built from raw inputs"
183+
metadata_dict["background"] = (
184+
f"{prev_bg} || {binding_line}" if prev_bg else binding_line
111185
)
112-
except Exception:
113-
logger.warning(f"Remove {mem_type} error: {traceback.format_exc()}")
114186

115-
self._refresh_memory_size(user_name=user_name)
187+
graph_nodes.append(
188+
{
189+
"id": graph_node_id,
190+
"memory": memory.memory,
191+
"metadata": metadata_dict,
192+
}
193+
)
194+
graph_node_ids.append(graph_node_id)
195+
added_ids.append(graph_node_id)
196+
197+
# Batch insert nodes
198+
if working_nodes:
199+
try:
200+
self.graph_store.add_nodes_batch(working_nodes, user_name=user_name)
201+
except Exception as e:
202+
logger.exception("Batch add WorkingMemory nodes error: ", exc_info=e)
203+
204+
if graph_nodes:
205+
try:
206+
self.graph_store.add_nodes_batch(graph_nodes, user_name=user_name)
207+
except Exception as e:
208+
logger.exception("Batch add graph memory nodes error: ", exc_info=e)
209+
210+
# Notify reorganizer (only if enabled)
211+
if graph_node_ids and self.is_reorganize:
212+
self.reorganizer.add_message(QueueMessage(op="add", after_node=graph_node_ids))
213+
116214
return added_ids
117215

216+
def _cleanup_working_memory(self, user_name: str | None = None) -> None:
    """Evict the oldest WorkingMemory nodes beyond the configured capacity.

    Best-effort: any failure (including a missing size entry) is
    downgraded to a warning so that routine cleanup never aborts the
    surrounding add operation.
    """
    try:
        # Look up the limit inside the try-block so a bad/missing config
        # entry is handled the same way as a store failure.
        keep = self.memory_size["WorkingMemory"]
        self.graph_store.remove_oldest_memory(
            memory_type="WorkingMemory", keep_latest=keep, user_name=user_name
        )
    except Exception:
        logger.warning(f"Remove WorkingMemory error: {traceback.format_exc()}")
228+
118229
def replace_working_memory(
119230
self, memories: list[TextualMemoryItem], user_name: str | None = None
120231
) -> None:

0 commit comments

Comments
 (0)