Skip to content

Commit 65c2164

Browse files
bechbdjaidisidokukushking
authored
(feat): Add Amazon Neptune support 🚀 (#1085)
* Added first draft of potential interface for Neptune * Added first draft of potential interface for Neptune * Fixed __init__.py file with the correct functions for Neptune * [skip ci] Updated signatures per initial feedback from draft PR * [skip ci] WIP - Initial version of oc and gremlin endpoint read queries and result parsing * [skip ci] Initial working version of the basic read functionality associated with the three query languages * [skip ci] Fixed tests that were not running correctly due to typo * WIP on writing data * [skip ci] Have a working version of the complete roundtrip for proeprty graphs * [skip ci] Refactored code to simplify the locations into a utils class. Moved the Gremlin parsing code out of client and into its own static class * [skip ci] Added SPARQL write functionality as well as added neptune to database infra scripting * [skip ci] Readded nested asyncio in order for it to work in Jupyter as well as added update option for property graph data * Working version of all MVP features as well as changes to make validation pass locally * Added GremlinParser to the init file so that it can be unit tested. * Added better error handling on query exceptions for all languages * Fixed validation error * Added method to flatten dataframes to Neptune module * Fixed issues related to flattening DF * Fix static checks issues * Fix pydocstyle * Added functionality to properly set the edge id values as well as added the ability to specify the cardinality using the column header * Validate fixes and security group in test_infra * Minor - typing and docs * [skip ci] - Minor - Add Neptune docs entry * Use built-in gremlin driver functionality to enable event loop nesting * Use built-in gremlin driver functionality to enable event loop nesting * Updated the connection handling for Gremlin and added a tutorial notebook for Amazon Neptune * Fixed validation issues * Minor - Remove nest_asyncio and fix tutorial * Upgrading dependencies * Fixed incomplete error message and updated tutorial to remove cluster name and added code to create the df when running cardinality tests. * Minor - validate Co-authored-by: Dave Bechberger <[email protected]> Co-authored-by: jaidisido <[email protected]> Co-authored-by: Anton Kukushkin <[email protected]>
1 parent ec6dc1f commit 65c2164

File tree

16 files changed

+3108
-496
lines changed

16 files changed

+3108
-496
lines changed

awswrangler/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
exceptions,
1919
lakeformation,
2020
mysql,
21+
neptune,
2122
opensearch,
2223
postgresql,
2324
quicksight,
@@ -47,6 +48,7 @@
4748
"redshift",
4849
"lakeformation",
4950
"mysql",
51+
"neptune",
5052
"postgresql",
5153
"secretsmanager",
5254
"sqlserver",

awswrangler/neptune/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
"""Utilities Module for Amazon Neptune."""
2+
from awswrangler.neptune.gremlin_parser import GremlinParser
3+
from awswrangler.neptune.neptune import (
4+
connect,
5+
execute_gremlin,
6+
execute_opencypher,
7+
execute_sparql,
8+
flatten_nested_df,
9+
to_property_graph,
10+
to_rdf_graph,
11+
)
12+
13+
__all__ = [
14+
"execute_gremlin",
15+
"execute_opencypher",
16+
"execute_sparql",
17+
"to_property_graph",
18+
"to_rdf_graph",
19+
"connect",
20+
"GremlinParser",
21+
"flatten_nested_df",
22+
]

awswrangler/neptune/_utils.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Amazon Neptune Utils Module (PRIVATE)."""
2+
3+
import logging
4+
from enum import Enum
5+
from typing import Any
6+
7+
import pandas as pd
8+
from gremlin_python.process.graph_traversal import GraphTraversalSource, __
9+
from gremlin_python.process.translator import Translator
10+
from gremlin_python.process.traversal import Cardinality, T
11+
from gremlin_python.structure.graph import Graph
12+
13+
from awswrangler import exceptions
14+
from awswrangler.neptune.client import NeptuneClient
15+
16+
_logger: logging.Logger = logging.getLogger(__name__)
17+
18+
19+
class WriteDFType(Enum):
20+
"""Dataframe type enum."""
21+
22+
VERTEX = 1
23+
EDGE = 2
24+
UPDATE = 3
25+
26+
27+
def write_gremlin_df(client: NeptuneClient, df: pd.DataFrame, mode: WriteDFType, batch_size: int) -> bool:
28+
"""Write the provided dataframe using Gremlin.
29+
30+
Parameters
31+
----------
32+
client : NeptuneClient
33+
The Neptune client to write the dataframe
34+
df : pd.DataFrame
35+
The dataframe to write
36+
mode : WriteDFType
37+
The type of dataframe to write
38+
batch_size : int
39+
The size of the batch to write
40+
41+
Returns
42+
-------
43+
bool
44+
True if the write operation succeeded
45+
"""
46+
g = Graph().traversal()
47+
# Loop through items in the DF
48+
for (index, row) in df.iterrows():
49+
# build up a query
50+
if mode == WriteDFType.EDGE:
51+
g = _build_gremlin_edges(g, row.to_dict())
52+
elif mode == WriteDFType.VERTEX:
53+
g = _build_gremlin_vertices(g, row.to_dict())
54+
else:
55+
g = _build_gremlin_update(g, row.to_dict())
56+
# run the query
57+
if index > 0 and index % batch_size == 0:
58+
res = _run_gremlin_insert(client, g)
59+
if res:
60+
g = Graph().traversal()
61+
else:
62+
_logger.debug(res)
63+
raise exceptions.QueryFailed(
64+
"""Failed to insert part or all of the data in the DataFrame, please check the log output."""
65+
)
66+
67+
return _run_gremlin_insert(client, g)
68+
69+
70+
def _run_gremlin_insert(client: NeptuneClient, g: GraphTraversalSource) -> bool:
71+
translator = Translator("g")
72+
s = translator.translate(g.bytecode)
73+
s = s.replace("Cardinality.", "") # hack to fix parser error for set cardinality
74+
_logger.debug(s)
75+
res = client.write_gremlin(s)
76+
return res
77+
78+
79+
def _build_gremlin_update(g: GraphTraversalSource, row: Any) -> GraphTraversalSource:
80+
g = g.V(str(row["~id"]))
81+
g = _build_gremlin_properties(g, row)
82+
83+
return g
84+
85+
86+
def _build_gremlin_vertices(g: GraphTraversalSource, row: Any) -> GraphTraversalSource:
87+
g = g.V(str(row["~id"])).fold().coalesce(__.unfold(), __.addV(row["~label"]).property(T.id, str(row["~id"])))
88+
g = _build_gremlin_properties(g, row)
89+
90+
return g
91+
92+
93+
def _build_gremlin_edges(g: GraphTraversalSource, row: pd.Series) -> GraphTraversalSource:
94+
g = (
95+
g.V(str(row["~from"]))
96+
.fold()
97+
.coalesce(__.unfold(), _build_gremlin_vertices(__, {"~id": row["~from"], "~label": "Vertex"}))
98+
.addE(row["~label"])
99+
.to(
100+
__.V(str(row["~to"]))
101+
.fold()
102+
.coalesce(__.unfold(), _build_gremlin_vertices(__, {"~id": row["~to"], "~label": "Vertex"}))
103+
)
104+
)
105+
g = _build_gremlin_properties(g, row)
106+
107+
return g
108+
109+
110+
def _build_gremlin_properties(g: GraphTraversalSource, row: Any) -> GraphTraversalSource:
111+
for (column, value) in row.items():
112+
if column not in ["~id", "~label", "~to", "~from"]:
113+
if isinstance(value, list) and len(value) > 0:
114+
for item in value:
115+
g = g.property(Cardinality.set_, column, item)
116+
elif not pd.isna(value) and not pd.isnull(value):
117+
g = g.property(column, value)
118+
return g

0 commit comments

Comments
 (0)