Skip to content

Commit 35b192f

Browse files
authored
optimize add batch (#661)
1 parent 275ddc8 commit 35b192f

File tree

1 file changed

+101
-39
lines changed

1 file changed

+101
-39
lines changed

src/memos/graph_dbs/polardb.py

Lines changed: 101 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -3348,58 +3348,120 @@ def add_nodes_batch(
33483348
with conn.cursor() as cursor:
33493349
# Process each group separately
33503350
for embedding_column, nodes_group in nodes_by_embedding_column.items():
3351-
# Delete existing records first (batch delete)
3352-
for node in nodes_group:
3351+
# Batch delete existing records using IN clause
3352+
ids_to_delete = [node["id"] for node in nodes_group]
3353+
if ids_to_delete:
33533354
delete_query = f"""
33543355
DELETE FROM {self.db_name}_graph."Memory"
3355-
WHERE id = ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring)
3356+
WHERE id IN (
3357+
SELECT ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, unnest(%s::text[])::cstring)
3358+
)
33563359
"""
3357-
cursor.execute(delete_query, (node["id"],))
3360+
cursor.execute(delete_query, (ids_to_delete,))
3361+
3362+
# Batch get graph_ids for all nodes
3363+
get_graph_ids_query = f"""
3364+
SELECT
3365+
id_val,
3366+
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, id_val::text::cstring) as graph_id
3367+
FROM unnest(%s::text[]) as id_val
3368+
"""
3369+
cursor.execute(get_graph_ids_query, (ids_to_delete,))
3370+
graph_id_map = {row[0]: row[1] for row in cursor.fetchall()}
33583371

3359-
# Insert nodes (batch insert using executemany for better performance)
3372+
# Add graph_id to properties
33603373
for node in nodes_group:
3361-
# Get graph_id for this node
3362-
get_graph_id_query = f"""
3363-
SELECT ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring)
3364-
"""
3365-
cursor.execute(get_graph_id_query, (node["id"],))
3366-
graph_id = cursor.fetchone()[0]
3367-
node["properties"]["graph_id"] = str(graph_id)
3368-
3369-
# Insert node
3370-
if node["embedding_vector"]:
3371-
insert_query = f"""
3372-
INSERT INTO {self.db_name}_graph."Memory"(id, properties, {embedding_column})
3373-
VALUES (
3374-
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
3375-
%s,
3376-
%s
3374+
graph_id = graph_id_map.get(node["id"])
3375+
if graph_id:
3376+
node["properties"]["graph_id"] = str(graph_id)
3377+
3378+
# Batch insert using VALUES with multiple rows
3379+
# Use psycopg2.extras.execute_values for efficient batch insert
3380+
from psycopg2.extras import execute_values
3381+
3382+
if embedding_column and any(node["embedding_vector"] for node in nodes_group):
3383+
# Prepare data tuples for batch insert with embedding
3384+
data_tuples = []
3385+
for node in nodes_group:
3386+
# Each tuple: (id, properties_json, embedding_json)
3387+
data_tuples.append(
3388+
(
3389+
node["id"],
3390+
json.dumps(node["properties"]),
3391+
json.dumps(node["embedding_vector"])
3392+
if node["embedding_vector"]
3393+
else None,
33773394
)
3378-
"""
3379-
logger.info(
3380-
f"[add_nodes_batch] Inserting node insert_query={insert_query}"
33813395
)
3382-
cursor.execute(
3383-
insert_query,
3396+
3397+
# Build the INSERT query template
3398+
insert_query = f"""
3399+
INSERT INTO {self.db_name}_graph."Memory"(id, properties, {embedding_column})
3400+
VALUES %s
3401+
"""
3402+
3403+
# Build the VALUES template for execute_values
3404+
# Each row: (graph_id_function, agtype, vector)
3405+
# Note: properties column is agtype, not jsonb
3406+
template = f"""
3407+
(
3408+
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
3409+
%s::text::agtype,
3410+
%s::vector
3411+
)
3412+
"""
3413+
logger.info(
3414+
f"[add_nodes_batch] embedding_column Inserting insert_query:{insert_query}"
3415+
)
3416+
logger.info(
3417+
f"[add_nodes_batch] embedding_column Inserting data_tuples:{data_tuples}"
3418+
)
3419+
3420+
# Execute batch insert
3421+
execute_values(
3422+
cursor,
3423+
insert_query,
3424+
data_tuples,
3425+
template=template,
3426+
page_size=100, # Insert in batches of 100
3427+
)
3428+
else:
3429+
# Prepare data tuples for batch insert without embedding
3430+
data_tuples = []
3431+
for node in nodes_group:
3432+
# Each tuple: (id, properties_json)
3433+
data_tuples.append(
33843434
(
33853435
node["id"],
33863436
json.dumps(node["properties"]),
3387-
json.dumps(node["embedding_vector"]),
3388-
),
3389-
)
3390-
else:
3391-
insert_query = f"""
3392-
INSERT INTO {self.db_name}_graph."Memory"(id, properties)
3393-
VALUES (
3394-
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
3395-
%s
33963437
)
3397-
"""
3398-
cursor.execute(
3399-
insert_query,
3400-
(node["id"], json.dumps(node["properties"])),
34013438
)
34023439

3440+
# Build the INSERT query template
3441+
insert_query = f"""
3442+
INSERT INTO {self.db_name}_graph."Memory"(id, properties)
3443+
VALUES %s
3444+
"""
3445+
3446+
# Build the VALUES template for execute_values
3447+
# Note: properties column is agtype, not jsonb
3448+
template = f"""
3449+
(
3450+
ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring),
3451+
%s::text::agtype
3452+
)
3453+
"""
3454+
logger.info(f"[add_nodes_batch] Inserting insert_query:{insert_query}")
3455+
logger.info(f"[add_nodes_batch] Inserting data_tuples:{data_tuples}")
3456+
# Execute batch insert
3457+
execute_values(
3458+
cursor,
3459+
insert_query,
3460+
data_tuples,
3461+
template=template,
3462+
page_size=100, # Insert in batches of 100
3463+
)
3464+
34033465
logger.info(
34043466
f"[add_nodes_batch] Inserted {len(nodes_group)} nodes with embedding_column={embedding_column}"
34053467
)

0 commit comments

Comments (0)