File tree Expand file tree Collapse file tree 3 files changed +25
-2
lines changed
Expand file tree Collapse file tree 3 files changed +25
-2
lines changed Original file line number Diff line number Diff line change @@ -84,6 +84,11 @@ class ESResultFormatter(ResultFormatter):
8484 class _Hits (Hits ):
8585 def __init__ (self , * args , ** kwargs ):
8686 super ().__init__ (* args , ** kwargs )
87+ # Check if this is an error response from Elasticsearch
88+ if "error" in self .data :
89+ logger .error ("ES returned error response: %s" , self .data )
90+ raise ValueError ("Invalid response format" )
91+
8792 # make sure the document is coming from
8893 # elasticsearch at initialization time
8994 if "hits" not in self .data :
Original file line number Diff line number Diff line change @@ -147,6 +147,10 @@ async def _(*args, **kwargs):
147147 elif error_type == "index_not_found_exception" :
148148 raise QueryPipelineException (500 , error_type )
149149
150+ elif error_type == "es_rejected_execution_exception" :
151+ # ES cluster is overloaded, all thread pools at capacity
152+ raise QueryPipelineException (503 , "Service Unavailable" , "Elasticsearch cluster overloaded" )
153+
150154 else : # unexpected
151155 raise
152156
Original file line number Diff line number Diff line change @@ -160,10 +160,10 @@ async def func():
160160
161161
162162@pytest .mark .asyncio
163- async def test_generic_exception ():
163+ async def test_index_not_found_exception ():
164164 @capturesESExceptions
165165 async def func ():
166- exc = Exception (message = "test_generic_exception " , meta = {}, body = {})
166+ exc = Exception (message = "test_index_not_found_exception " , meta = {}, body = {})
167167 exc .status_code = 500
168168 exc .info = {"error" : {"type" : "index_not_found_exception" , "reason" : "test_reason" }}
169169 raise exc
@@ -174,6 +174,20 @@ async def func():
174174 assert exc_info .value .summary == "TypeError"
175175 assert exc_info .value .details == "Exception() takes no keyword arguments"
176176
177+ @pytest .mark .asyncio
178+ async def test_es_rejected_execution_exception ():
179+ @capturesESExceptions
180+ async def func ():
181+ exc = TransportError ("test_es_rejected_execution_exception" )
182+ exc .status_code = 503
183+ exc .info = {"error" : {"type" : "es_rejected_execution_exception" , "reason" : "rejected execution of TimedRunnable..." }}
184+ raise exc
185+
186+ with pytest .raises (QueryPipelineException ) as exc_info :
187+ await func ()
188+ assert exc_info .value .code == 503
189+ assert exc_info .value .summary == "Service Unavailable"
190+ assert exc_info .value .details == "Elasticsearch cluster overloaded"
177191
178192@pytest .mark .asyncio
179193async def test_search_phase_execution_exception_rejected_execution ():
You can’t perform that action at this time.
0 commit comments