Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions biothings/web/query/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,26 @@ class ESResultFormatter(ResultFormatter):
class _Hits(Hits):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Check if this is an error response from Elasticsearch
if "error" in self.data:
logger.error("ES returned error response: %s", self.data)
raise ValueError("Invalid response format")

# make sure the document is coming from
# elasticsearch at initialization time
assert "hits" in self.data
assert "total" in self.data["hits"]
assert "hits" in self.data["hits"]
if "hits" not in self.data:
logger.error("ES response missing 'hits' field. Response data: %s", self.data)
raise ValueError("Response missing 'hits' field")
if "total" not in self.data["hits"]:
logger.error("ES response missing 'hits.total' field. Response data: %s", self.data)
raise ValueError("Response missing 'hits.total' field")
if "hits" not in self.data["hits"]:
logger.error("ES response missing 'hits.hits' field. Response data: %s", self.data)
raise ValueError("Response missing 'hits.hits' field")
for hit in self.data["hits"]["hits"]:
assert "_source" in hit
if "_source" not in hit:
logger.error("ES hit missing '_source' field. Hit data: %s", hit)
raise ValueError("Hit missing '_source' field")

class _Doc(Doc):
pass
Expand Down
4 changes: 4 additions & 0 deletions biothings/web/query/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ async def _(*args, **kwargs):
elif error_type == "index_not_found_exception":
raise QueryPipelineException(500, error_type)

elif error_type == "es_rejected_execution_exception":
# ES cluster is overloaded, all thread pools at capacity
raise QueryPipelineException(503, "Service Unavailable", "Elasticsearch cluster overloaded")

else: # unexpected
raise

Expand Down
20 changes: 18 additions & 2 deletions tests/web/test_es_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ async def func():


@pytest.mark.asyncio
async def test_generic_exception():
async def test_index_not_found_exception():
@capturesESExceptions
async def func():
exc = Exception(message="test_generic_exception", meta={}, body={})
exc = Exception(message="test_index_not_found_exception", meta={}, body={})
exc.status_code = 500
exc.info = {"error": {"type": "index_not_found_exception", "reason": "test_reason"}}
raise exc
Expand All @@ -175,6 +175,22 @@ async def func():
assert exc_info.value.details == "Exception() takes no keyword arguments"


@pytest.mark.asyncio
async def test_es_rejected_execution_exception():
@capturesESExceptions
async def func():
exc = TransportError("test_es_rejected_execution_exception")
exc.status_code = 503
exc.info = {"error": {"type": "es_rejected_execution_exception", "reason": "rejected execution of TimedRunnable..."}}
raise exc

with pytest.raises(QueryPipelineException) as exc_info:
await func()
assert exc_info.value.code == 503
assert exc_info.value.summary == "Service Unavailable"
assert exc_info.value.details == "Elasticsearch cluster overloaded"


@pytest.mark.asyncio
async def test_search_phase_execution_exception_rejected_execution():
@capturesESExceptions
Expand Down