42
42
_REQ_THROTTLE_NAME ,
43
43
_RETRYABLE_STATUS_CODES ,
44
44
_THROTTLE_STATUS_CODES ,
45
+ DropCode ,
45
46
)
46
47
from azure .monitor .opentelemetry .exporter ._connection_string_parser import ConnectionStringParser
47
48
from azure .monitor .opentelemetry .exporter ._storage import LocalFileStorage
53
54
is_statsbeat_enabled ,
54
55
set_statsbeat_initial_success ,
55
56
)
56
- from azure .monitor .opentelemetry .exporter .statsbeat ._utils import _update_requests_map
57
+ from azure .monitor .opentelemetry .exporter .statsbeat ._utils import (
58
+ _update_requests_map ,
59
+ _track_dropped_items ,
60
+ _track_retry_items ,
61
+ _track_successful_items ,
62
+ )
63
+
57
64
58
65
logger = logging .getLogger (__name__ )
59
66
@@ -162,6 +169,14 @@ def __init__(self, **kwargs: Any) -> None:
162
169
163
170
collect_statsbeat_metrics (self )
164
171
172
+ # Initialize customer statsbeat if enabled
173
+ self ._customer_statsbeat_metrics = None
174
+ if self ._should_collect_customer_statsbeat ():
175
+
176
+ from azure .monitor .opentelemetry .exporter .statsbeat ._customer_statsbeat import collect_customer_statsbeat
177
+ # Collect customer statsbeat metrics
178
+ collect_customer_statsbeat (self )
179
+
165
180
def _transmit_from_storage (self ) -> None :
166
181
if not self .storage :
167
182
return
@@ -184,6 +199,10 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result:
184
199
elif result == ExportResult .SUCCESS :
185
200
# Try to send any cached events
186
201
self ._transmit_from_storage ()
202
+ else :
203
+ # Track items that would have been retried but are dropped since client has local storage disabled
204
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
205
+ _track_dropped_items (self ._customer_statsbeat_metrics , envelopes , DropCode .CLIENT_STORAGE_DISABLED )
187
206
188
207
# pylint: disable=too-many-branches
189
208
# pylint: disable=too-many-nested-blocks
@@ -219,14 +238,25 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
219
238
_update_requests_map (_REQ_SUCCESS_NAME [1 ], 1 )
220
239
reach_ingestion = True
221
240
result = ExportResult .SUCCESS
241
+
242
+ # Track successful items in customer statsbeat
243
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
244
+ _track_successful_items (self ._customer_statsbeat_metrics , envelopes )
222
245
else : # 206
223
246
reach_ingestion = True
224
247
resend_envelopes = []
225
248
for error in track_response .errors :
226
249
if _is_retryable_code (error .status_code ):
227
250
resend_envelopes .append (envelopes [error .index ]) # type: ignore
251
+ # Track retried items in customer statsbeat
252
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
253
+ _track_retry_items (self ._customer_statsbeat_metrics , resend_envelopes , error )
228
254
else :
229
255
if not self ._is_stats_exporter ():
256
+ # Track dropped items in customer statsbeat, non-retryable scenario
257
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
258
+ if error is not None and hasattr (error , "index" ) and error .index is not None :
259
+ _track_dropped_items (self ._customer_statsbeat_metrics , [envelopes [error .index ]], error .status_code )
230
260
logger .error (
231
261
"Data drop %s: %s %s." ,
232
262
error .status_code ,
@@ -237,6 +267,10 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
237
267
envelopes_to_store = [x .as_dict () for x in resend_envelopes ]
238
268
self .storage .put (envelopes_to_store , 0 )
239
269
self ._consecutive_redirects = 0
270
+ elif resend_envelopes :
271
+ # Track items that would have been retried but are dropped since client has local storage disabled
272
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
273
+ _track_dropped_items (self ._customer_statsbeat_metrics , resend_envelopes , DropCode .CLIENT_STORAGE_DISABLED )
240
274
# Mark as not retryable because we already write to storage here
241
275
result = ExportResult .FAILED_NOT_RETRYABLE
242
276
except HttpResponseError as response_error :
@@ -249,6 +283,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
249
283
result = ExportResult .FAILED_RETRYABLE
250
284
# Log error for 401: Unauthorized, 403: Forbidden to assist with customer troubleshooting
251
285
if not self ._is_stats_exporter ():
286
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
287
+ _track_retry_items (self ._customer_statsbeat_metrics , envelopes , response_error )
252
288
if response_error .status_code == 401 :
253
289
logger .error (
254
290
"Retryable server side error: %s. " \
@@ -257,6 +293,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
257
293
response_error .message ,
258
294
)
259
295
elif response_error .status_code == 403 :
296
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
297
+ _track_retry_items (self ._customer_statsbeat_metrics , envelopes , response_error )
260
298
logger .error (
261
299
"Retryable server side error: %s. " \
262
300
"Your application may be configured with a token credential " \
@@ -269,6 +307,10 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
269
307
if self ._should_collect_stats ():
270
308
_update_requests_map (_REQ_THROTTLE_NAME [1 ], value = response_error .status_code )
271
309
result = ExportResult .FAILED_NOT_RETRYABLE
310
+
311
+ if not self ._is_stats_exporter ():
312
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
313
+ _track_dropped_items (self ._customer_statsbeat_metrics , envelopes , response_error .status_code )
272
314
elif _is_redirect_code (response_error .status_code ):
273
315
self ._consecutive_redirects = self ._consecutive_redirects + 1
274
316
# pylint: disable=W0212
@@ -286,12 +328,22 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
286
328
result = self ._transmit (envelopes )
287
329
else :
288
330
if not self ._is_stats_exporter ():
331
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
332
+ _track_dropped_items (self ._customer_statsbeat_metrics , envelopes , DropCode .CLIENT_EXCEPTION , "Error parsing redirect information." )
289
333
logger .error (
290
334
"Error parsing redirect information." ,
291
335
)
292
336
result = ExportResult .FAILED_NOT_RETRYABLE
293
337
else :
294
338
if not self ._is_stats_exporter ():
339
+ # Track dropped items in customer statsbeat, non-retryable scenario
340
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
341
+ _track_dropped_items (
342
+ self ._customer_statsbeat_metrics ,
343
+ envelopes ,
344
+ DropCode .CLIENT_EXCEPTION ,
345
+ "Error sending telemetry because of circular redirects. Please check the integrity of your connection string."
346
+ )
295
347
logger .error (
296
348
"Error sending telemetry because of circular redirects. "
297
349
"Please check the integrity of your connection string."
@@ -311,20 +363,33 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
311
363
"Non-retryable server side error: %s." ,
312
364
response_error .message ,
313
365
)
366
+ # Track dropped items in customer statsbeat, non-retryable scenario
367
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
368
+ _track_dropped_items (self ._customer_statsbeat_metrics , envelopes , response_error .status_code )
314
369
if _is_invalid_code (response_error .status_code ):
315
370
# Shutdown statsbeat on invalid code from customer endpoint
316
371
# Import here to avoid circular dependencies
317
372
from azure .monitor .opentelemetry .exporter .statsbeat ._statsbeat import (
318
373
shutdown_statsbeat_metrics ,
319
374
)
375
+ from azure .monitor .opentelemetry .exporter .statsbeat ._customer_statsbeat import (
376
+ shutdown_customer_statsbeat_metrics ,
377
+ )
320
378
321
379
shutdown_statsbeat_metrics ()
380
+ # Also shutdown customer statsbeat on invalid code
381
+ shutdown_customer_statsbeat_metrics ()
322
382
result = ExportResult .FAILED_NOT_RETRYABLE
323
383
except ServiceRequestError as request_error :
324
384
# Errors when we're fairly sure that the server did not receive the
325
385
# request, so it should be safe to retry.
326
386
# ServiceRequestError is raised by azure.core for these cases
327
387
logger .warning ("Retrying due to server request error: %s." , request_error .message )
388
+
389
+ # Track retry items in customer statsbeat for client-side exceptions
390
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
391
+ _track_retry_items (self ._customer_statsbeat_metrics , envelopes , request_error )
392
+
328
393
if self ._should_collect_stats ():
329
394
exc_type = request_error .exc_type
330
395
if exc_type is None or exc_type is type (None ):
@@ -333,6 +398,11 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
333
398
result = ExportResult .FAILED_RETRYABLE
334
399
except Exception as ex :
335
400
logger .exception ("Envelopes could not be exported and are not retryable: %s." , ex ) # pylint: disable=C4769
401
+
402
+ # Track dropped items in customer statsbeat for general exceptions
403
+ if self ._customer_statsbeat_metrics and self ._should_collect_customer_statsbeat ():
404
+ _track_dropped_items (self ._customer_statsbeat_metrics , envelopes , DropCode .CLIENT_EXCEPTION , str (ex ))
405
+
336
406
if self ._should_collect_stats ():
337
407
_update_requests_map (_REQ_EXCEPTION_NAME [1 ], value = ex .__class__ .__name__ )
338
408
result = ExportResult .FAILED_NOT_RETRYABLE
@@ -355,6 +425,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
355
425
)
356
426
357
427
shutdown_statsbeat_metrics ()
428
+
358
429
# pylint: disable=lost-exception
359
430
return ExportResult .FAILED_NOT_RETRYABLE # pylint: disable=W0134
360
431
# pylint: disable=lost-exception
@@ -373,13 +444,31 @@ def _should_collect_stats(self):
373
444
and not self ._instrumentation_collection
374
445
)
375
446
447
+
448
+ # check to see whether its the case of customer stats collection
449
+ def _should_collect_customer_statsbeat (self ):
450
+ # Import here to avoid circular dependencies
451
+ from azure .monitor .opentelemetry .exporter .statsbeat ._customer_statsbeat import get_customer_statsbeat_shutdown
452
+
453
+ env_value = os .environ .get ("APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW" , "" )
454
+ is_customer_statsbeat_enabled = env_value .lower () == "true"
455
+ # Don't collect customer statsbeat for instrumentation collection or customer statsbeat exporters
456
+ return (
457
+ is_customer_statsbeat_enabled
458
+ and not get_customer_statsbeat_shutdown ()
459
+ and not self ._is_customer_stats_exporter ()
460
+ and not self ._instrumentation_collection
461
+ )
462
+
376
463
# check to see if statsbeat is in "attempting to be initialized" state
377
464
def _is_statsbeat_initializing_state (self ):
378
465
return self ._is_stats_exporter () and not get_statsbeat_shutdown () and not get_statsbeat_initial_success ()
379
466
380
467
def _is_stats_exporter (self ):
381
468
return self .__class__ .__name__ == "_StatsBeatExporter"
382
469
470
+ def _is_customer_stats_exporter (self ):
471
+ return getattr (self , '_is_customer_statsbeat' , False )
383
472
384
473
def _is_invalid_code (response_code : Optional [int ]) -> bool :
385
474
"""Determine if response is a invalid response.
0 commit comments