Skip to content

Commit 22d6d08

Browse files
feat: Support additional parameters for Neptune bulk load (#2297)
1 parent 3372008 commit 22d6d08

File tree

4 files changed

+122
-12
lines changed

4 files changed

+122
-12
lines changed

awswrangler/neptune/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Utilities Module for Amazon Neptune."""
2+
from awswrangler.neptune._client import BulkLoadParserConfiguration
23
from awswrangler.neptune._gremlin_parser import GremlinParser
34
from awswrangler.neptune._neptune import (
45
bulk_load,
@@ -23,4 +24,5 @@
2324
"bulk_load_from_files",
2425
"GremlinParser",
2526
"flatten_nested_df",
27+
"BulkLoadParserConfiguration",
2628
]

awswrangler/neptune/_client.py

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
"""Amazon NeptuneClient Module."""
33

44
import logging
5-
from typing import Any, Dict, List, Optional, Union, cast
5+
from typing import Any, Dict, List, Optional, TypedDict, Union, cast
66

77
import boto3
88
from botocore.auth import SigV4Auth
99
from botocore.awsrequest import AWSPreparedRequest, AWSRequest
10+
from typing_extensions import Literal, NotRequired
1011

1112
import awswrangler.neptune._gremlin_init as gremlin
1213
from awswrangler import _utils, exceptions
@@ -27,6 +28,28 @@
2728
WS_PROTOCOL = "wss"
2829

2930

31+
class BulkLoadParserConfiguration(TypedDict):
32+
"""Typed dictionary representing the additional parser configuration for the Neptune Bulk Loader."""
33+
34+
namedGraphUri: NotRequired[str]
35+
"""
36+
The default graph for all RDF formats when no graph is specified
37+
(for non-quads formats and NQUAD entries with no graph).
38+
"""
39+
baseUri: NotRequired[str]
40+
"""The base URI for RDF/XML and Turtle formats."""
41+
allowEmptyStrings: NotRequired[bool]
42+
"""
43+
Gremlin users need to be able to pass empty string values (``""``) as node
44+
and edge properties when loading CSV data.
45+
If ``allowEmptyStrings`` is set to ``false`` (the default),
46+
such empty strings are treated as nulls and are not loaded.
47+
48+
If ``allowEmptyStrings`` is set to ``true``, the loader treats empty strings
49+
as valid property values and loads them accordingly.
50+
"""
51+
52+
3053
class NeptuneClient:
3154
"""Class representing a Neptune cluster connection."""
3255

@@ -280,7 +303,18 @@ def status(self) -> Any:
280303
res = self._http_session.send(req)
281304
return res.json()
282305

283-
def load(self, s3_path: str, role_arn: str, parallelism: str = "HIGH", format: str = "csv") -> str:
306+
def load(
307+
self,
308+
s3_path: str,
309+
role_arn: str,
310+
parallelism: Literal["LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"] = "HIGH",
311+
mode: Literal["RESUME", "NEW", "AUTO"] = "AUTO",
312+
format: str = "csv",
313+
parser_configuration: Optional[BulkLoadParserConfiguration] = None,
314+
update_single_cardinality_properties: Literal["TRUE", "FALSE"] = "FALSE",
315+
queue_request: Literal["TRUE", "FALSE"] = "FALSE",
316+
dependencies: Optional[List[str]] = None,
317+
) -> str:
284318
"""
285319
Start the Neptune Loader command for loading CSV data from external files on S3 into a Neptune DB cluster.
286320
@@ -295,24 +329,52 @@ def load(self, s3_path: str, role_arn: str, parallelism: str = "HIGH", format: s
295329
see `Prerequisites: IAM Role and Amazon S3 Access <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM.html>`_.
296330
parallelism: str
297331
Specifies the number of threads used by the bulk load process.
332+
mode: str
333+
The load job mode.
334+
335+
In ``RESUME`` mode, the loader looks for a previous load from this source, and if it finds one, resumes that load job.
336+
If no previous load job is found, the loader stops.
337+
338+
In ``NEW`` mode, the loader creates a new load request regardless of any previous loads.
339+
You can use this mode to reload all the data from a source after dropping previously loaded data from your Neptune cluster, or to load new data available at the same source.
340+
341+
In ``AUTO`` mode, the loader looks for a previous load job from the same source, and if it finds one, resumes that job, just as in ``RESUME`` mode. If no previous load job is found, it loads all data from the source, just as in ``NEW`` mode.
298342
format: str
299343
The format of the data. For more information about data formats for the Neptune Loader command,
300344
see `Using the Amazon Neptune Bulk Loader to Ingest Data <https://docs.aws.amazon.com/neptune/latest/userguide/load-api-reference-load.html#:~:text=The%20format%20of%20the%20data.%20For%20more%20information%20about%20data%20formats%20for%20the%20Neptune%20Loader%20command%2C%20see%20Using%20the%20Amazon%20Neptune%20Bulk%20Loader%20to%20Ingest%20Data.>`_.
345+
parser_configuration: dict[str, Any], optional
346+
An optional object with additional parser configuration values.
347+
Each of the child parameters is also optional: ``namedGraphUri``, ``baseUri`` and ``allowEmptyStrings``.
348+
update_single_cardinality_properties: str
349+
An optional parameter that controls how the bulk loader
350+
treats a new value for single-cardinality vertex or edge properties.
351+
queue_request: str
352+
An optional flag parameter that indicates whether the load request can be queued up or not.
353+
354+
If omitted or set to ``"FALSE"``, the load request will fail if another load job is already running.
355+
dependencies: list[str], optional
356+
An optional parameter that can make a queued load request contingent on the successful completion of one or more previous jobs in the queue.
301357
302358
Returns
303359
-------
304360
str
305361
ID of the load job
306362
"""
307-
data = {
363+
data: Dict[str, Any] = {
308364
"source": s3_path,
309365
"format": format,
310366
"iamRoleArn": role_arn,
311-
"mode": "AUTO",
367+
"mode": mode,
312368
"region": self.region,
313369
"failOnError": "TRUE",
314370
"parallelism": parallelism,
371+
"updateSingleCardinalityProperties": update_single_cardinality_properties,
372+
"queueRequest": queue_request,
315373
}
374+
if parser_configuration:
375+
data["parserConfiguration"] = parser_configuration
376+
if dependencies:
377+
data["dependencies"] = dependencies
316378

317379
url = f"https://{self.host}:{self.port}/loader"
318380

awswrangler/neptune/_neptune.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
import logging
55
import re
66
import time
7-
from typing import Any, Callable, Dict, Literal, Optional, TypeVar, Union
7+
from typing import Any, Callable, Dict, List, Literal, Optional, TypeVar, Union
88

99
import boto3
1010

1111
import awswrangler.neptune._gremlin_init as gremlin
1212
import awswrangler.pandas as pd
1313
from awswrangler import _utils, exceptions, s3
1414
from awswrangler._config import apply_configs
15-
from awswrangler.neptune._client import NeptuneClient
15+
from awswrangler.neptune._client import BulkLoadParserConfiguration, NeptuneClient
1616

1717
gremlin_python = _utils.import_optional_dependency("gremlin_python")
1818
opencypher = _utils.import_optional_dependency("requests")
@@ -285,6 +285,10 @@ def bulk_load(
285285
iam_role: str,
286286
neptune_load_wait_polling_delay: float = 0.25,
287287
load_parallelism: Literal["LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"] = "HIGH",
288+
parser_configuration: Optional[BulkLoadParserConfiguration] = None,
289+
update_single_cardinality_properties: Literal["TRUE", "FALSE"] = "FALSE",
290+
queue_request: Literal["TRUE", "FALSE"] = "FALSE",
291+
dependencies: Optional[List[str]] = None,
288292
keep_files: bool = False,
289293
use_threads: Union[bool, int] = True,
290294
boto3_session: Optional[boto3.Session] = None,
@@ -312,6 +316,18 @@ def bulk_load(
312316
Interval in seconds for how often the function will check if the Neptune bulk load has completed.
313317
load_parallelism: str
314318
Specifies the number of threads used by Neptune's bulk load process.
319+
parser_configuration: dict[str, Any], optional
320+
An optional object with additional parser configuration values.
321+
Each of the child parameters is also optional: ``namedGraphUri``, ``baseUri`` and ``allowEmptyStrings``.
322+
update_single_cardinality_properties: str
323+
An optional parameter that controls how the bulk loader
324+
treats a new value for single-cardinality vertex or edge properties.
325+
queue_request: str
326+
An optional flag parameter that indicates whether the load request can be queued up or not.
327+
328+
If omitted or set to ``"FALSE"``, the load request will fail if another load job is already running.
329+
dependencies: list[str], optional
330+
An optional parameter that can make a queued load request contingent on the successful completion of one or more previous jobs in the queue.
315331
keep_files: bool
316332
Whether to keep stage files or delete them. False by default.
317333
use_threads: bool | int
@@ -352,8 +368,13 @@ def bulk_load(
352368
client=client,
353369
path=path,
354370
iam_role=iam_role,
371+
format="csv",
355372
neptune_load_wait_polling_delay=neptune_load_wait_polling_delay,
356373
load_parallelism=load_parallelism,
374+
parser_configuration=parser_configuration,
375+
update_single_cardinality_properties=update_single_cardinality_properties,
376+
queue_request=queue_request,
377+
dependencies=dependencies,
357378
)
358379
finally:
359380
if keep_files is False:
@@ -372,11 +393,16 @@ def bulk_load_from_files(
372393
client: NeptuneClient,
373394
path: str,
374395
iam_role: str,
396+
format: Literal["csv", "opencypher", "ntriples", "nquads", "rdfxml", "turtle"] = "csv",
375397
neptune_load_wait_polling_delay: float = 0.25,
376398
load_parallelism: Literal["LOW", "MEDIUM", "HIGH", "OVERSUBSCRIBE"] = "HIGH",
399+
parser_configuration: Optional[BulkLoadParserConfiguration] = None,
400+
update_single_cardinality_properties: Literal["TRUE", "FALSE"] = "FALSE",
401+
queue_request: Literal["TRUE", "FALSE"] = "FALSE",
402+
dependencies: Optional[List[str]] = None,
377403
) -> None:
378404
"""
379-
Load CSV files from S3 into Amazon Neptune using the Neptune Bulk Loader.
405+
Load files from S3 into Amazon Neptune using the Neptune Bulk Loader.
380406
381407
For more information about the Bulk Loader see
382408
`here <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load.html>`_.
@@ -391,10 +417,25 @@ def bulk_load_from_files(
391417
The Amazon Resource Name (ARN) for an IAM role to be assumed by the Neptune DB instance for access to the S3 bucket.
392418
For information about creating a role that has access to Amazon S3 and then associating it with a Neptune cluster,
393419
see `Prerequisites: IAM Role and Amazon S3 Access <https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-IAM.html>`_.
420+
format: str
421+
The format of the data.
394422
neptune_load_wait_polling_delay: float
395423
Interval in seconds for how often the function will check if the Neptune bulk load has completed.
396424
load_parallelism: str
397425
Specifies the number of threads used by Neptune's bulk load process.
426+
parser_configuration: dict[str, Any], optional
427+
An optional object with additional parser configuration values.
428+
Each of the child parameters is also optional: ``namedGraphUri``, ``baseUri`` and ``allowEmptyStrings``.
429+
update_single_cardinality_properties: str
430+
An optional parameter that controls how the bulk loader
431+
treats a new value for single-cardinality vertex or edge properties.
432+
queue_request: str
433+
An optional flag parameter that indicates whether the load request can be queued up or not.
434+
435+
If omitted or set to ``"FALSE"``, the load request will fail if another load job is already running.
436+
dependencies: list[str], optional
437+
An optional parameter that can make a queued load request contingent on the successful completion of one or more previous jobs in the queue.
438+
398439
399440
Examples
400441
--------
@@ -403,15 +444,20 @@ def bulk_load_from_files(
403444
>>> wr.neptune.bulk_load_from_files(
404445
... client=client,
405446
... path="s3://my-bucket/stage-files/",
406-
... iam_role="arn:aws:iam::XXX:role/XXX"
447+
... iam_role="arn:aws:iam::XXX:role/XXX",
448+
... format="csv",
407449
... )
408450
"""
409451
_logger.debug("Starting Neptune Bulk Load from %s", path)
410452
load_id = client.load(
411453
path,
412454
iam_role,
413-
format="csv",
455+
format=format,
414456
parallelism=load_parallelism,
457+
parser_configuration=parser_configuration,
458+
update_single_cardinality_properties=update_single_cardinality_properties,
459+
queue_request=queue_request,
460+
dependencies=dependencies,
415461
)
416462

417463
while True:
@@ -426,7 +472,7 @@ def bulk_load_from_files(
426472

427473
time.sleep(neptune_load_wait_polling_delay)
428474

429-
_logger.debug("Neptune load %s has succeeded in loading data from %s", load_id, path)
475+
_logger.debug("Neptune load %s has succeeded in loading %s data from %s", load_id, format, path)
430476

431477

432478
def connect(host: str, port: int, iam_enabled: bool = False, **kwargs: Any) -> NeptuneClient:

awswrangler/neptune/_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class WriteDFType(Enum):
2424
UPDATE = 3
2525

2626

27-
def write_gremlin_df(client: NeptuneClient, df: pd.DataFrame, mode: WriteDFType, batch_size: int) -> bool:
27+
def write_gremlin_df(client: "NeptuneClient", df: pd.DataFrame, mode: WriteDFType, batch_size: int) -> bool:
2828
"""Write the provided DataFrame using Gremlin.
2929
3030
Parameters
@@ -67,7 +67,7 @@ def write_gremlin_df(client: NeptuneClient, df: pd.DataFrame, mode: WriteDFType,
6767
return _run_gremlin_insert(client, g)
6868

6969

70-
def _run_gremlin_insert(client: NeptuneClient, g: GraphTraversalSource) -> bool:
70+
def _run_gremlin_insert(client: "NeptuneClient", g: GraphTraversalSource) -> bool:
7171
translator = Translator("g")
7272
s = translator.translate(g.bytecode)
7373
s = s.replace("Cardinality.", "") # hack to fix parser error for set cardinality

0 commit comments

Comments
 (0)