
Commit 411fc44

Authored by bechbd, malachi-constant, and jaidisido
Fixed ISSUE-1406 (#1407)
* Fixed ISSUE-1406, where edge data frames had their properties written with 'set' cardinality, which is not supported for edge properties. Versions prior to 1.1.1.0 did not throw an error.
* Fixed a formatting issue and updated the SPARQL examples in the tutorial with more accurate examples.
* Fixed a validation issue where the client needs to ensure the SPARQL query type is a string.
* Fixed a Flake8 validation issue.
* Minor: removed the quarantine markers in the Neptune tests.

Co-authored-by: Dave Bechberger <[email protected]>
Co-authored-by: Lucas Hanson <[email protected]>
Co-authored-by: Abdel Jaidi <[email protected]>
1 parent 5961d14 commit 411fc44

File tree

4 files changed, +55 -28 lines changed


awswrangler/neptune/client.py

Lines changed: 1 addition & 1 deletion
@@ -249,7 +249,7 @@ def _execute_sparql(self, query: str, headers: Any) -> Any:
         s = SPARQLWrapper("")
         s.setQuery(query)
-        query_type = s.queryType.upper()
+        query_type = str(s.queryType).upper()
         if query_type in ["SELECT", "CONSTRUCT", "ASK", "DESCRIBE"]:
            data = {"query": query}
        else:
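The cast to str is the whole fix here. A minimal sketch of the guarded query-type check, with a made-up query string and the SPARQLWrapper package that the awswrangler[sparql] extra installs:

    from SPARQLWrapper import SPARQLWrapper

    query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 5"  # hypothetical query
    s = SPARQLWrapper("")
    s.setQuery(query)

    # Cast before comparing, so a non-str queryType still matches the keywords.
    query_type = str(s.queryType).upper()
    if query_type in ["SELECT", "CONSTRUCT", "ASK", "DESCRIBE"]:
        data = {"query": query}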

awswrangler/neptune/neptune.py

Lines changed: 20 additions & 11 deletions
@@ -7,6 +7,7 @@
 from typing import Any, Callable, TypeVar
 
 import pandas as pd
+from gremlin_python.process.anonymous_traversal import traversal
 from gremlin_python.process.graph_traversal import GraphTraversalSource, __
 from gremlin_python.process.translator import Translator
 from gremlin_python.process.traversal import Cardinality, T
@@ -175,7 +176,7 @@ def to_property_graph(
     ... )
     """
     # check if ~id and ~label column exist and if not throw error
-    g = Graph().traversal()
+    g = traversal().withGraph(Graph())
     is_edge_df = False
     is_update_df = True
     if "~id" in df.columns:
@@ -307,18 +308,23 @@ def _get_column_name(column: str) -> str:
     return column
 
 
-def _set_properties(g: GraphTraversalSource, use_header_cardinality: bool, row: Any) -> GraphTraversalSource:
+def _set_properties(
+    g: GraphTraversalSource, use_header_cardinality: bool, row: Any, ignore_cardinality: bool = False
+) -> GraphTraversalSource:
     for (column, value) in row.items():
         if column not in ["~id", "~label", "~to", "~from"]:
-            # If the column header is specifying the cardinality then use it
-            if use_header_cardinality:
-                if column.lower().find("(single)") > 0 and pd.notna(value):
-                    g = g.property(Cardinality.single, _get_column_name(column), value)
-                else:
-                    g = _expand_properties(g, _get_column_name(column), value)
+            if ignore_cardinality and pd.notna(value):
+                g = g.property(_get_column_name(column), value)
             else:
-                # If not using header cardinality then use the default of set
-                g = _expand_properties(g, column, value)
+                # If the column header is specifying the cardinality then use it
+                if use_header_cardinality:
+                    if column.lower().find("(single)") > 0 and pd.notna(value):
+                        g = g.property(Cardinality.single, _get_column_name(column), value)
+                    else:
+                        g = _expand_properties(g, _get_column_name(column), value)
+                else:
+                    # If not using header cardinality then use the default of set
+                    g = _expand_properties(g, column, value)
     return g
 
 
@@ -361,7 +367,7 @@ def _build_gremlin_insert_edges(
                 .coalesce(__.unfold(), _build_gremlin_insert_vertices(__, {"~id": row["~to"], "~label": "Vertex"}))
             )
         )
-        g = _set_properties(g, use_header_cardinality, row)
+        g = _set_properties(g, use_header_cardinality, row, ignore_cardinality=True)
 
     return g
 
@@ -370,6 +376,9 @@ def _run_gremlin_insert(client: NeptuneClient, g: GraphTraversalSource) -> bool:
     translator = Translator("g")
     s = translator.translate(g.bytecode)
     s = s.replace("Cardinality.", "")  # hack to fix parser error for set cardinality
+    s = s.replace(
+        ".values('shape')", ""
+    )  # hack to fix parser error for adding unknown values('shape') steps to translation.
     _logger.debug(s)
     res = client.write_gremlin(s)
     return res
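To see what ignore_cardinality changes in the generated Gremlin, here is a small standalone sketch using the same gremlin_python pieces the module imports; the vertex ids, edge label, and property name are invented for the example:

    from gremlin_python.process.anonymous_traversal import traversal
    from gremlin_python.process.graph_traversal import __
    from gremlin_python.process.translator import Translator
    from gremlin_python.structure.graph import Graph

    g = traversal().withGraph(Graph())

    # Hypothetical edge between two existing vertices "v1" and "v2".
    t = (
        g.V("v1")
        .addE("knows")
        .to(__.V("v2"))
        # Omitting the Cardinality token here is the point of the fix:
        # edge properties must not be written with the unsupported 'set' cardinality.
        .property("weight", 0.5)
    )

    # Same translation step _run_gremlin_insert uses to produce a Gremlin string.
    print(Translator("g").translate(t.bytecode))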

tests/test_neptune.py

Lines changed: 13 additions & 2 deletions
@@ -170,7 +170,19 @@ def test_sparql_query(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     assert df.shape == (2, 3)
 
 
-@pytest.mark.skip(reason="In quarantine due to breaking change in Neptune engine.")
+def test_write_vertex_property_nan(neptune_endpoint, neptune_port) -> Dict[str, Any]:
+    client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
+    id = uuid.uuid4()
+    wr.neptune.execute_gremlin(client, f"g.addV().property(T.id, '{str(id)}')")
+
+    data = [_create_dummy_edge(), _create_dummy_edge()]
+    del data[1]["str"]
+    data[1]["int"] = np.nan
+    df = pd.DataFrame(data)
+    res = wr.neptune.to_property_graph(client, df)
+    assert res
+
+
 def test_gremlin_write_different_cols(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
     id = uuid.uuid4()
@@ -293,7 +305,6 @@ def test_gremlin_write_vertices(neptune_endpoint, neptune_port) -> Dict[str, Any
     assert len(saved_row["str"]) == 2
 
 
-@pytest.mark.skip(reason="In quarantine due to breaking change in Neptune engine.")
 def test_gremlin_write_edges(neptune_endpoint, neptune_port) -> Dict[str, Any]:
     client = wr.neptune.connect(neptune_endpoint, neptune_port, iam_enabled=False)
 
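From the user side, the scenario the new test exercises looks roughly like this; the endpoint and the dummy-edge helper below are stand-ins, since _create_dummy_edge itself is not shown in this diff:

    import uuid

    import awswrangler as wr
    import numpy as np
    import pandas as pd

    client = wr.neptune.connect("my-neptune-endpoint", 8182, iam_enabled=False)  # hypothetical endpoint

    def dummy_edge() -> dict:
        # Stand-in for _create_dummy_edge(): an edge frame needs ~id, ~label, ~from and ~to.
        return {
            "~id": str(uuid.uuid4()),
            "~label": "knows",
            "~from": str(uuid.uuid4()),
            "~to": str(uuid.uuid4()),
            "str": "value",
            "int": 1,
        }

    data = [dummy_edge(), dummy_edge()]
    del data[1]["str"]       # the missing key becomes NaN in the DataFrame
    data[1]["int"] = np.nan  # NaN-valued properties are skipped on write
    df = pd.DataFrame(data)

    assert wr.neptune.to_property_graph(client, df)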

tutorials/033 - Amazon Neptune.ipynb

Lines changed: 21 additions & 14 deletions
@@ -27,29 +27,29 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "outputs": [],
-   "source": [
-    "!pip install awswrangler[sparql]"
-   ],
    "metadata": {
     "collapsed": false,
     "pycharm": {
      "name": "#%%\n"
     }
-   }
+   },
+   "outputs": [],
+   "source": [
+    "!pip install awswrangler[sparql]"
+   ]
   },
   {
    "cell_type": "markdown",
+   "metadata": {
+    "collapsed": false
+   },
    "source": [
     "## Initialize\n",
     "\n",
     "The first step to using AWS Data Wrangler with Amazon Neptune is to import the library and create a client connection.\n",
     "\n",
     "<div style=\"background-color:#eeeeee; padding:10px; text-align:left; border-radius:10px; margin-top:10px; margin-bottom:10px; \"><b>Note</b>: Connecting to Amazon Neptune requires that the application you are running has access to the Private VPC where Neptune is located. Without this access you will not be able to connect using AWS Data Wrangler.</div>"
-   ],
-   "metadata": {
-    "collapsed": false
-   }
+   ]
   },
   {
    "cell_type": "code",
@@ -124,7 +124,10 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "query = \"SELECT ?s ?o WHERE { ?s ?p ?o .} LIMIT 5\"\n",
+    "query = \"\"\"\n",
+    "    PREFIX foaf: <https://xmlns.com/foaf/0.1/> \n",
+    "    PREFIX ex: <https://www.example.com/> \n",
+    "    SELECT ?firstName WHERE { ex:JaneDoe foaf:knows ?person . ?person foaf:firstName ?firstName }\"\"\"\n",
    "df = wr.neptune.execute_sparql(client, query)\n",
    "display(df.head(5))"
   ]
@@ -316,15 +319,17 @@
   "source": [
    "def _create_dummy_triple():\n",
    "    data = dict()\n",
-    "    data[\"s\"] = \"foo\"\n",
+    "    data[\"s\"] = \"http://example.com/resources/foo\"\n",
    "    data[\"p\"] = uuid.uuid4()\n",
    "    data[\"o\"] = random.randint(0, 1000)\n",
    "    return data\n",
    "\n",
    "data = [_create_dummy_triple(), _create_dummy_triple(), _create_dummy_triple()]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_rdf_graph(client, df)\n",
-    "query = \"SELECT ?o WHERE { <foo> <\" + str(data[0]['p']) + \"> ?o .}\"\n",
+    "query = \"\"\"\n",
+    "    PREFIX foo: <http://example.com/resources/>\n",
+    "    SELECT ?o WHERE { <foo:foo> <\" + str(data[0]['p']) + \"> ?o .}\"\"\"\n",
    "df = wr.neptune.execute_sparql(client, query)\n",
    "display(df)"
   ]
@@ -352,7 +357,9 @@
    "data = [_create_dummy_quad(), _create_dummy_quad(), _create_dummy_quad()]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_rdf_graph(client, df)\n",
-    "query = \"SELECT ?o WHERE { <foo> <\" + str(data[0]['p']) + \"> ?o .}\"\n",
+    "query = \"\"\"\n",
+    "    PREFIX foo: <http://example.com/resources/>\n",
+    "    SELECT ?o WHERE { <foo:foo> <\" + str(data[0]['p']) + \"> ?o .}\"\"\"\n",
    "df = wr.neptune.execute_sparql(client, query)\n",
    "display(df)"
   ]
@@ -542,4 +549,4 @@
  },
  "nbformat": 4,
  "nbformat_minor": 5
-}
+}
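A compact version of the triple round trip the notebook cells demonstrate, assuming the wr import and client created in the earlier tutorial cells; unlike the notebook helper, the predicate here is written as a full IRI purely so the follow-up SELECT is valid SPARQL:

    import random
    import uuid

    import pandas as pd

    def create_dummy_triple() -> dict:
        # Columns s, p, o are what to_rdf_graph expects for triples.
        return {
            "s": "http://example.com/resources/foo",
            "p": f"http://example.com/resources/{uuid.uuid4()}",  # hypothetical predicate IRI
            "o": random.randint(0, 1000),
        }

    df = pd.DataFrame([create_dummy_triple() for _ in range(3)])
    wr.neptune.to_rdf_graph(client, df)

    # Read one of the freshly written values back.
    query = (
        "SELECT ?o WHERE { <http://example.com/resources/foo> "
        f"<{df.loc[0, 'p']}> ?o . }}"
    )
    display(wr.neptune.execute_sparql(client, query))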
