Skip to content

Commit 2eb11f6

Browse files
authored
Refactor statsbeat to use StatsbeatManager (#42716)
1 parent fa4b6ea commit 2eb11f6

File tree

22 files changed

+1677
-4276
lines changed

22 files changed

+1677
-4276
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
([#42360](https://github.com/Azure/azure-sdk-for-python/pull/42360))
2525
- Configuration manager/worker fetch via OneSettings part 2 - Concurrency and refactoring of _ConfigurationManager
2626
([#42508](https://github.com/Azure/azure-sdk-for-python/pull/42508))
27+
- Refactoring of statsbeat to use `StatsbeatManager`
28+
([#42716](https://github.com/Azure/azure-sdk-for-python/pull/42716))
2729

2830
## 1.0.0b41 (2025-07-31)
2931

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,15 @@
9090

9191
# Statsbeat
9292
# (OpenTelemetry metric name, Statsbeat metric name)
93+
# Note: OpenTelemetry SDK normalizes metric names to lowercase, so first element should be lowercase
9394
_ATTACH_METRIC_NAME = ("attach", "Attach")
9495
_FEATURE_METRIC_NAME = ("feature", "Feature")
95-
_REQ_EXCEPTION_NAME = ("statsbeat_exception_count", "Exception_Count")
96-
_REQ_DURATION_NAME = ("statsbeat_duration", "Request_Duration")
97-
_REQ_FAILURE_NAME = ("statsbeat_failure_count", "Request_Failure_Count")
98-
_REQ_RETRY_NAME = ("statsbeat_retry_count", "Retry_Count")
99-
_REQ_SUCCESS_NAME = ("statsbeat_success_count", "Request_Success_Count")
100-
_REQ_THROTTLE_NAME = ("statsbeat_throttle_count", "Throttle_Count")
96+
_REQ_EXCEPTION_NAME = ("exception_count", "Exception_Count")
97+
_REQ_DURATION_NAME = ("request_duration", "Request_Duration")
98+
_REQ_FAILURE_NAME = ("request_failure_count", "Request_Failure_Count")
99+
_REQ_RETRY_NAME = ("retry_count", "Retry_Count")
100+
_REQ_SUCCESS_NAME = ("request_success_count", "Request_Success_Count")
101+
_REQ_THROTTLE_NAME = ("throttle_count", "Throttle_Count")
101102

102103
_STATSBEAT_METRIC_NAME_MAPPINGS = dict(
103104
[
@@ -117,8 +118,8 @@
117118
# pylint: disable=line-too-long
118119
_DEFAULT_NON_EU_STATS_CONNECTION_STRING = "InstrumentationKey=c4a29126-a7cb-47e5-b348-11414998b11e;IngestionEndpoint=https://westus-0.in.applicationinsights.azure.com/"
119120
_DEFAULT_EU_STATS_CONNECTION_STRING = "InstrumentationKey=7dc56bab-3c0c-4e9f-9ebb-d1acadee8d0f;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/"
120-
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL = 900 # 15 minutes
121-
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 86400 # 24 hours
121+
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL = 15 * 60 # 15 minutes in s
122+
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 24 * 60 * 60 # 24 hours in s
122123
_EU_ENDPOINTS = [
123124
"westeurope",
124125
"northeurope",

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_generated/models/_models_py3.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1320,7 +1320,7 @@ class TrackResponse(msrest.serialization.Model):
13201320
"errors": {"key": "errors", "type": "[TelemetryErrorDetails]"},
13211321
}
13221322

1323-
def __init__(
1323+
def __init__( # type: ignore
13241324
self,
13251325
*,
13261326
items_received: Optional[int] = None,

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_storage.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import random
99
import subprocess
1010
import errno
11-
from typing import Union
11+
from typing import Union, Optional, Any, Generator, Tuple, List, Type
1212
from enum import Enum
1313

1414
from azure.monitor.opentelemetry.exporter._utils import PeriodicTask
@@ -26,15 +26,15 @@
2626
os.environ.get("SYSTEMDRIVE", "C:"), r"\Windows\System32\icacls.exe"
2727
)
2828

29-
def _fmt(timestamp):
29+
def _fmt(timestamp: datetime.datetime) -> str:
3030
return timestamp.strftime("%Y-%m-%dT%H%M%S.%f")
3131

3232

33-
def _now():
33+
def _now() -> datetime.datetime:
3434
return datetime.datetime.now(tz=datetime.timezone.utc)
3535

3636

37-
def _seconds(seconds):
37+
def _seconds(seconds: int) -> datetime.timedelta:
3838
return datetime.timedelta(seconds=seconds)
3939

4040
class StorageExportResult(Enum):
@@ -45,24 +45,24 @@ class StorageExportResult(Enum):
4545

4646
# pylint: disable=broad-except
4747
class LocalFileBlob:
48-
def __init__(self, fullpath):
48+
def __init__(self, fullpath: str) -> None:
4949
self.fullpath = fullpath
5050

51-
def delete(self):
51+
def delete(self) -> None:
5252
try:
5353
os.remove(self.fullpath)
5454
except Exception:
5555
pass # keep silent
5656

57-
def get(self):
57+
def get(self) -> Optional[Tuple[Any, ...]]:
5858
try:
5959
with open(self.fullpath, "r", encoding="utf-8") as file:
6060
return tuple(json.loads(line.strip()) for line in file.readlines())
6161
except Exception:
6262
pass # keep silent
6363
return None
6464

65-
def put(self, data, lease_period=0) -> Union[StorageExportResult, str]:
65+
def put(self, data: List[Any], lease_period: int = 0) -> Union[StorageExportResult, str]:
6666
try:
6767
fullpath = self.fullpath + ".tmp"
6868
with open(fullpath, "w", encoding="utf-8") as file:
@@ -80,7 +80,7 @@ def put(self, data, lease_period=0) -> Union[StorageExportResult, str]:
8080
except Exception as ex:
8181
return str(ex)
8282

83-
def lease(self, period):
83+
def lease(self, period: int) -> Optional['LocalFileBlob']:
8484
timestamp = _now() + _seconds(period)
8585
fullpath = self.fullpath
8686
if fullpath.endswith(".lock"):
@@ -98,14 +98,14 @@ def lease(self, period):
9898
class LocalFileStorage:
9999
def __init__(
100100
self,
101-
path,
102-
max_size=50 * 1024 * 1024, # 50MiB
103-
maintenance_period=60, # 1 minute
104-
retention_period=48 * 60 * 60, # 48 hours
105-
write_timeout=60, # 1 minute,
106-
name=None,
107-
lease_period=60, # 1 minute
108-
):
101+
path: str,
102+
max_size: int = 50 * 1024 * 1024, # 50MiB
103+
maintenance_period: int = 60, # 1 minute
104+
retention_period: int = 48 * 60 * 60, # 48 hours
105+
write_timeout: int = 60, # 1 minute,
106+
name: Optional[str] = None,
107+
lease_period: int = 60, # 1 minute
108+
) -> None:
109109
self._path = os.path.abspath(path)
110110
self._max_size = max_size
111111
self._retention_period = retention_period
@@ -124,19 +124,24 @@ def __init__(
124124
else:
125125
logger.error("Could not set secure permissions on storage folder, local storage is disabled.")
126126

127-
def close(self):
127+
def close(self) -> None:
128128
if self._enabled:
129129
self._maintenance_task.cancel()
130130
self._maintenance_task.join()
131131

132-
def __enter__(self):
132+
def __enter__(self) -> 'LocalFileStorage':
133133
return self
134134

135135
# pylint: disable=redefined-builtin
136-
def __exit__(self, type, value, traceback):
136+
def __exit__(
137+
self,
138+
exc_type: Optional[Type[BaseException]],
139+
exc_value: Optional[BaseException],
140+
traceback: Optional[Any]
141+
) -> None:
137142
self.close()
138143

139-
def _maintenance_routine(self):
144+
def _maintenance_routine(self) -> None:
140145
try:
141146
# pylint: disable=unused-variable
142147
for blob in self.gets():
@@ -145,7 +150,7 @@ def _maintenance_routine(self):
145150
pass # keep silent
146151

147152
# pylint: disable=too-many-nested-blocks
148-
def gets(self):
153+
def gets(self) -> Generator[LocalFileBlob, None, None]:
149154
if self._enabled:
150155
now = _now()
151156
lease_deadline = _fmt(now)
@@ -184,7 +189,7 @@ def gets(self):
184189
else:
185190
pass
186191

187-
def get(self):
192+
def get(self) -> Optional['LocalFileBlob']:
188193
if not self._enabled:
189194
return None
190195
cursor = self.gets()
@@ -194,7 +199,7 @@ def get(self):
194199
pass
195200
return None
196201

197-
def put(self, data, lease_period=None) -> Union[StorageExportResult, str]:
202+
def put(self, data: List[Any], lease_period: Optional[int] = None) -> Union[StorageExportResult, str]:
198203
try:
199204
if not self._enabled:
200205
if get_local_storage_setup_state_readonly():
@@ -221,7 +226,7 @@ def put(self, data, lease_period=None) -> Union[StorageExportResult, str]:
221226
return str(ex)
222227

223228

224-
def _check_and_set_folder_permissions(self):
229+
def _check_and_set_folder_permissions(self) -> bool:
225230
"""
226231
Validate and set folder permissions where the telemetry data will be stored.
227232
:return: True if folder was created and permissions set successfully, False otherwise.
@@ -266,7 +271,7 @@ def _check_and_set_folder_permissions(self):
266271
set_local_storage_setup_state_exception(str(ex))
267272
return False
268273

269-
def _check_storage_size(self):
274+
def _check_storage_size(self) -> bool:
270275
size = 0
271276
# pylint: disable=unused-variable
272277
for dirpath, dirnames, filenames in os.walk(self._path):
@@ -295,7 +300,7 @@ def _check_storage_size(self):
295300
return False
296301
return True
297302

298-
def _get_current_user(self):
303+
def _get_current_user(self) -> str:
299304
user = ""
300305
domain = os.environ.get("USERDOMAIN")
301306
username = os.environ.get("USERNAME")

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,10 +380,13 @@ def _get_scope(aad_audience=None):
380380

381381
class Singleton(type):
382382
_instance = None
383+
_lock = threading.Lock()
383384

384-
def __call__(cls, *args, **kwargs):
385+
def __call__(cls, *args: Any, **kwargs: Any):
385386
if not cls._instance:
386-
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
387+
with cls._lock:
388+
if not cls._instance:
389+
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
387390
return cls._instance
388391

389392
def _get_telemetry_type(item: TelemetryItem):

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
5252
get_statsbeat_initial_success,
5353
get_statsbeat_shutdown,
54+
get_customer_sdkstats_shutdown,
5455
increment_and_check_statsbeat_failure_count,
5556
is_statsbeat_enabled,
5657
set_statsbeat_initial_success,
@@ -99,9 +100,11 @@ def __init__(self, **kwargs: Any) -> None:
99100
# self._configuration_manager = _ConfigurationManager()
100101

101102
self._api_version = kwargs.get("api_version") or _SERVICE_API_LATEST
103+
# We do not need to use entra Id if this is a sdkStats exporter
102104
if self._is_stats_exporter():
103105
self._credential = None
104106
else:
107+
# We use the credential on a regular exporter or customer sdkStats exporter
105108
self._credential = _get_authentication_credential(**kwargs)
106109
self._consecutive_redirects = 0 # To prevent circular redirects
107110
self._disable_offline_storage = kwargs.get("disable_offline_storage", False)
@@ -157,8 +160,8 @@ def __init__(self, **kwargs: Any) -> None:
157160
)
158161
self.storage = None
159162
if not self._disable_offline_storage:
160-
self.storage = LocalFileStorage(
161-
path=self._storage_directory,
163+
self.storage = LocalFileStorage( # pyright: ignore
164+
path=self._storage_directory, # type: ignore
162165
max_size=self._storage_max_size,
163166
maintenance_period=self._storage_maintenance_period,
164167
retention_period=self._storage_retention_period,
@@ -170,10 +173,12 @@ def __init__(self, **kwargs: Any) -> None:
170173

171174
# statsbeat initialization
172175
if self._should_collect_stats():
173-
# Import here to avoid circular dependencies
174-
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
175-
176-
collect_statsbeat_metrics(self)
176+
try:
177+
# Import here to avoid circular dependencies
178+
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
179+
collect_statsbeat_metrics(self)
180+
except Exception as e: # pylint: disable=broad-except
181+
logger.warning("Failed to initialize statsbeat metrics: %s", e)
177182

178183
# Initialize customer sdkstats if enabled
179184
self._customer_sdkstats_metrics = None
@@ -453,21 +458,20 @@ def _should_collect_stats(self):
453458
is_statsbeat_enabled()
454459
and not get_statsbeat_shutdown()
455460
and not self._is_stats_exporter()
461+
and not self._is_customer_sdkstats_exporter()
456462
and not self._instrumentation_collection
457463
)
458464

459465

460466
# check to see whether its the case of customer sdkstats collection
461467
def _should_collect_customer_sdkstats(self):
462-
# Import here to avoid circular dependencies
463-
from azure.monitor.opentelemetry.exporter.statsbeat._state import get_customer_sdkstats_shutdown
464-
465468
env_value = os.environ.get("APPLICATIONINSIGHTS_SDKSTATS_ENABLED_PREVIEW", "")
466469
is_customer_sdkstats_enabled = env_value.lower() == "true"
467-
# Don't collect customer sdkstats for instrumentation collection or customer sdkstats exporter
470+
# Don't collect customer sdkstats for instrumentation collection, sdkstats exporter or customer sdkstats exporter
468471
return (
469472
is_customer_sdkstats_enabled
470473
and not get_customer_sdkstats_shutdown()
474+
and not self._is_stats_exporter()
471475
and not self._is_customer_sdkstats_exporter()
472476
and not self._instrumentation_collection
473477
)
@@ -477,7 +481,7 @@ def _is_statsbeat_initializing_state(self):
477481
return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success()
478482

479483
def _is_stats_exporter(self):
480-
return self.__class__.__name__ == "_StatsBeatExporter"
484+
return getattr(self, "_is_sdkstats", False)
481485

482486
def _is_customer_sdkstats_exporter(self):
483487
return getattr(self, '_is_customer_sdkstats', False)

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
_APPLICATIONINSIGHTS_METRIC_NAMESPACE_OPT_IN,
4141
_AUTOCOLLECTED_INSTRUMENT_NAMES,
4242
_METRIC_ENVELOPE_NAME,
43+
_STATSBEAT_METRIC_NAME_MAPPINGS,
4344
)
4445
from azure.monitor.opentelemetry.exporter import _utils
4546
from azure.monitor.opentelemetry.exporter._generated.models import (
@@ -75,13 +76,15 @@ class AzureMonitorMetricExporter(BaseExporter, MetricExporter):
7576
"""Azure Monitor Metric exporter for OpenTelemetry."""
7677

7778
def __init__(self, **kwargs: Any) -> None:
79+
self._is_sdkstats = kwargs.get("is_sdkstats", False)
80+
self._is_customer_sdkstats = kwargs.get("is_customer_sdkstats", False)
81+
self._metrics_to_log_analytics = self._determine_metrics_to_log_analytics()
7882
BaseExporter.__init__(self, **kwargs)
7983
MetricExporter.__init__(
8084
self,
8185
preferred_temporality=APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
8286
preferred_aggregation=kwargs.get("preferred_aggregation"), # type: ignore
8387
)
84-
self._metrics_to_log_analytics = self._determine_metrics_to_log_analytics()
8588

8689
# pylint: disable=R1702
8790
def export(
@@ -157,7 +160,13 @@ def _point_to_envelope(
157160
# When Metrics to Log Analytics is disabled, only send Standard metrics and _OTELRESOURCE_
158161
if not self._metrics_to_log_analytics and name not in _AUTOCOLLECTED_INSTRUMENT_NAMES:
159162
return None
160-
envelope = _convert_point_to_envelope(point, name, resource, scope)
163+
164+
# Apply statsbeat metric name mapping if this is a statsbeat exporter
165+
final_metric_name = name
166+
if self._is_sdkstats and name in _STATSBEAT_METRIC_NAME_MAPPINGS:
167+
final_metric_name = _STATSBEAT_METRIC_NAME_MAPPINGS[name]
168+
169+
envelope = _convert_point_to_envelope(point, final_metric_name, resource, scope)
161170
if name in _AUTOCOLLECTED_INSTRUMENT_NAMES:
162171
envelope = _handle_std_metric_envelope(envelope, name, point.attributes) # type: ignore
163172
if envelope is not None:
@@ -182,8 +191,11 @@ def _determine_metrics_to_log_analytics(self) -> bool:
182191
:return: False if metrics should not be sent to Log Analytics, True otherwise.
183192
:rtype: bool
184193
"""
194+
# If sdkStats exporter, always send to LA
195+
if self._is_sdkstats:
196+
return True
185197
# Disabling metrics to Log Analytics via env var is currently only specified for AKS Attach scenarios.
186-
if not _utils._is_on_aks() or not _utils._is_attach_enabled() or self._is_stats_exporter():
198+
if not _utils._is_on_aks() or not _utils._is_attach_enabled():
187199
return True
188200
env_var = os.environ.get(_APPLICATIONINSIGHTS_METRICS_TO_LOGANALYTICS_ENABLED)
189201
if not env_var:
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
"""
5+
Statsbeat metrics collection module.
6+
7+
This module provides a singleton-based, thread-safe manager for collecting
8+
and reporting statsbeat metrics.
9+
"""
10+
11+
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
12+
collect_statsbeat_metrics,
13+
shutdown_statsbeat_metrics,
14+
)
15+
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
16+
StatsbeatConfig,
17+
StatsbeatManager,
18+
)
19+
20+
__all__ = [
21+
'StatsbeatConfig',
22+
'StatsbeatManager',
23+
'collect_statsbeat_metrics',
24+
'shutdown_statsbeat_metrics',
25+
]

0 commit comments

Comments
 (0)