Commit 8b20f2c

Assaf Mentzer, mureddy29, and jaidisido authored
OpenSearch Support (#880)
* [skip ci] elasticsearch support init: structure and skeleton code
* [skip ci] rename elasticsearch->opensearch
* [skip ci] merge Assaf and Murali forks
* [skip ci] fixed filter_path pandasticsearch issue
* [skip ci] disable scan for now
* [skip ci] path documentation
* [skip ci] add delete_index
* [skip ci] add delete_index
* [skip ci] add index_json
* [skip ci] add index_csv local path
* [skip ci] add is_scroll to search (scan)
* [skip ci] add search_by_sql
* [skip ci] opensearch test infra
* [skip ci] index create/delete ignore exceptions
* [skip ci] index_documents documents type
* [skip ci] removed pandasticsearch dependency
* [skip ci] port typo
* [skip ci] enforced_pandas_params
* [skip ci] isort & black
* Added OpenSearch tutorial
* typing fixes
* [skip ci] isort
* [skip ci] black opensearch
* [skip ci] opensearch validation
* [skip ci] opensearch: poetry add requests-aws4auth and elasticsearch
* [skip ci] opensearch: add support for host with schema http/https
* Update 031 - OpenSearch.ipynb Fixed typo's
* [skip ci] opensearch: index_documents 429 error
* [skip ci] opensearch: add jsonpath_ng library
* [skip ci] opensearch: renamed fgac user/password
* [skip ci] opensearch: add connection timeout
* opensearch: get_credentials_from_session
* [skip ci] opensearch: indexing progressbar
* [skip ci] opensearch.index_documents.max_retries default 5
* opensearch: replace elasticsearch-py with opensearch-py low-level client
* [skip ci] opensearch filter_path default value
* [skip ci] opensearch tutorial
* Minor - Pylint
* [skip ci] opensearch: pylint f-string and file open encoding
* opensearch: add to CONTRIBUTING.md
* opensearch: update aws-cdk packages to have the same minimum version

Co-authored-by: Muralidhar Reddy <[email protected]>
Co-authored-by: jaidisido <[email protected]>
1 parent 7433926 commit 8b20f2c

18 files changed: +3571 -266 lines changed

CONTRIBUTING.md

Lines changed: 5 additions & 1 deletion
@@ -215,6 +215,10 @@ or
 ``./deploy-base.sh``
 ``./deploy-databases.sh``
 
+* [OPTIONAL] Deploy the Cloudformation template `opensearch.yaml` (if you need to test Amazon OpenSearch Service). This step could take about 15 minutes to deploy.
+
+``./deploy-opensearch.sh``
+
 * Go to the `EC2 -> SecurityGroups` console, open the `aws-data-wrangler-*` security group and configure to accept your IP from any TCP port.
 - Alternatively run:
 
@@ -244,7 +248,7 @@ or
 
 ``pytest -n 8 tests/test_db.py``
 
-* To run all data lake test functions for all python versions (Only if Amazon QuickSight is activated):
+* To run all data lake test functions for all python versions (Only if Amazon QuickSight is activated and Amazon OpenSearch template is deployed):
 
 ``./test.sh``

README.md

Lines changed: 1 addition & 0 deletions
@@ -136,6 +136,7 @@ FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
   - [026 - Amazon Timestream](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/026%20-%20Amazon%20Timestream.ipynb)
   - [027 - Amazon Timestream 2](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/027%20-%20Amazon%20Timestream%202.ipynb)
   - [028 - Amazon DynamoDB](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/028%20-%20DynamoDB.ipynb)
+  - [031 - OpenSearch](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/031%20-%20OpenSearch.ipynb)
 - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/2.11.0/api.html)
   - [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/2.11.0/api.html#amazon-s3)
   - [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/2.11.0/api.html#aws-glue-catalog)

awswrangler/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,7 @@
     emr,
     exceptions,
     mysql,
+    opensearch,
     postgresql,
     quicksight,
     redshift,
@@ -38,6 +39,7 @@
     "data_api",
     "dynamodb",
     "exceptions",
+    "opensearch",
     "quicksight",
     "s3",
     "sts",

awswrangler/opensearch/__init__.py

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+"""Utilities Module for Amazon OpenSearch."""
+
+from awswrangler.opensearch._read import search, search_by_sql
+from awswrangler.opensearch._utils import connect
+from awswrangler.opensearch._write import create_index, delete_index, index_csv, index_df, index_documents, index_json
+
+__all__ = [
+    "connect",
+    "create_index",
+    "delete_index",
+    "index_csv",
+    "index_documents",
+    "index_df",
+    "index_json",
+    "search",
+    "search_by_sql",
+]

awswrangler/opensearch/_read.py

Lines changed: 169 additions & 0 deletions
@@ -0,0 +1,169 @@
+"""Amazon OpenSearch Read Module (PRIVATE)."""
+
+from typing import Any, Collection, Dict, List, Mapping, Optional, Union
+
+import pandas as pd
+from opensearchpy import OpenSearch
+from opensearchpy.helpers import scan
+
+from awswrangler.opensearch._utils import _get_distribution
+
+
+def _resolve_fields(row: Mapping[str, Any]) -> Mapping[str, Any]:
+    fields = {}
+    for field in row:
+        if isinstance(row[field], dict):
+            nested_fields = _resolve_fields(row[field])
+            for n_field, val in nested_fields.items():
+                fields[f"{field}.{n_field}"] = val
+        else:
+            fields[field] = row[field]
+    return fields
+
+
+def _hit_to_row(hit: Mapping[str, Any]) -> Mapping[str, Any]:
+    row: Dict[str, Any] = {}
+    for k in hit.keys():
+        if k == "_source":
+            solved_fields = _resolve_fields(hit["_source"])
+            row.update(solved_fields)
+        elif k.startswith("_"):
+            row[k] = hit[k]
+    return row
+
+
+def _search_response_to_documents(response: Mapping[str, Any]) -> List[Mapping[str, Any]]:
+    return [_hit_to_row(hit) for hit in response["hits"]["hits"]]
+
+
+def _search_response_to_df(response: Union[Mapping[str, Any], Any]) -> pd.DataFrame:
+    return pd.DataFrame(_search_response_to_documents(response))
+
+
+def search(
+    client: OpenSearch,
+    index: Optional[str] = "_all",
+    search_body: Optional[Dict[str, Any]] = None,
+    doc_type: Optional[str] = None,
+    is_scroll: Optional[bool] = False,
+    filter_path: Optional[Union[str, Collection[str]]] = None,
+    **kwargs: Any,
+) -> pd.DataFrame:
+    """Return results matching query DSL as pandas dataframe.
+
+    Parameters
+    ----------
+    client : OpenSearch
+        instance of opensearchpy.OpenSearch to use.
+    index : str, optional
+        A comma-separated list of index names to search.
+        Use `_all` or an empty string to perform the operation on all indices.
+    search_body : Dict[str, Any], optional
+        The search definition using the [Query DSL](https://opensearch.org/docs/opensearch/query-dsl/full-text/).
+    doc_type : str, optional
+        Name of the document type (for Elasticsearch versions 5.x and earlier).
+    is_scroll : bool, optional
+        Allows retrieving a large number of results from a single search request using
+        [scroll](https://opensearch.org/docs/opensearch/rest-api/scroll/)
+        for example, for machine learning jobs.
+        Because scroll search contexts consume a lot of memory, we suggest you don’t use the scroll operation
+        for frequent user queries.
+    filter_path : Union[str, Collection[str]], optional
+        Use the filter_path parameter to reduce the size of the OpenSearch Service response \
+        (default: ['hits.hits._id','hits.hits._source'])
+    **kwargs :
+        KEYWORD arguments forwarded to [opensearchpy.OpenSearch.search]\
+        (https://opensearch-py.readthedocs.io/en/latest/api.html#opensearchpy.OpenSearch.search)
+        and also to [opensearchpy.helpers.scan](https://opensearch-py.readthedocs.io/en/master/helpers.html#scan)
+        if `is_scroll=True`
+
+    Returns
+    -------
+    pandas.DataFrame
+        Results as Pandas DataFrame
+
+    Examples
+    --------
+    Searching an index using query DSL
+
+    >>> import awswrangler as wr
+    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
+    >>> df = wr.opensearch.search(
+    ...     client=client,
+    ...     index='movies',
+    ...     search_body={
+    ...         "query": {
+    ...             "match": {
+    ...                 "title": "wind"
+    ...             }
+    ...         }
+    ...     }
+    ... )
+
+
+    """
+    if doc_type:
+        kwargs["doc_type"] = doc_type
+
+    if filter_path is None:
+        filter_path = ["hits.hits._id", "hits.hits._source"]
+
+    if is_scroll:
+        if isinstance(filter_path, str):
+            filter_path = [filter_path]
+        filter_path = ["_scroll_id", "_shards"] + list(filter_path)  # required for scroll
+        documents_generator = scan(client, index=index, query=search_body, filter_path=filter_path, **kwargs)
+        documents = [_hit_to_row(doc) for doc in documents_generator]
+        df = pd.DataFrame(documents)
+    else:
+        response = client.search(index=index, body=search_body, filter_path=filter_path, **kwargs)
+        df = _search_response_to_df(response)
+    return df
+
+
+def search_by_sql(client: OpenSearch, sql_query: str, **kwargs: Any) -> pd.DataFrame:
+    """Return results matching [SQL query](https://opensearch.org/docs/search-plugins/sql/index/) as pandas dataframe.
+
+    Parameters
+    ----------
+    client : OpenSearch
+        instance of opensearchpy.OpenSearch to use.
+    sql_query : str
+        SQL query
+    **kwargs :
+        KEYWORD arguments forwarded to request url (e.g.: filter_path, etc.)
+
+    Returns
+    -------
+    pandas.DataFrame
+        Results as Pandas DataFrame
+
+    Examples
+    --------
+    Searching an index using SQL query
+
+    >>> import awswrangler as wr
+    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
+    >>> df = wr.opensearch.search_by_sql(
+    ...     client=client,
+    ...     sql_query='SELECT * FROM my-index LIMIT 50'
+    ... )
+
+
+    """
+    if _get_distribution(client) == "opensearch":
+        url = "/_plugins/_sql"
+    else:
+        url = "/_opendistro/_sql"
+
+    kwargs["format"] = "json"
+    body = {"query": sql_query}
+    for size_att in ["size", "fetch_size"]:
+        if size_att in kwargs:
+            body["fetch_size"] = kwargs[size_att]
+            del kwargs[size_att]  # unrecognized parameter
+    response = client.transport.perform_request(
+        "POST", url, headers={"Content-Type": "application/json"}, body=body, params=kwargs
+    )
+    df = _search_response_to_df(response)
+    return df
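
The read path above turns each search hit into one flat row: metadata keys that start with an underscore are kept, and nested objects under `_source` become dotted column names. The following is a minimal standalone sketch of that idea, not the module's code. The names `flatten_fields` and `hit_to_row` and the sample response are hypothetical, chosen only for illustration.

```python
from typing import Any, Dict, Mapping


def flatten_fields(row: Mapping[str, Any]) -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys, e.g. {"a": {"b": 1}} -> {"a.b": 1}."""
    fields: Dict[str, Any] = {}
    for name, value in row.items():
        if isinstance(value, dict):
            # Recurse, then prefix every nested key with the parent name.
            for nested_name, nested_value in flatten_fields(value).items():
                fields[f"{name}.{nested_name}"] = nested_value
        else:
            fields[name] = value
    return fields


def hit_to_row(hit: Mapping[str, Any]) -> Dict[str, Any]:
    """Keep underscore-prefixed metadata and merge in the flattened `_source`."""
    row: Dict[str, Any] = {}
    for key, value in hit.items():
        if key == "_source":
            row.update(flatten_fields(value))
        elif key.startswith("_"):
            row[key] = value
        # Any other key (for example "sort") is dropped.
    return row


# Hypothetical response, shaped like one trimmed by the default filter_path.
response = {
    "hits": {
        "hits": [
            {"_id": "1", "_source": {"title": "Gone with the Wind", "info": {"year": 1939}}},
            {"_id": "2", "_source": {"title": "The Wind Rises", "info": {"year": 2013}}},
        ]
    }
}

rows = [hit_to_row(hit) for hit in response["hits"]["hits"]]
```

Passing `rows` to `pandas.DataFrame` would then give one column per flattened key, which is how nested documents end up as columns such as `info.year`.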

awswrangler/opensearch/_utils.py

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+"""Amazon OpenSearch Utils Module (PRIVATE)."""
+
+import logging
+import re
+from typing import Any, Optional
+
+import boto3
+from opensearchpy import OpenSearch, RequestsHttpConnection
+from requests_aws4auth import AWS4Auth
+
+from awswrangler import _utils, exceptions
+
+_logger: logging.Logger = logging.getLogger(__name__)
+
+
+def _get_distribution(client: OpenSearch) -> Any:
+    return client.info().get("version", {}).get("distribution", "elasticsearch")
+
+
+def _get_version(client: OpenSearch) -> Any:
+    return client.info().get("version", {}).get("number")
+
+
+def _get_version_major(client: OpenSearch) -> Any:
+    version = _get_version(client)
+    if version:
+        return int(version.split(".")[0])
+    return None
+
+
+def _strip_endpoint(endpoint: str) -> str:
+    uri_schema = re.compile(r"https?://")
+    return uri_schema.sub("", endpoint).strip().strip("/")
+
+
+def connect(
+    host: str,
+    port: Optional[int] = 443,
+    boto3_session: Optional[boto3.Session] = None,
+    region: Optional[str] = None,
+    username: Optional[str] = None,
+    password: Optional[str] = None,
+) -> OpenSearch:
+    """Create a secure connection to the specified Amazon OpenSearch domain.
+
+    Note
+    ----
+    We use [opensearch-py](https://github.com/opensearch-project/opensearch-py), an OpenSearch low-level python client.
+
+    The username and password are mandatory if the OS Cluster uses [Fine Grained Access Control]\
+    (https://docs.aws.amazon.com/opensearch-service/latest/developerguide/fgac.html).
+    If fine grained access control is disabled, session access key and secret keys are used.
+
+    Parameters
+    ----------
+    host : str
+        Amazon OpenSearch domain, for example: my-test-domain.us-east-1.es.amazonaws.com.
+    port : int
+        OpenSearch Service only accepts connections over port 80 (HTTP) or 443 (HTTPS)
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 Session will be used if boto3_session is None.
+    region : str, optional
+        AWS region of the Amazon OS domain. If not provided will be extracted from boto3_session.
+    username : str, optional
+        Fine-grained access control username. Mandatory if OS Cluster uses Fine Grained Access Control.
+    password : str, optional
+        Fine-grained access control password. Mandatory if OS Cluster uses Fine Grained Access Control.
+
+    Returns
+    -------
+    opensearchpy.OpenSearch
+        OpenSearch low-level client.
+        https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/client/__init__.py
+    """
+    valid_ports = {80, 443}
+
+    if port not in valid_ports:
+        raise ValueError(f"port must be one of {valid_ports}")
+
+    if username and password:
+        http_auth = (username, password)
+    else:
+        if region is None:
+            region = _utils.get_region_from_session(boto3_session=boto3_session)
+        creds = _utils.get_credentials_from_session(boto3_session=boto3_session)
+        if creds.access_key is None or creds.secret_key is None:
+            raise exceptions.InvalidArgument(
+                "One of IAM Role or AWS ACCESS_KEY_ID and SECRET_ACCESS_KEY must be "
+                "given. Unable to find ACCESS_KEY_ID and SECRET_ACCESS_KEY in boto3 "
+                "session."
+            )
+        http_auth = AWS4Auth(creds.access_key, creds.secret_key, region, "es", session_token=creds.token)
+    try:
+        es = OpenSearch(
+            host=_strip_endpoint(host),
+            port=port,
+            http_auth=http_auth,
+            use_ssl=True,
+            verify_certs=True,
+            connection_class=RequestsHttpConnection,
+            timeout=30,
+            max_retries=10,
+            retry_on_timeout=True,
+        )
+    except Exception as e:
+        _logger.error("Error connecting to OpenSearch cluster. Please verify authentication details.")
+        raise e
+    return es
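
Before `connect` builds the client, it applies a few small input rules: the host is stripped of any `http://` or `https://` scheme, the port must be 80 or 443, and the major version is read from the leading component of the version string. The sketch below restates those rules as standalone functions so they can be tried without an AWS account. The names `strip_endpoint`, `validate_port` and `version_major` are hypothetical stand-ins, not part of the library's public API.

```python
import re
from typing import Optional

VALID_PORTS = {80, 443}


def strip_endpoint(endpoint: str) -> str:
    """Remove any http:// or https:// marker, then surrounding whitespace and slashes."""
    return re.sub(r"https?://", "", endpoint).strip().strip("/")


def validate_port(port: int) -> int:
    """Return the port unchanged if it is 80 or 443, otherwise raise ValueError."""
    if port not in VALID_PORTS:
        raise ValueError(f"port must be one of {sorted(VALID_PORTS)}")
    return port


def version_major(version: Optional[str]) -> Optional[int]:
    """Parse the leading component of a dotted version string, e.g. '7.10.2' -> 7."""
    if version:
        return int(version.split(".")[0])
    return None


host = strip_endpoint(" https://my-test-domain.us-east-1.es.amazonaws.com/ ")
port = validate_port(443)
```

Under these rules a self-managed cluster on the common port 9200 would be rejected, which fits the docstring's note that the managed service accepts only ports 80 and 443.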
