Skip to content

Commit 2991c10

Browse files
committed
add_nodes_batch for neo4j.py
1 parent 8403414 commit 2991c10

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

src/memos/graph_dbs/neo4j.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,110 @@ def add_node(
236236
metadata=metadata,
237237
)
238238

239+
def add_nodes_batch(
240+
self,
241+
nodes: list[dict[str, Any]],
242+
user_name: str | None = None,
243+
) -> None:
244+
"""
245+
Batch add multiple memory nodes to the graph.
246+
247+
Args:
248+
nodes: List of node dictionaries, each containing:
249+
- id: str - Node ID
250+
- memory: str - Memory content
251+
- metadata: dict[str, Any] - Node metadata
252+
user_name: Optional user name (will use config default if not provided)
253+
"""
254+
if not nodes:
255+
logger.warning("[add_nodes_batch] Empty nodes list, skipping")
256+
return
257+
258+
logger.info(f"[add_nodes_batch] Adding {len(nodes)} nodes")
259+
260+
# user_name comes from parameter; fallback to config if missing
261+
effective_user_name = user_name if user_name else self.config.user_name
262+
263+
# Prepare all nodes
264+
prepared_nodes = []
265+
for node_data in nodes:
266+
try:
267+
id = node_data["id"]
268+
memory = node_data["memory"]
269+
metadata = node_data.get("metadata", {})
270+
271+
logger.debug(f"[add_nodes_batch] Processing node id: {id}")
272+
273+
# Set user_name in metadata if needed
274+
if not self.config.use_multi_db and (self.config.user_name or effective_user_name):
275+
metadata["user_name"] = effective_user_name
276+
277+
# Safely process metadata
278+
metadata = _prepare_node_metadata(metadata)
279+
280+
# Flatten info fields to top level (for Neo4j flat structure)
281+
metadata = _flatten_info_fields(metadata)
282+
283+
# Merge node and set metadata
284+
created_at = metadata.pop("created_at")
285+
updated_at = metadata.pop("updated_at")
286+
287+
# Serialization for sources
288+
if metadata.get("sources"):
289+
for idx in range(len(metadata["sources"])):
290+
metadata["sources"][idx] = json.dumps(metadata["sources"][idx])
291+
292+
prepared_nodes.append(
293+
{
294+
"id": id,
295+
"memory": memory,
296+
"created_at": created_at,
297+
"updated_at": updated_at,
298+
"metadata": metadata,
299+
}
300+
)
301+
except Exception as e:
302+
logger.error(
303+
f"[add_nodes_batch] Failed to prepare node {node_data.get('id', 'unknown')}: {e}",
304+
exc_info=True,
305+
)
306+
# Continue with other nodes
307+
continue
308+
309+
if not prepared_nodes:
310+
logger.warning("[add_nodes_batch] No valid nodes to insert after preparation")
311+
return
312+
313+
# Batch insert using Neo4j UNWIND for better performance
314+
query = """
315+
UNWIND $nodes AS node
316+
MERGE (n:Memory {id: node.id})
317+
SET n.memory = node.memory,
318+
n.created_at = datetime(node.created_at),
319+
n.updated_at = datetime(node.updated_at),
320+
n += node.metadata
321+
"""
322+
323+
# Prepare nodes data for UNWIND
324+
nodes_data = [
325+
{
326+
"id": node["id"],
327+
"memory": node["memory"],
328+
"created_at": node["created_at"],
329+
"updated_at": node["updated_at"],
330+
"metadata": node["metadata"],
331+
}
332+
for node in prepared_nodes
333+
]
334+
335+
try:
336+
with self.driver.session(database=self.db_name) as session:
337+
session.run(query, nodes=nodes_data)
338+
logger.info(f"[add_nodes_batch] Successfully inserted {len(prepared_nodes)} nodes")
339+
except Exception as e:
340+
logger.error(f"[add_nodes_batch] Failed to add nodes: {e}", exc_info=True)
341+
raise
342+
239343
def update_node(self, id: str, fields: dict[str, Any], user_name: str | None = None) -> None:
240344
"""
241345
Update node fields in Neo4j, auto-converting `created_at` and `updated_at` to datetime type if present.

0 commit comments

Comments
 (0)