Skip to content

Commit 1ea266b

Browse files
authored
Added logic for race conditions and global instance for customer SDKStats (#42655)
* Added logic for race conditions and updated the implementation to use a global instance for customer SDKStats
* Updated CHANGELOG
* Minor adjustments
* Refactored code after statsbeat PR
* Fix test
* Clean up test file
1 parent 9611aed commit 1ea266b

File tree

9 files changed

+409
-282
lines changed

9 files changed

+409
-282
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
([#42573](https://github.com/Azure/azure-sdk-for-python/pull/42573))
1616
- Customer Facing SDKStats: Exception categorization as per the [spec](https://github.com/aep-health-and-standards/Telemetry-Collection-Spec/blob/main/ApplicationInsights/sdkstats/customer_facing_sdk_stats.md)
1717
([#42695](https://github.com/Azure/azure-sdk-for-python/pull/42695))
18-
18+
- Customer Facing SDKStats: Added logic to handle race conditions and updated the implementation to use a single global instance for customer SDKStats metrics
19+
([#42655](https://github.com/Azure/azure-sdk-for-python/pull/42655))
1920
### Breaking Changes
2021

2122
### Bugs Fixed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,17 @@ def __init__(self, language: str, version: str, compute_type: str):
182182
self.version = version
183183
self.compute_type = compute_type
184184

185-
## Map from Azure Monitor envelope names to TelemetryType
185+
## Map from Azure Monitor envelope base_types to TelemetryType
186186
_TYPE_MAP = {
187-
_EVENT_ENVELOPE_NAME: _CUSTOM_EVENT,
188-
_METRIC_ENVELOPE_NAME: _CUSTOM_METRIC,
189-
_REMOTE_DEPENDENCY_ENVELOPE_NAME: _DEPENDENCY,
190-
_EXCEPTION_ENVELOPE_NAME: _EXCEPTION,
191-
_PAGE_VIEW_ENVELOPE_NAME: _PAGE_VIEW,
192-
_MESSAGE_ENVELOPE_NAME: _TRACE,
193-
_REQUEST_ENVELOPE_NAME: _REQUEST,
194-
_PERFORMANCE_COUNTER_ENVELOPE_NAME: _PERFORMANCE_COUNTER,
195-
_AVAILABILITY_ENVELOPE_NAME: _AVAILABILITY,
187+
"EventData": _CUSTOM_EVENT,
188+
"MetricData": _CUSTOM_METRIC,
189+
"RemoteDependencyData": _DEPENDENCY,
190+
"ExceptionData": _EXCEPTION,
191+
"PageViewData": _PAGE_VIEW,
192+
"MessageData": _TRACE,
193+
"RequestData": _REQUEST,
194+
"PerformanceCounterData": _PERFORMANCE_COUNTER,
195+
"AvailabilityData": _AVAILABILITY,
196196
}
197197

198198
# Exception categories

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,14 @@
5656
increment_and_check_statsbeat_failure_count,
5757
is_statsbeat_enabled,
5858
set_statsbeat_initial_success,
59+
is_customer_sdkstats_enabled,
5960
)
6061
from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
6162
_update_requests_map,
63+
_track_dropped_items_from_storage,
6264
_track_dropped_items,
6365
_track_retry_items,
6466
_track_successful_items,
65-
_track_dropped_items_from_storage,
6667
)
6768

6869

@@ -181,8 +182,7 @@ def __init__(self, **kwargs: Any) -> None:
181182
except Exception as e: # pylint: disable=broad-except
182183
logger.warning("Failed to initialize statsbeat metrics: %s", e)
183184

184-
# Initialize customer sdkstats if enabled
185-
self._customer_sdkstats_metrics = None
185+
# customer sdkstats initialization
186186
if self._should_collect_customer_sdkstats():
187187

188188
from azure.monitor.opentelemetry.exporter.statsbeat._customer_sdkstats import collect_customer_sdkstats
@@ -209,16 +209,16 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result:
209209
if result == ExportResult.FAILED_RETRYABLE:
210210
envelopes_to_store = [x.as_dict() for x in envelopes]
211211
result_from_storage_put = self.storage.put(envelopes_to_store)
212-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
213-
_track_dropped_items_from_storage(self._customer_sdkstats_metrics, result_from_storage_put, envelopes)
212+
if self._should_collect_customer_sdkstats():
213+
_track_dropped_items_from_storage(result_from_storage_put, envelopes)
214214
elif result == ExportResult.SUCCESS:
215215
# Try to send any cached events
216216
self._transmit_from_storage()
217217

218218
else:
219219
# Track items that would have been retried but are dropped since client has local storage disabled
220-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
221-
_track_dropped_items(self._customer_sdkstats_metrics, envelopes, DropCode.CLIENT_STORAGE_DISABLED)
220+
if self._should_collect_customer_sdkstats():
221+
_track_dropped_items(envelopes, DropCode.CLIENT_STORAGE_DISABLED)
222222

223223
# pylint: disable=too-many-branches
224224
# pylint: disable=too-many-nested-blocks
@@ -256,23 +256,23 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
256256
result = ExportResult.SUCCESS
257257

258258
# Track successful items in customer sdkstats
259-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
260-
_track_successful_items(self._customer_sdkstats_metrics, envelopes)
259+
if self._should_collect_customer_sdkstats():
260+
_track_successful_items(envelopes)
261261
else: # 206
262262
reach_ingestion = True
263263
resend_envelopes = []
264264
for error in track_response.errors:
265265
if _is_retryable_code(error.status_code):
266266
resend_envelopes.append(envelopes[error.index]) # type: ignore
267267
# Track retried items in customer sdkstats
268-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
269-
_track_retry_items(self._customer_sdkstats_metrics, resend_envelopes, error)
268+
if self._should_collect_customer_sdkstats():
269+
_track_retry_items(resend_envelopes, error)
270270
else:
271271
if not self._is_stats_exporter():
272272
# Track dropped items in customer sdkstats, non-retryable scenario
273-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
274-
if error is not None and hasattr(error, "index") and error.index is not None:
275-
_track_dropped_items(self._customer_sdkstats_metrics, [envelopes[error.index]], error.status_code)
273+
if self._should_collect_customer_sdkstats():
274+
if error is not None and hasattr(error, "index") and error.index is not None and isinstance(error.status_code, int):
275+
_track_dropped_items([envelopes[error.index]], error.status_code)
276276
logger.error(
277277
"Data drop %s: %s %s.",
278278
error.status_code,
@@ -282,13 +282,13 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
282282
if self.storage and resend_envelopes:
283283
envelopes_to_store = [x.as_dict() for x in resend_envelopes]
284284
result_from_storage = self.storage.put(envelopes_to_store, 0)
285-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
286-
_track_dropped_items_from_storage(self._customer_sdkstats_metrics, result_from_storage, resend_envelopes)
285+
if self._should_collect_customer_sdkstats():
286+
_track_dropped_items_from_storage(result_from_storage, resend_envelopes)
287287
self._consecutive_redirects = 0
288288
elif resend_envelopes:
289289
# Track items that would have been retried but are dropped since client has local storage disabled
290-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
291-
_track_dropped_items(self._customer_sdkstats_metrics, resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED)
290+
if self._should_collect_customer_sdkstats():
291+
_track_dropped_items(resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED)
292292
# Mark as not retryable because we already write to storage here
293293
result = ExportResult.FAILED_NOT_RETRYABLE
294294
except HttpResponseError as response_error:
@@ -301,8 +301,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
301301
result = ExportResult.FAILED_RETRYABLE
302302
# Log error for 401: Unauthorized, 403: Forbidden to assist with customer troubleshooting
303303
if not self._is_stats_exporter():
304-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
305-
_track_retry_items(self._customer_sdkstats_metrics, envelopes, response_error)
304+
if self._should_collect_customer_sdkstats():
305+
_track_retry_items(envelopes, response_error)
306306
if response_error.status_code == 401:
307307
logger.error(
308308
"Retryable server side error: %s. " \
@@ -311,8 +311,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
311311
response_error.message,
312312
)
313313
elif response_error.status_code == 403:
314-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
315-
_track_retry_items(self._customer_sdkstats_metrics, envelopes, response_error)
314+
if self._should_collect_customer_sdkstats():
315+
_track_retry_items(envelopes, response_error)
316316
logger.error(
317317
"Retryable server side error: %s. " \
318318
"Your application may be configured with a token credential " \
@@ -327,8 +327,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
327327
result = ExportResult.FAILED_NOT_RETRYABLE
328328

329329
if not self._is_stats_exporter():
330-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
331-
_track_dropped_items(self._customer_sdkstats_metrics, envelopes, response_error.status_code)
330+
if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int):
331+
_track_dropped_items(envelopes, response_error.status_code)
332332
elif _is_redirect_code(response_error.status_code):
333333
self._consecutive_redirects = self._consecutive_redirects + 1
334334
# pylint: disable=W0212
@@ -346,18 +346,17 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
346346
result = self._transmit(envelopes)
347347
else:
348348
if not self._is_stats_exporter():
349-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
350-
_track_dropped_items(self._customer_sdkstats_metrics, envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value)
349+
if self._should_collect_customer_sdkstats():
350+
_track_dropped_items(envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value)
351351
logger.error(
352352
"Error parsing redirect information.",
353353
)
354354
result = ExportResult.FAILED_NOT_RETRYABLE
355355
else:
356356
if not self._is_stats_exporter():
357357
# Track dropped items in customer sdkstats, non-retryable scenario
358-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
358+
if self._should_collect_customer_sdkstats():
359359
_track_dropped_items(
360-
self._customer_sdkstats_metrics,
361360
envelopes,
362361
DropCode.CLIENT_EXCEPTION,
363362
_exception_categories.CLIENT_EXCEPTION.value
@@ -382,8 +381,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
382381
response_error.message,
383382
)
384383
# Track dropped items in customer sdkstats, non-retryable scenario
385-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
386-
_track_dropped_items(self._customer_sdkstats_metrics, envelopes, response_error.status_code)
384+
if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int):
385+
_track_dropped_items(envelopes, response_error.status_code)
387386
if _is_invalid_code(response_error.status_code):
388387
# Shutdown statsbeat on invalid code from customer endpoint
389388
# Import here to avoid circular dependencies
@@ -405,8 +404,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
405404
logger.warning("Retrying due to server request error: %s.", request_error.message)
406405

407406
# Track retry items in customer sdkstats for client-side exceptions
408-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
409-
_track_retry_items(self._customer_sdkstats_metrics, envelopes, request_error)
407+
if self._should_collect_customer_sdkstats():
408+
_track_retry_items(envelopes, request_error)
410409

411410
if self._should_collect_stats():
412411
exc_type = request_error.exc_type
@@ -418,8 +417,8 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
418417
logger.exception("Envelopes could not be exported and are not retryable: %s.", ex) # pylint: disable=C4769
419418

420419
# Track dropped items in customer sdkstats for general exceptions
421-
if self._customer_sdkstats_metrics and self._should_collect_customer_sdkstats():
422-
_track_dropped_items(self._customer_sdkstats_metrics, envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value)
420+
if self._should_collect_customer_sdkstats():
421+
_track_dropped_items(envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value)
423422

424423
if self._should_collect_stats():
425424
_update_requests_map(_REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__)
@@ -466,11 +465,9 @@ def _should_collect_stats(self):
466465

467466
# Check whether customer sdkstats should be collected for this exporter
468467
def _should_collect_customer_sdkstats(self):
469-
env_value = os.environ.get("APPLICATIONINSIGHTS_SDKSTATS_ENABLED_PREVIEW", "")
470-
is_customer_sdkstats_enabled = env_value.lower() == "true"
471468
# Don't collect customer sdkstats for instrumentation collection, sdkstats exporter or customer sdkstats exporter
472469
return (
473-
is_customer_sdkstats_enabled
470+
is_customer_sdkstats_enabled()
474471
and not get_customer_sdkstats_shutdown()
475472
and not self._is_stats_exporter()
476473
and not self._is_customer_sdkstats_exporter()

0 commit comments

Comments
 (0)