Skip to content

Commit 637eecb

Browse files
committed
Merge remote-tracking branch 'origin/1.1.x' into add-cli-pathing-application
2 parents 0e91c96 + 87acd81 commit 637eecb

File tree

7 files changed

+78
-9
lines changed

7 files changed

+78
-9
lines changed

biothings/hub/dataindex/indexer_task.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from biothings.utils.es import ESIndex as BaseESIndex
1111
from biothings.utils.loggers import get_logger
12+
from biothings.utils.serializer import to_json
1213

1314
try:
1415
from biothings.utils.mongo import doc_feeder
@@ -92,8 +93,12 @@ def _action(doc):
9293
self.logger.error(error)
9394
self.logger.error("Document ID %s failed: %s", document_id, reason)
9495

95-
self.logger.warning("Discovered errors during the bulk index task. Defaulting to 0 indexed documents")
96-
return 0
96+
serialized_errors = to_json(errors, indent=True)
97+
message = (
98+
f"Bulk indexing failed for index '{self.index_name}'. "
99+
f"Elasticsearch responded with errors:\n{serialized_errors}"
100+
)
101+
raise helpers.BulkIndexError(message, errors) from e
97102

98103
# NOTE
99104
# Why doesn't "mget", "mexists", "mindex" belong to the base class?

biothings/hub/datainspect/inspector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from biothings.hub import INSPECTOR_CATEGORY
1111
from biothings.hub.databuild.backend import create_backend
1212
from biothings.hub.datainspect.doc_inspect import (
13+
clean_big_nums,
1314
compute_metadata,
1415
flatten_and_validate,
1516
get_converters,

biothings/utils/common.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import random
2222
import string
2323
import sys
24+
import tarfile
2425
import time
2526
import types
2627
import urllib.parse
@@ -160,7 +161,7 @@ def safewfile(filename, prompt=True, default="C", mode="w"):
160161

161162
def anyfile(infile, mode="r"):
162163
"""
163-
return a file handler with the support for gzip/zip comppressed files.
164+
return a file handler with the support for gzip/zip compressed files.
164165
if infile is a two value tuple, then first one is the compressed file;
165166
the second one is the actual filename in the compressed file.
166167
e.g., ('a.zip', 'aa.txt')
@@ -171,6 +172,25 @@ def anyfile(infile, mode="r"):
171172
else:
172173
rawfile = os.path.splitext(infile)[0]
173174
filetype = os.path.splitext(infile)[1].lower()
175+
176+
177+
# use tarfile built-in method to check for tar file before anything else
178+
if tarfile.is_tarfile(infile):
179+
tar_file = tarfile.open(infile, mode)
180+
try:
181+
extracted = tar_file.extractfile(rawfile)
182+
except KeyError:
183+
# provided rawfile does not appear in the tarball
184+
tar_file.close()
185+
raise Exception("the provided tar file does not contain the target member.")
186+
187+
# extracted member is not a regular file or link
188+
if extracted is None:
189+
tar_file.close()
190+
raise Exception("invalid target file: must be a regular file or a link")
191+
192+
return io.TextIOWrapper(extracted)
193+
174194
if filetype == ".gz":
175195
# import gzip
176196
in_f = io.TextIOWrapper(gzip.GzipFile(infile, mode))

biothings/web/query/engine.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
"""
2323

2424
import asyncio
25+
import logging
2526

2627
from elasticsearch import NotFoundError, RequestError
2728
from elasticsearch.dsl import MultiSearch, Search
2829

2930
from biothings.web.query.builder import ESScrollID
3031

32+
logger = logging.getLogger(__name__)
33+
3134

3235
class ResultInterrupt(Exception):
3336
def __init__(self, data):
@@ -139,6 +142,13 @@ async def execute(self, query, **options):
139142
raise RawResultInterrupt(res)
140143

141144
if not res["hits"]["hits"]:
145+
scroll_id=query.data
146+
try:
147+
await self.client.clear_scroll(scroll_id=scroll_id)
148+
logger.info("Scroll context cleared: %s", scroll_id)
149+
except NotFoundError as e:
150+
logger.warning("Scroll context not found (ID: %s): %s", scroll_id, str(e))
151+
# Always raise this exception regardless of whether clear_scroll succeeds
142152
raise EndScrollInterrupt()
143153

144154
return res

biothings/web/query/formatter.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,26 @@ class ESResultFormatter(ResultFormatter):
8484
class _Hits(Hits):
8585
def __init__(self, *args, **kwargs):
8686
super().__init__(*args, **kwargs)
87+
# Check if this is an error response from Elasticsearch
88+
if "error" in self.data:
89+
logger.error("ES returned error response: %s", self.data)
90+
raise ValueError("Invalid response format")
91+
8792
# make sure the document is coming from
8893
# elasticsearch at initialization time
89-
assert "hits" in self.data
90-
assert "total" in self.data["hits"]
91-
assert "hits" in self.data["hits"]
94+
if "hits" not in self.data:
95+
logger.error("ES response missing 'hits' field. Response data: %s", self.data)
96+
raise ValueError("Response missing 'hits' field")
97+
if "total" not in self.data["hits"]:
98+
logger.error("ES response missing 'hits.total' field. Response data: %s", self.data)
99+
raise ValueError("Response missing 'hits.total' field")
100+
if "hits" not in self.data["hits"]:
101+
logger.error("ES response missing 'hits.hits' field. Response data: %s", self.data)
102+
raise ValueError("Response missing 'hits.hits' field")
92103
for hit in self.data["hits"]["hits"]:
93-
assert "_source" in hit
104+
if "_source" not in hit:
105+
logger.error("ES hit missing '_source' field. Hit data: %s", hit)
106+
raise ValueError("Hit missing '_source' field")
94107

95108
class _Doc(Doc):
96109
pass

biothings/web/query/pipeline.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ async def _(*args, **kwargs):
147147
elif error_type == "index_not_found_exception":
148148
raise QueryPipelineException(500, error_type)
149149

150+
elif error_type == "es_rejected_execution_exception":
151+
# ES cluster is overloaded, all thread pools at capacity
152+
raise QueryPipelineException(503, "Service Unavailable", "Elasticsearch cluster overloaded")
153+
150154
else: # unexpected
151155
raise
152156

tests/web/test_es_exceptions.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ async def func():
160160

161161

162162
@pytest.mark.asyncio
163-
async def test_generic_exception():
163+
async def test_index_not_found_exception():
164164
@capturesESExceptions
165165
async def func():
166-
exc = Exception(message="test_generic_exception", meta={}, body={})
166+
exc = Exception(message="test_index_not_found_exception", meta={}, body={})
167167
exc.status_code = 500
168168
exc.info = {"error": {"type": "index_not_found_exception", "reason": "test_reason"}}
169169
raise exc
@@ -175,6 +175,22 @@ async def func():
175175
assert exc_info.value.details == "Exception() takes no keyword arguments"
176176

177177

178+
@pytest.mark.asyncio
179+
async def test_es_rejected_execution_exception():
180+
@capturesESExceptions
181+
async def func():
182+
exc = TransportError("test_es_rejected_execution_exception")
183+
exc.status_code = 503
184+
exc.info = {"error": {"type": "es_rejected_execution_exception", "reason": "rejected execution of TimedRunnable..."}}
185+
raise exc
186+
187+
with pytest.raises(QueryPipelineException) as exc_info:
188+
await func()
189+
assert exc_info.value.code == 503
190+
assert exc_info.value.summary == "Service Unavailable"
191+
assert exc_info.value.details == "Elasticsearch cluster overloaded"
192+
193+
178194
@pytest.mark.asyncio
179195
async def test_search_phase_execution_exception_rejected_execution():
180196
@capturesESExceptions

0 commit comments

Comments
 (0)