@@ -152,119 +152,37 @@ def _transmit(self, envelopes):
152152 return status
153153
154154 text = 'N/A'
155- data = None
155+ status_code = 0
156156 try :
157157 text = response .text
158+ status_code = response .status_code
158159 except Exception as ex :
159160 if not self ._is_stats_exporter ():
160161 logger .warning ('Error while reading response body %s.' , ex )
161- else :
162- try :
163- data = json .loads (text )
164- except Exception :
165- pass
162+ if self ._check_stats_collection ():
163+ _requests_map ['exception' ] = _requests_map .get ('exception' , 0 ) + 1 # noqa: E501
164+ return TransportStatusCode .DROP
166165
167166 if self ._is_stats_exporter () and \
168167 not state .get_statsbeat_shutdown () and \
169168 not state .get_statsbeat_initial_success ():
170169 # If statsbeat exporter, record initialization as success if
171170 # appropriate status code is returned
172- if _reached_ingestion_status_code (response . status_code ):
171+ if _reached_ingestion_status_code (status_code ):
173172 state .set_statsbeat_initial_success (True )
174173 elif _statsbeat_failure_reached_threshold ():
175174 # If ingestion threshold during statsbeat initialization is
176175 # reached, return back code to shut it down
177176 return TransportStatusCode .STATSBEAT_SHUTDOWN
178177
179- if response . status_code == 200 :
178+ if status_code == 200 : # Success
180179 self ._consecutive_redirects = 0
181180 if self ._check_stats_collection ():
182181 with _requests_lock :
183182 _requests_map ['success' ] = _requests_map .get ('success' , 0 ) + 1 # noqa: E501
184183 return TransportStatusCode .SUCCESS
185- # Status code not 200, 439 or 402 counts as failures
186- if self ._check_stats_collection ():
187- if response .status_code != 439 and response .status_code != 402 :
188- with _requests_lock :
189- _requests_map ['failure' ] = _requests_map .get ('failure' , 0 ) + 1 # noqa: E501
190- if response .status_code == 206 : # Partial Content
191- if data :
192- try :
193- resend_envelopes = []
194- for error in data ['errors' ]:
195- if error ['statusCode' ] in (
196- 429 , # Too Many Requests
197- 500 , # Internal Server Error
198- 503 , # Service Unavailable
199- ):
200- resend_envelopes .append (envelopes [error ['index' ]])
201- else :
202- logger .error (
203- 'Data drop %s: %s %s.' ,
204- error ['statusCode' ],
205- error ['message' ],
206- envelopes [error ['index' ]],
207- )
208- if resend_envelopes :
209- if self ._check_stats_collection ():
210- with _requests_lock :
211- _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
212- self .storage .put (resend_envelopes )
213- except Exception as ex :
214- if not self ._is_stats_exporter ():
215- logger .error (
216- 'Error while processing %s: %s %s.' ,
217- response .status_code ,
218- text ,
219- ex ,
220- )
221- return TransportStatusCode .DROP
222- # cannot parse response body, fallback to retry
223- if response .status_code in (
224- 206 , # Partial Content
225- 429 , # Too Many Requests
226- 500 , # Internal Server Error
227- 503 , # Service Unavailable
228- ):
229- if not self ._is_stats_exporter ():
230- logger .warning (
231- 'Transient server side error %s: %s.' ,
232- response .status_code ,
233- text ,
234- )
235- # server side error (retryable)
236- if self ._check_stats_collection ():
237- with _requests_lock :
238- _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
239- return TransportStatusCode .RETRY
240- # Authentication error
241- if response .status_code == 401 :
242- if not self ._is_stats_exporter ():
243- logger .warning (
244- 'Authentication error %s: %s.' ,
245- response .status_code ,
246- text ,
247- )
248- if self ._check_stats_collection ():
249- with _requests_lock :
250- _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
251- return TransportStatusCode .RETRY
252- # Forbidden error
253- # Can occur when v2 endpoint is used while AI resource is configured
254- # with disableLocalAuth
255- if response .status_code == 403 :
256- if not self ._is_stats_exporter ():
257- logger .warning (
258- 'Forbidden error %s: %s.' ,
259- response .status_code ,
260- text ,
261- )
262- if self ._check_stats_collection ():
263- with _requests_lock :
264- _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
265- return TransportStatusCode .RETRY
266- # Redirect
267- if response .status_code in (307 , 308 ):
184+ elif _status_code_is_redirect (status_code ): # Redirect
185+ # for statsbeat, these are not tracked as success nor failures
268186 self ._consecutive_redirects += 1
269187 if self ._consecutive_redirects < _MAX_CONSECUTIVE_REDIRECTS :
270188 if response .headers :
@@ -290,20 +208,114 @@ def _transmit(self, envelopes):
290208 if self ._check_stats_collection ():
291209 with _requests_lock :
292210 _requests_map ['exception' ] = _requests_map .get ('exception' , 0 ) + 1 # noqa: E501
293- # Other, server side error (non-retryable)
294- if not self ._is_stats_exporter ():
295- logger .error (
296- 'Non-retryable server side error %s: %s.' ,
297- response .status_code ,
298- text ,
299- )
300- if self ._check_stats_collection ():
301- if response .status_code == 402 or response .status_code == 439 :
211+ return TransportStatusCode .DROP
212+ elif _status_code_is_throttle (status_code ): # Throttle
213+ if self ._check_stats_collection ():
302214 # 402: Monthly Quota Exceeded (new SDK)
303215 # 439: Monthly Quota Exceeded (old SDK) <- Currently OC SDK
304216 with _requests_lock :
305217 _requests_map ['throttle' ] = _requests_map .get ('throttle' , 0 ) + 1 # noqa: E501
306- return TransportStatusCode .DROP
218+ return TransportStatusCode .DROP
219+ elif _status_code_is_retryable (status_code ): # Retry
220+ if not self ._is_stats_exporter ():
221+ if status_code == 401 : # Authentication error
222+ logger .warning (
223+ 'Authentication error %s: %s.' ,
224+ status_code ,
225+ text ,
226+ )
227+ elif status_code == 403 :
228+ # Forbidden error
229+ # Can occur when v2 endpoint is used while AI resource is configured # noqa: E501
230+ # with disableLocalAuth
231+ logger .warning (
232+ 'Forbidden error %s: %s.' ,
233+ status_code ,
234+ text ,
235+ )
236+ else :
237+ logger .warning (
238+ 'Transient server side error %s: %s. Retrying.' ,
239+ status_code ,
240+ text ,
241+ )
242+ if self ._check_stats_collection ():
243+ with _requests_lock :
244+ _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
245+ return TransportStatusCode .RETRY
246+ elif status_code == 206 : # Partial Content
247+ data = None
248+ try :
249+ data = json .loads (text )
250+ except Exception as ex :
251+ if not self ._is_stats_exporter ():
252+ logger .warning ('Error while reading response body %s for partial content.' , ex ) # noqa: E501
253+ if self ._check_stats_collection ():
254+ _requests_map ['exception' ] = _requests_map .get ('exception' , 0 ) + 1 # noqa: E501
255+ return TransportStatusCode .DROP
256+ if data :
257+ try :
258+ resend_envelopes = []
259+ for error in data ['errors' ]:
260+ if _status_code_is_retryable (error ['statusCode' ]):
261+ resend_envelopes .append (envelopes [error ['index' ]])
262+ if self ._check_stats_collection ():
263+ with _requests_lock :
264+ _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
265+ else :
266+ logger .error (
267+ 'Data drop %s: %s %s.' ,
268+ error ['statusCode' ],
269+ error ['message' ],
270+ envelopes [error ['index' ]],
271+ )
272+ if self .storage and resend_envelopes :
273+ self .storage .put (resend_envelopes )
274+ except Exception as ex :
275+ if not self ._is_stats_exporter ():
276+ logger .error (
277+ 'Error while processing %s: %s %s.' ,
278+ status_code ,
279+ text ,
280+ ex ,
281+ )
282+ if self ._check_stats_collection ():
283+ _requests_map ['exception' ] = _requests_map .get ('exception' , 0 ) + 1 # noqa: E501
284+ return TransportStatusCode .DROP
285+ # cannot parse response body, fallback to retry
286+ else :
287+ # 400 and 404 will be tracked as failure count
288+ # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey) # noqa: E501
289+ # 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string # noqa: E501
290+ if self ._check_stats_collection ():
291+ with _requests_lock :
292+ _requests_map ['failure' ] = _requests_map .get ('failure' , 0 ) + 1 # noqa: E501
293+ # Other, server side error (non-retryable)
294+ if not self ._is_stats_exporter ():
295+ logger .error (
296+ 'Non-retryable server side error %s: %s.' ,
297+ status_code ,
298+ text ,
299+ )
300+ return TransportStatusCode .DROP
301+
302+
303+ def _status_code_is_redirect (status_code ):
304+ return status_code in (307 , 308 )
305+
306+
307+ def _status_code_is_throttle (status_code ):
308+ return status_code in (402 , 439 )
309+
310+
311+ def _status_code_is_retryable (status_code ):
312+ return status_code in (
313+ 401 , # Unauthorized
314+ 403 , # Forbidden
315+ 429 , # Too many requests
316+ 500 , # Internal server error
317+ 503 , # Service unavailable
318+ )
307319
308320
309321def _reached_ingestion_status_code (status_code ):
0 commit comments