Skip to content

Commit f5da9b8

Browse files
fix: add safe_json_loads
1 parent 713120e commit f5da9b8

File tree

1 file changed

+49
-61
lines changed

1 file changed

+49
-61
lines changed

graphgen/models/storage/graph/kuzu_storage.py

Lines changed: 49 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import json
22
import os
3-
import shutil
43
from dataclasses import dataclass
54
from typing import Any
65

@@ -69,6 +68,16 @@ def _init_schema(self):
6968
def index_done_callback(self):
7069
"""KuzuDB is ACID, changes are immediate, but we can verify generic persistence here."""
7170

71+
@staticmethod
72+
def _safe_json_loads(data_str: str) -> dict:
73+
if not isinstance(data_str, str) or not data_str.strip():
74+
return {}
75+
try:
76+
return json.loads(data_str)
77+
except json.JSONDecodeError as e:
78+
print(f"Error decoding JSON: {e}")
79+
return {}
80+
7281
def has_node(self, node_id: str) -> bool:
7382
result = self._conn.execute(
7483
"MATCH (a:Entity {id: $id}) RETURN count(a)", {"id": node_id}
@@ -115,14 +124,7 @@ def get_node(self, node_id: str) -> Any:
115124
return None
116125

117126
data_str = result.get_next()[0]
118-
119-
if not isinstance(data_str, str) or not data_str.strip():
120-
return {}
121-
try:
122-
return json.loads(data_str)
123-
except json.JSONDecodeError as e:
124-
print(f"Error decoding JSON for node {node_id}: {e}")
125-
return None
127+
return self._safe_json_loads(data_str)
126128

127129
def update_node(self, node_id: str, node_data: dict[str, str]):
128130
current_data = self.get_node(node_id)
@@ -132,7 +134,11 @@ def update_node(self, node_id: str, node_data: dict[str, str]):
132134

133135
# Merge existing data with new data
134136
current_data.update(node_data)
135-
json_data = json.dumps(current_data, ensure_ascii=False)
137+
try:
138+
json_data = json.dumps(current_data, ensure_ascii=False)
139+
except (TypeError, ValueError) as e:
140+
print(f"Error serializing JSON for node {node_id}: {e}")
141+
return
136142

137143
self._conn.execute(
138144
"MATCH (a:Entity {id: $id}) SET a.data = $data",
@@ -145,12 +151,10 @@ def get_all_nodes(self) -> Any:
145151
nodes = []
146152
while result.has_next():
147153
row = result.get_next()
148-
node_id, data_str = row[0], row[1]
149-
try:
150-
data = json.loads(data_str) if data_str and data_str.strip() else {}
151-
except json.JSONDecodeError as e:
152-
print(f"Error decoding JSON for node {node_id}: {e}")
154+
if row is None or len(row) < 2:
153155
continue
156+
node_id, data_str = row[0], row[1]
157+
data = self._safe_json_loads(data_str)
154158
nodes.append((node_id, data))
155159
return nodes
156160

@@ -167,17 +171,7 @@ def get_edge(self, source_node_id: str, target_node_id: str):
167171
return None
168172

169173
data_str = result.get_next()[0]
170-
171-
if not isinstance(data_str, str) or not data_str.strip():
172-
return {}
173-
174-
try:
175-
return json.loads(data_str)
176-
except json.JSONDecodeError as e:
177-
print(
178-
f"Error decoding JSON for edge {source_node_id}->{target_node_id}: {e}"
179-
)
180-
return None
174+
return self._safe_json_loads(data_str)
181175

182176
def update_edge(
183177
self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
@@ -188,14 +182,20 @@ def update_edge(
188182
return
189183

190184
current_data.update(edge_data)
191-
json_data = json.dumps(current_data, ensure_ascii=False)
185+
try:
186+
json_data = json.dumps(current_data, ensure_ascii=False)
187+
except (TypeError, ValueError) as e:
188+
print(
189+
f"Error serializing JSON for edge {source_node_id}->{target_node_id}: {e}"
190+
)
191+
return
192192

193-
query = """
193+
self._conn.execute(
194+
"""
194195
MATCH (a:Entity {id: $src})-[e:Relation]->(b:Entity {id: $dst})
195196
SET e.data = $data
196-
"""
197-
self._conn.execute(
198-
query, {"src": source_node_id, "dst": target_node_id, "data": json_data}
197+
""",
198+
{"src": source_node_id, "dst": target_node_id, "data": json_data},
199199
)
200200

201201
def get_all_edges(self) -> Any:
@@ -205,12 +205,10 @@ def get_all_edges(self) -> Any:
205205
edges = []
206206
while result.has_next():
207207
row = result.get_next()
208-
src, dst, data_str = row[0], row[1], row[2]
209-
try:
210-
data = json.loads(data_str) if data_str and data_str.strip() else {}
211-
except json.JSONDecodeError as e:
212-
print(f"Error decoding JSON for edge {src}->{dst}: {e}")
208+
if row is None or len(row) < 3:
213209
continue
210+
src, dst, data_str = row[0], row[1], row[2]
211+
data = self._safe_json_loads(data_str)
214212
edges.append((src, dst, data))
215213
return edges
216214

@@ -224,12 +222,10 @@ def get_node_edges(self, source_node_id: str) -> Any:
224222
edges = []
225223
while result.has_next():
226224
row = result.get_next()
227-
src, dst, data_str = row[0], row[1], row[2]
228-
try:
229-
data = json.loads(data_str) if data_str and data_str.strip() else {}
230-
except json.JSONDecodeError as e:
231-
print(f"Error decoding JSON for edge {src}->{dst}: {e}")
225+
if row is None or len(row) < 3:
232226
continue
227+
src, dst, data_str = row[0], row[1], row[2]
228+
data = self._safe_json_loads(data_str)
233229
edges.append((src, dst, data))
234230
return edges
235231

@@ -238,7 +234,11 @@ def upsert_node(self, node_id: str, node_data: dict[str, str]):
238234
Insert or Update node.
239235
Kuzu supports MERGE clause (similar to Neo4j) to handle upserts.
240236
"""
241-
json_data = json.dumps(node_data, ensure_ascii=False)
237+
try:
238+
json_data = json.dumps(node_data, ensure_ascii=False)
239+
except (TypeError, ValueError) as e:
240+
print(f"Error serializing JSON for node {node_id}: {e}")
241+
return
242242
query = """
243243
MERGE (a:Entity {id: $id})
244244
ON MATCH SET a.data = $data
@@ -261,7 +261,13 @@ def upsert_edge(
261261
if not self.has_node(target_node_id):
262262
self.upsert_node(target_node_id, {})
263263

264-
json_data = json.dumps(edge_data, ensure_ascii=False)
264+
try:
265+
json_data = json.dumps(edge_data, ensure_ascii=False)
266+
except (TypeError, ValueError) as e:
267+
print(
268+
f"Error serializing JSON for edge {source_node_id}->{target_node_id}: {e}"
269+
)
270+
return
265271
query = """
266272
MATCH (a:Entity {id: $src}), (b:Entity {id: $dst})
267273
MERGE (a)-[e:Relation]->(b)
@@ -285,21 +291,3 @@ def clear(self):
285291

286292
def reload(self):
287293
"""For databases that need reloading, KuzuDB auto-manages this."""
288-
289-
def drop(self):
290-
if not self.db_path:
291-
return
292-
self._conn = None
293-
self._db = None
294-
295-
if os.path.isdir(self.db_path):
296-
shutil.rmtree(self.db_path)
297-
print(f"Dropped KuzuDB directory at {self.db_path}")
298-
elif os.path.isfile(self.db_path):
299-
os.remove(self.db_path)
300-
print(f"Dropped KuzuDB file at {self.db_path}")
301-
elif os.path.exists(self.db_path):
302-
os.unlink(self.db_path)
303-
print(f"Dropped KuzuDB path at {self.db_path}")
304-
else:
305-
print(f"Database path {self.db_path} does not exist")

0 commit comments

Comments
 (0)