Skip to content

Commit ef8f8e8

Browse files
committed
Fix OpenSearch connection exhaustion under load
1 parent bc4de34 commit ef8f8e8

File tree

3 files changed

+29
-1
lines changed

3 files changed

+29
-1
lines changed

scan_explorer_service/open_search.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,19 @@
33
from flask import current_app
44
from scan_explorer_service.utils.search_utils import EsFields, OrderOptions
55

6+
7+
def _get_os_client():
8+
if not hasattr(current_app, '_os_client') or current_app._os_client is None:
9+
url = current_app.config.get('OPEN_SEARCH_URL')
10+
current_app._os_client = opensearchpy.OpenSearch(
11+
url,
12+
timeout=30,
13+
max_retries=2,
14+
retry_on_timeout=True,
15+
pool_maxsize=20,
16+
)
17+
return current_app._os_client
18+
619
def create_query_string_query(query_string: str):
720
query = {
821
"query": {
@@ -70,7 +83,7 @@ def append_highlight(query: dict):
7083

7184

7285
def es_search(query: dict) -> Iterator[str]:
73-
es = opensearchpy.OpenSearch(current_app.config.get('OPEN_SEARCH_URL'))
86+
es = _get_os_client()
7487
resp = es.search(index=current_app.config.get(
7588
'OPEN_SEARCH_INDEX'), body=query)
7689
return resp

scan_explorer_service/utils/search_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class OrderOptions(str, enum.Enum):
5353

5454
def parse_query_args(args):
5555
qs = re.sub(':\s*', ':', args.get('q', '', str))
56+
if not qs or not qs.strip():
57+
raise ValueError('Query string is required')
5658

5759
qs, qs_dict = parse_query_string(qs)
5860

scan_explorer_service/views/metadata.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from scan_explorer_service.views.view_utils import ApiErrors
1010
from scan_explorer_service.views.manifest import _cache_delete
1111
from scan_explorer_service.open_search import EsFields, page_os_search, aggregate_search, page_ocr_os_search
12+
import opensearchpy
1213
import requests
1314

1415
bp_metadata = Blueprint('metadata', __name__, url_prefix='/metadata')
@@ -183,6 +184,9 @@ def article_search():
183184
page_count = page_os_search(qs, page, limit, sort)['hits']['total']['value']
184185
agg_limit = current_app.config.get("OPEN_SEARCH_AGG_BUCKET_LIMIT", 10000)
185186
return jsonify(serialize_os_article_result(result, page, limit, text_query, collection_count, page_count, agg_limit))
187+
except (opensearchpy.exceptions.ConnectionError, opensearchpy.exceptions.ConnectionTimeout, opensearchpy.exceptions.TransportError) as e:
188+
current_app.logger.exception(f"OpenSearch error: {e}")
189+
return jsonify(message='Search service temporarily unavailable', type=ApiErrors.SearchError.value), 503
186190
except Exception as e:
187191
current_app.logger.exception(f"An exception has occurred: {e}")
188192
return jsonify(message=str(e), type=ApiErrors.SearchError.value), 400
@@ -200,6 +204,9 @@ def collection_search():
200204
text_query = qs_dict[SearchOptions.FullText.value]
201205
agg_limit = current_app.config.get("OPEN_SEARCH_AGG_BUCKET_LIMIT", 10000)
202206
return jsonify(serialize_os_collection_result(result, page, limit, text_query, agg_limit))
207+
except (opensearchpy.exceptions.ConnectionError, opensearchpy.exceptions.ConnectionTimeout, opensearchpy.exceptions.TransportError) as e:
208+
current_app.logger.exception(f"OpenSearch error: {e}")
209+
return jsonify(message='Search service temporarily unavailable', type=ApiErrors.SearchError.value), 503
203210
except Exception as e:
204211
return jsonify(message=str(e), type=ApiErrors.SearchError.value), 400
205212

@@ -214,6 +221,9 @@ def page_search():
214221
if SearchOptions.FullText.value in qs_dict.keys():
215222
text_query = qs_dict[SearchOptions.FullText.value]
216223
return jsonify(serialize_os_page_result(result, page, limit, text_query))
224+
except (opensearchpy.exceptions.ConnectionError, opensearchpy.exceptions.ConnectionTimeout, opensearchpy.exceptions.TransportError) as e:
225+
current_app.logger.exception(f"OpenSearch error: {e}")
226+
return jsonify(message='Search service temporarily unavailable', type=ApiErrors.SearchError.value), 503
217227
except Exception as e:
218228
return jsonify(message=str(e), type=ApiErrors.SearchError.value), 400
219229

@@ -244,5 +254,8 @@ def get_page_ocr():
244254
result = page_ocr_os_search(collection_id, page_number)
245255
return serialize_os_page_ocr_result(result)
246256

257+
except (opensearchpy.exceptions.ConnectionError, opensearchpy.exceptions.ConnectionTimeout, opensearchpy.exceptions.TransportError) as e:
258+
current_app.logger.exception(f"OpenSearch error: {e}")
259+
return jsonify(message='Search service temporarily unavailable', type=ApiErrors.SearchError.value), 503
247260
except Exception as e:
248261
return jsonify(message=str(e), type=ApiErrors.SearchError.value), 400

0 commit comments

Comments
 (0)