Commit d3650ff

feat: Opensearch parallel bulk (#2310)
* feat: Opensearch parallel bulk
* Minor refactor
* Raise an error if using args not compatible with parallel bulk
* [skip-ci] Docstrings
1 parent 7fd7302 commit d3650ff
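
For context, a minimal usage sketch of what this commit enables (the connection details below are illustrative placeholders, not part of the commit). Per the new docstring, `use_threads=True` uses os.cpu_count() as the maximum number of threads, an integer uses that many threads, and False (the default) keeps the previous serial bulk behavior:

    import awswrangler as wr

    client = wr.opensearch.connect(host="my-opensearch-domain")  # illustrative endpoint

    # True -> os.cpu_count() threads; an int -> that many threads; False (default) -> serial bulk
    wr.opensearch.index_documents(
        client,
        documents=[{"_id": "1", "name": "John"}, {"_id": "2", "name": "George"}],
        index="sample-index1",
        use_threads=2,
    )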

2 files changed (+94, -20)

awswrangler/opensearch/_write.py (73 additions, 20 deletions)

@@ -260,6 +260,7 @@ def index_json(
     doc_type: Optional[str] = None,
     boto3_session: Optional[boto3.Session] = boto3.Session(),
     json_path: Optional[str] = None,
+    use_threads: Union[bool, int] = False,
     **kwargs: Any,
 ) -> Any:
     """Index all documents from JSON file to OpenSearch index.
@@ -284,6 +285,10 @@ def index_json(
     boto3_session : boto3.Session(), optional
         Boto3 Session to be used to access s3 if s3 path is provided.
         The default boto3 Session will be used if boto3_session receive None.
+    use_threads : bool, int
+        True to enable concurrent requests, False to disable multiple threads.
+        If enabled os.cpu_count() will be used as the max number of threads.
+        If integer is provided, specified number is used.
     **kwargs :
         KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents`
         which is used to execute the operation
@@ -324,7 +329,9 @@ def index_json(
     documents = list(_file_line_generator(path, is_json=True))
     if json_path:
         documents = _get_documents_w_json_path(documents, json_path)
-    return index_documents(client=client, documents=documents, index=index, doc_type=doc_type, **kwargs)
+    return index_documents(
+        client=client, documents=documents, index=index, doc_type=doc_type, use_threads=use_threads, **kwargs
+    )


 @_utils.check_optional_dependency(opensearchpy, "opensearchpy")
@@ -334,6 +341,7 @@ def index_csv(
     index: str,
     doc_type: Optional[str] = None,
     pandas_kwargs: Optional[Dict[str, Any]] = None,
+    use_threads: Union[bool, int] = False,
     **kwargs: Any,
 ) -> Any:
     """Index all documents from a CSV file to OpenSearch index.
@@ -353,6 +361,10 @@ def index_csv(
         e.g. pandas_kwargs={'sep': '|', 'na_values': ['null', 'none']}
         https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
         Note: these params values are enforced: `skip_blank_lines=True`
+    use_threads : bool, int
+        True to enable concurrent requests, False to disable multiple threads.
+        If enabled os.cpu_count() will be used as the max number of threads.
+        If integer is provided, specified number is used.
     **kwargs :
         KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents`
         which is used to execute the operation
@@ -396,12 +408,17 @@ def index_csv(
     }
     pandas_kwargs.update(enforced_pandas_params)
     df = pd.read_csv(path, **pandas_kwargs)
-    return index_df(client, df=df, index=index, doc_type=doc_type, **kwargs)
+    return index_df(client, df=df, index=index, doc_type=doc_type, use_threads=use_threads, **kwargs)


 @_utils.check_optional_dependency(opensearchpy, "opensearchpy")
 def index_df(
-    client: "opensearchpy.OpenSearch", df: pd.DataFrame, index: str, doc_type: Optional[str] = None, **kwargs: Any
+    client: "opensearchpy.OpenSearch",
+    df: pd.DataFrame,
+    index: str,
+    doc_type: Optional[str] = None,
+    use_threads: Union[bool, int] = False,
+    **kwargs: Any,
 ) -> Any:
     """Index all documents from a DataFrame to OpenSearch index.

@@ -415,6 +432,10 @@ def index_df(
         Name of the index.
     doc_type : str, optional
         Name of the document type (for Elasticsearch versions 5.x and earlier).
+    use_threads : bool, int
+        True to enable concurrent requests, False to disable multiple threads.
+        If enabled os.cpu_count() will be used as the max number of threads.
+        If integer is provided, specified number is used.
     **kwargs :
         KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents`
         which is used to execute the operation
@@ -438,7 +459,14 @@ def index_df(
     ...     index='sample-index1'
     ... )
     """
-    return index_documents(client=client, documents=_df_doc_generator(df), index=index, doc_type=doc_type, **kwargs)
+    return index_documents(
+        client=client,
+        documents=_df_doc_generator(df),
+        index=index,
+        doc_type=doc_type,
+        use_threads=use_threads,
+        **kwargs,
+    )


 @_utils.check_optional_dependency(opensearchpy, "opensearchpy")
@@ -453,13 +481,19 @@ def index_documents(
     bulk_size: int = 1000,
     chunk_size: Optional[int] = 500,
     max_chunk_bytes: Optional[int] = 100 * 1024 * 1024,
-    max_retries: Optional[int] = 5,
-    initial_backoff: Optional[int] = 2,
-    max_backoff: Optional[int] = 600,
+    max_retries: Optional[int] = None,
+    initial_backoff: Optional[int] = None,
+    max_backoff: Optional[int] = None,
+    use_threads: Union[bool, int] = False,
     **kwargs: Any,
 ) -> Dict[str, Any]:
     """Index all documents to OpenSearch index.

+    Note
+    ----
+    `max_retries`, `initial_backoff`, and `max_backoff` are not supported with parallel bulk
+    (when `use_threads` is set to True).
+
     Note
     ----
     Some of the args are referenced from opensearch-py client library (bulk helpers)
@@ -501,6 +535,10 @@ def index_documents(
         Any subsequent retries will be powers of ``initial_backoff*2**retry_number`` (default: 2)
     max_backoff: int, optional
         maximum number of seconds a retry will wait (default: 600)
+    use_threads : bool, int
+        True to enable concurrent requests, False to disable multiple threads.
+        If enabled os.cpu_count() will be used as the max number of threads.
+        If integer is provided, specified number is used.
     **kwargs :
         KEYWORD arguments forwarded to bulk operation
         elasticsearch >= 7.10.2 / opensearch: \
@@ -528,6 +566,11 @@ def index_documents(
     if "refresh" in kwargs and _is_serverless(client):
         raise exceptions.NotSupported("Refresh policy not supported in OpenSearch Serverless.")

+    if use_threads and any([max_retries, initial_backoff, max_backoff]):
+        raise exceptions.InvalidArgumentCombination(
+            "`max_retries`, `initial_backoff`, and `max_backoff` are not supported when `use_threads` is set to True"
+        )
+
     if not isinstance(documents, list):
         documents = list(documents)
     total_documents = len(documents)
@@ -556,20 +599,30 @@ def index_documents(
             refresh_interval = _get_refresh_interval(client, index)
             _disable_refresh_interval(client, index)
         _logger.debug("running bulk index of %s documents", len(bulk_chunk_documents))
-        _success, _errors = opensearchpy.helpers.bulk(
-            client=client,
-            actions=bulk_chunk_documents,
-            ignore_status=ignore_status,
-            chunk_size=chunk_size,
-            max_chunk_bytes=max_chunk_bytes,
-            max_retries=max_retries,
-            initial_backoff=initial_backoff,
-            max_backoff=max_backoff,
-            request_timeout=30,
+        bulk_kwargs = {
+            "ignore_status": ignore_status,
+            "chunk_size": chunk_size,
+            "max_chunk_bytes": max_chunk_bytes,
+            "request_timeout": 30,
             **kwargs,
-        )
-        success += _success
-        errors += _errors
+        }
+        _logger.debug("running bulk with kwargs: %s", bulk_kwargs)
+        if use_threads:
+            # Parallel bulk does not support max_retries, initial_backoff & max_backoff
+            for _success, _errors in opensearchpy.helpers.parallel_bulk(
+                client, bulk_chunk_documents, **bulk_kwargs
+            ):
+                success += _success
+                errors += _errors
+        else:
+            # Defaults
+            bulk_kwargs["max_retries"] = 5 if not max_retries else max_retries
+            bulk_kwargs["initial_backoff"] = 2 if not initial_backoff else initial_backoff
+            bulk_kwargs["max_backoff"] = 600 if not max_backoff else max_backoff

+            _success, _errors = opensearchpy.helpers.bulk(client, bulk_chunk_documents, **bulk_kwargs)
+            success += _success
+            errors += _errors
         _logger.debug("indexed %s documents (%s/%s)", _success, success, total_documents)
         if progressbar:
             progress_bar.update(success, force=True)
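
Taken together, the hunks above change the retry/backoff defaults to None so the serial path can restore the old defaults (5 / 2 / 600) while the parallel path rejects those arguments. A hedged sketch of the resulting call-level behavior (document and index names are placeholders, not part of the commit):

    docs = [{"_id": "1", "name": "John"}]

    # Parallel path: chunks go through opensearchpy.helpers.parallel_bulk; retries are not configurable
    wr.opensearch.index_documents(client, documents=docs, index="sample-index1", use_threads=True)

    # Serial path: opensearchpy.helpers.bulk with the previous defaults (max_retries=5, initial_backoff=2, max_backoff=600)
    wr.opensearch.index_documents(client, documents=docs, index="sample-index1", max_retries=10)

    # Mixing the two raises exceptions.InvalidArgumentCombination
    wr.opensearch.index_documents(client, documents=docs, index="sample-index1", use_threads=True, max_retries=10)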

tests/unit/test_opensearch.py (21 additions, 0 deletions)

@@ -312,6 +312,27 @@ def test_index_documents(client):
     wr.opensearch.delete_index(client, index)


+@pytest.mark.parametrize("use_threads", [False, True, 2])
+def test_index_documents_parallel(client, use_threads):
+    index = f"test_index_documents_{_get_unique_suffix()}"
+    # Pre-create index to avoid multiple threads creating conflicting mappings
+    wr.opensearch.create_index(
+        client=client,
+        index=index,
+        mappings={"properties": {"name": {"type": "text"}}},
+    )
+    try:
+        response = wr.opensearch.index_documents(
+            client,
+            documents=[{"_id": "1", "name": "John"}, {"_id": "2", "name": "George"}, {"_id": "3", "name": "Julia"}],
+            index=index,
+            use_threads=use_threads,
+        )
+        assert response.get("success", 0) == 3
+    finally:
+        wr.opensearch.delete_index(client, index)
+
+
 def test_index_documents_id_keys(client):
     index = f"test_index_documents_id_keys_{_get_unique_suffix()}"
     try:
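
Assuming a test environment where the `client` fixture can reach an OpenSearch domain, the new parametrized case can be run on its own with pytest's `-k` filter:

    pytest tests/unit/test_opensearch.py -k test_index_documents_parallel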
