Skip to content

Commit a5af10e

Browse files
bechbdjaidisido
andauthored
Fixed ISSUE-1232 where writing property graph dataframes where columns contained NaN caused Gremlin query errors. Added appropriate checks to ignore these. (#1233)
Co-authored-by: Dave Bechberger <[email protected]> Co-authored-by: jaidisido <[email protected]>
1 parent 14f1b97 commit a5af10e

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

awswrangler/neptune/neptune.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ def _set_properties(g: GraphTraversalSource, use_header_cardinality: bool, row:
290290
if column not in ["~id", "~label", "~to", "~from"]:
291291
# If the column header is specifying the cardinality then use it
292292
if use_header_cardinality:
293-
if column.lower().find("(single)") > 0:
293+
if column.lower().find("(single)") > 0 and pd.notna(value):
294294
g = g.property(Cardinality.single, _get_column_name(column), value)
295295
else:
296296
g = _expand_properties(g, _get_column_name(column), value)
@@ -305,7 +305,7 @@ def _expand_properties(g: GraphTraversalSource, column: str, value: Any) -> Grap
305305
if isinstance(value, list) and len(value) > 0:
306306
for item in value:
307307
g = g.property(Cardinality.set_, column, item)
308-
else:
308+
elif pd.notna(value):
309309
g = g.property(Cardinality.set_, column, value)
310310
return g
311311

tests/test_neptune.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import uuid
55
from typing import Any, Dict
66

7+
import numpy as np
78
import pandas as pd
89
import pytest # type: ignore
910
from gremlin_python.process.traversal import Direction, T
@@ -169,6 +170,33 @@ def test_sparql_query(neptune_endpoint, neptune_port) -> Dict[str, Any]:
169170
assert df.shape == (2, 3)
170171

171172

173+
def test_gremlin_write_different_cols(neptune_endpoint, neptune_port) -> Dict[str, Any]:
174+
client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
175+
id = uuid.uuid4()
176+
wr.neptune.execute_gremlin(client, f"g.addV().property(T.id, '{str(id)}')")
177+
178+
data = [_create_dummy_vertex(), _create_dummy_vertex()]
179+
del data[1]["str"]
180+
data[1]["int"] = np.nan
181+
df = pd.DataFrame(data)
182+
res = wr.neptune.to_property_graph(client, df)
183+
assert res
184+
185+
data = [_create_dummy_edge(), _create_dummy_edge()]
186+
del data[1]["str"]
187+
data[1]["int"] = np.nan
188+
df = pd.DataFrame(data)
189+
res = wr.neptune.to_property_graph(client, df)
190+
assert res
191+
192+
data = [{"~id": id, "age(single)": 50, "name": "foo"}, {"~id": id, "age(single)": 55}, {"~id": id, "name": "foo"}]
193+
df = pd.DataFrame(data)
194+
res = wr.neptune.to_property_graph(client, df)
195+
res = wr.neptune.execute_gremlin(client, f"g.V('{id}').valueMap().with(WithOptions.tokens)")
196+
saved_row = res.iloc[0]
197+
assert saved_row["age"] == 55
198+
199+
172200
def test_gremlin_write_updates(neptune_endpoint, neptune_port) -> Dict[str, Any]:
173201
client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
174202
id = uuid.uuid4()
@@ -191,12 +219,14 @@ def test_gremlin_write_updates(neptune_endpoint, neptune_port) -> Dict[str, Any]
191219
res = wr.neptune.execute_gremlin(client, f"g.V('{id}').valueMap().with(WithOptions.tokens)")
192220
saved_row = res.iloc[0]
193221
assert saved_row["age"] == 55
194-
assert saved_row["name"] == ["foo", "bar"]
222+
assert "foo" in saved_row["name"]
223+
assert "bar" in saved_row["name"]
195224
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=False)
196225
res = wr.neptune.execute_gremlin(client, f"g.V('{id}').valueMap().with(WithOptions.tokens)")
197226
saved_row = res.iloc[0]
198227
assert saved_row["age(single)"] == 55
199-
assert saved_row["name"] == ["foo", "bar"]
228+
assert "foo" in saved_row["name"]
229+
assert "bar" in saved_row["name"]
200230

201231

202232
def test_gremlin_write_vertices(neptune_endpoint, neptune_port) -> Dict[str, Any]:
@@ -298,6 +328,22 @@ def test_gremlin_write_edges(neptune_endpoint, neptune_port) -> Dict[str, Any]:
298328
assert batch_cnt_df.iloc[0][0] == final_cnt_df.iloc[0][0] + 50
299329

300330

331+
def test_sparql_write_different_cols(neptune_endpoint, neptune_port) -> Dict[str, Any]:
332+
client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
333+
334+
data = [_create_dummy_triple(), _create_dummy_triple()]
335+
del data[1]["o"]
336+
df = pd.DataFrame(data)
337+
res = wr.neptune.to_rdf_graph(client, df)
338+
assert res
339+
340+
data = [_create_dummy_quad(), _create_dummy_quad()]
341+
del data[1]["o"]
342+
df = pd.DataFrame(data)
343+
res = wr.neptune.to_rdf_graph(client, df)
344+
assert res
345+
346+
301347
def test_sparql_write_triples(neptune_endpoint, neptune_port) -> Dict[str, Any]:
302348
client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
303349
initial_df = wr.neptune.execute_sparql(client, "SELECT ?p ?o WHERE { <foo> ?p ?o .}")

0 commit comments

Comments
 (0)