Skip to content

Commit a0ef7b2

Browse files
authored
Refactor one settings config manager (#42508)
1 parent 35f1e42 commit a0ef7b2

File tree

6 files changed

+525
-159
lines changed

6 files changed

+525
-159
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212

1313
### Other Changes
1414

15+
- Configuration manager/worker fetch via OneSettings part 1 - Change detection
16+
([#42360](https://github.com/Azure/azure-sdk-for-python/pull/42360))
17+
- Configuration manager/worker fetch via OneSettings part 2 - Concurrency and refactoring of _ConfigurationManager
18+
([#42508](https://github.com/Azure/azure-sdk-for-python/pull/42508))
19+
1520
## 1.0.0b41 (2025-07-31)
1621

1722
### Features Added
18-
19-
- Configuration manager/worker fetch via OneSettings part 1
20-
([#42360] https://github.com/Azure/azure-sdk-for-python/pull/42360)
2123
- Added RateLimited Sampler
2224
([#41954](https://github.com/Azure/azure-sdk-for-python/pull/41954))
2325
- Refactored Application Insights Sampler Code

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py

Lines changed: 145 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3+
from dataclasses import dataclass, field
34
from typing import Dict, Optional
45
import logging
56
from threading import Lock
@@ -8,26 +9,40 @@
89
_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
910
_ONE_SETTINGS_PYTHON_KEY,
1011
_ONE_SETTINGS_CHANGE_URL,
12+
_ONE_SETTINGS_CONFIG_URL,
1113
)
1214
from azure.monitor.opentelemetry.exporter._configuration._utils import make_onesettings_request
1315

1416
# Set up logger
1517
logger = logging.getLogger(__name__)
1618

1719

20+
@dataclass
class _ConfigurationState:
    """Snapshot of the configuration data tracked by the configuration manager.

    Instances are immutable by convention only: the dataclass is not frozen,
    so nothing prevents attribute assignment. Derive a modified snapshot with
    :meth:`with_updates` rather than mutating fields in place.
    """

    # ETag of the last stored response; empty string when none has been stored.
    etag: str = ""
    # Interval between configuration refreshes, in seconds.
    refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
    # Last stored configuration version; -1 until a version has been stored.
    version_cache: int = -1
    # Cached configuration settings as key/value pairs.
    settings_cache: Dict[str, str] = field(default_factory=dict)

    def with_updates(self, **kwargs) -> '_ConfigurationState':  # pylint: disable=C4741,C4742
        """Create a new state object with the given fields replaced.

        Recognized keyword arguments are ``etag``, ``refresh_interval``,
        ``version_cache`` and ``settings_cache``. Fields that are not supplied
        keep their current value; any other keyword is ignored.

        :return: A new state object; ``self`` is left untouched.
        :rtype: _ConfigurationState
        """
        settings = kwargs.get('settings_cache', self.settings_cache)
        return _ConfigurationState(
            etag=kwargs.get('etag', self.etag),
            refresh_interval=kwargs.get('refresh_interval', self.refresh_interval),
            version_cache=kwargs.get('version_cache', self.version_cache),
            # Always store a private copy, whether the dict was inherited from
            # this state or supplied by the caller, so later mutation of the
            # source dict cannot leak into the new snapshot.
            settings_cache=dict(settings) if settings is not None else {},
        )
36+
37+
1838
class _ConfigurationManager:
1939
"""Singleton class to manage configuration settings."""
2040

2141
_instance = None
2242
_configuration_worker = None
2343
_instance_lock = Lock()
24-
_config_lock = Lock()
25-
_settings_lock = Lock()
26-
_version_lock = Lock()
27-
_etag = None
28-
_refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
29-
_settings_cache: Dict[str, str] = {}
30-
_version_cache = 0
44+
_state_lock = Lock() # Single lock for all state
45+
_current_state = _ConfigurationState()
3146

3247
def __new__(cls):
3348
with cls._instance_lock:
@@ -41,81 +56,156 @@ def _initialize_worker(self):
4156
"""Initialize the ConfigurationManager and start the configuration worker."""
4257
# Lazy import to avoid circular import
4358
from azure.monitor.opentelemetry.exporter._configuration._worker import _ConfigurationWorker
44-
self._configuration_worker = _ConfigurationWorker(self._refresh_interval)
59+
60+
# Get initial refresh interval from state
61+
with _ConfigurationManager._state_lock:
62+
initial_refresh_interval = _ConfigurationManager._current_state.refresh_interval
63+
64+
self._configuration_worker = _ConfigurationWorker(initial_refresh_interval)
4565

4666
def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int:
    """Poll OneSettings for configuration changes and update the shared state.

    Steps:

    1. Read the current state under the state lock and build the request
       headers (``If-None-Match`` from the stored ETag, ``x-ms-onesetinterval``
       from the stored refresh interval).
    2. Call the CHANGE endpoint without holding the lock.
    3. Unless the response is a 304, compare the response version with the
       cached version (read under the lock).
    4. If the version increased, call the CONFIG endpoint without holding
       the lock and accept its settings only if it reports the same version.
    5. Apply all collected updates in one locked step. A version/settings
       update is discarded at this point if another caller has meanwhile
       stored the same or a newer version.

    Version comparison:

    - Version increase: the CONFIG endpoint is queried and, on success, the
      new version and settings are cached.
    - Version unchanged: only the ETag and refresh interval are updated.
    - Version decrease: a warning is logged; version and settings are left as is.

    Error handling:

    - CONFIG request not 200, without settings, or with a version different
      from the CHANGE response: a warning is logged and the new ETag is NOT
      stored, so the next call does not send it as ``If-None-Match``.
    - CHANGE response (other than 304) without settings or version: a warning
      is logged; only the ETag and refresh interval are updated.
    - No exceptions are caught here; anything raised by
      ``make_onesettings_request`` propagates to the caller.

    :param query_dict: Optional query parameters to include in both OneSettings
        requests. ``None`` is treated as an empty dictionary.
    :type query_dict: Optional[Dict[str, str]]

    :return: The refresh interval stored in the state after this call, in seconds.
    :rtype: int
    """
    query_dict = query_dict or {}
    headers = {}

    # Build conditional-request headers from a consistent view of the state.
    with _ConfigurationManager._state_lock:
        current_state = _ConfigurationManager._current_state
        if current_state.etag:
            headers["If-None-Match"] = current_state.etag
        if current_state.refresh_interval:
            # NOTE(review): the response parser in _utils converts this header from
            # minutes to seconds, but the stored (seconds) value is sent back here
            # unconverted - confirm which unit the service expects on requests.
            headers["x-ms-onesetinterval"] = str(current_state.refresh_interval)

    # Network call is made outside the lock so other threads are not blocked.
    response = make_onesettings_request(_ONE_SETTINGS_CHANGE_URL, query_dict, headers)

    # Collect the fields to change; they are applied in one step at the end.
    new_state_updates = {}
    if response.etag is not None:
        new_state_updates['etag'] = response.etag
    if response.refresh_interval and response.refresh_interval > 0:
        new_state_updates['refresh_interval'] = response.refresh_interval  # type: ignore

    # 304 Not Modified: nothing beyond etag / refresh interval to process.
    if response.status_code != 304:
        if response.settings and response.version is not None:
            with _ConfigurationManager._state_lock:
                cached_version = _ConfigurationManager._current_state.version_cache

            if response.version > cached_version:
                # Version increase: new config available, fetch it outside the lock.
                config_response = make_onesettings_request(_ONE_SETTINGS_CONFIG_URL, query_dict)
                if config_response.status_code != 200 or not config_response.settings:
                    logger.warning("Unexpected response status: %d", config_response.status_code)
                    # We do not update etag to allow retry on next call
                    new_state_updates.pop('etag', None)
                elif config_response.version != response.version:
                    logger.warning(
                        "Version mismatch between change and config responses. "
                        "No configurations updated."
                    )
                    # We do not update etag to allow retry on next call
                    new_state_updates.pop('etag', None)
                else:
                    new_state_updates['version_cache'] = response.version  # type: ignore
                    # Store a copy so the state does not alias the response object.
                    new_state_updates['settings_cache'] = dict(config_response.settings)  # type: ignore
            elif response.version < cached_version:
                # Version rollback: erroneous state, keep what is cached.
                logger.warning("Fetched version is lower than cached version. No configurations updated.")
            # Version unchanged: no new config, nothing more to do.
        else:
            # This branch inspects the CHANGE response, not the CONFIG response.
            logger.warning("No settings or version provided in change response. Config not updated.")

    # Atomic state update
    with _ConfigurationManager._state_lock:
        latest_state = _ConfigurationManager._current_state  # Always use latest state
        # The version check above ran before the unlocked CONFIG fetch. Re-check it
        # against the latest state so a slow caller cannot overwrite a same-or-newer
        # configuration stored by another caller in the meantime.
        fetched_version = new_state_updates.get('version_cache')
        if fetched_version is not None and fetched_version <= latest_state.version_cache:
            new_state_updates.pop('version_cache', None)
            new_state_updates.pop('settings_cache', None)
        updated_state = latest_state.with_updates(**new_state_updates)
        _ConfigurationManager._current_state = updated_state
        return updated_state.refresh_interval
190+
191+
def get_settings(self) -> Dict[str, str]:  # pylint: disable=C4741,C4742
    """Return a copy of the currently cached settings.

    The copy is taken while holding the state lock, so changes the caller
    makes to the returned dictionary do not affect the cached state.

    :return: Copy of the settings cache.
    :rtype: Dict[str, str]
    """
    with _ConfigurationManager._state_lock:
        state = _ConfigurationManager._current_state
        settings_copy = state.settings_cache.copy()
    return settings_copy
195+
196+
def get_current_version(self) -> int:  # pylint: disable=C4741,C4742
    """Return the configuration version held in the current state.

    The value is read while holding the state lock.

    :return: Cached configuration version.
    :rtype: int
    """
    with _ConfigurationManager._state_lock:
        state = _ConfigurationManager._current_state
        cached_version = state.version_cache
    return cached_version
111200

112201
def shutdown(self) -> None:
    """Shut down the configuration worker and clear the singleton reference.

    Both steps run while holding the instance lock. The worker is shut down
    and dropped only if one is set; the class-level instance reference is
    reset to ``None`` only if it is currently set.
    """
    with _ConfigurationManager._instance_lock:
        worker = self._configuration_worker
        if worker:
            worker.shutdown()
            self._configuration_worker = None
        if _ConfigurationManager._instance:
            _ConfigurationManager._instance = None
119209

120210

121211
def _update_configuration_and_get_refresh_interval() -> int:

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@ class OneSettingsResponse:
2424
refresh_interval (int): Interval in seconds for the next configuration refresh
2525
settings (Dict[str, str]): Dictionary of configuration key-value pairs
2626
version (Optional[int]): Configuration version number for change tracking
27+
status_code (int): HTTP status code from the response
2728
"""
2829

2930
def __init__(
3031
self,
3132
etag: Optional[str] = None,
3233
refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
3334
settings: Optional[Dict[str, str]] = None,
34-
version: Optional[int] = None
35+
version: Optional[int] = None,
36+
status_code: int = 200
3537
):
3638
"""Initialize OneSettingsResponse with configuration data.
3739
@@ -42,11 +44,13 @@ def __init__(
4244
settings (Optional[Dict[str, str]], optional): Configuration settings dictionary.
4345
Defaults to empty dict if None.
4446
version (Optional[int], optional): Configuration version number. Defaults to None.
47+
status_code (int, optional): HTTP status code. Defaults to 200.
4548
"""
4649
self.etag = etag
4750
self.refresh_interval = refresh_interval
4851
self.settings = settings or {}
4952
self.version = version
53+
self.status_code = status_code
5054

5155

5256
def make_onesettings_request(url: str, query_dict: Optional[Dict[str, str]] = None,
@@ -115,6 +119,7 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo
115119
- refresh_interval: Next refresh interval from headers
116120
- settings: Configuration key-value pairs (empty for 304/errors)
117121
- version: Configuration version number for change tracking
122+
- status_code: HTTP status code of the response
118123
:rtype: OneSettingsResponse
119124
Note:
120125
This function logs warnings for various error conditions but does not
@@ -131,7 +136,9 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo
131136
etag = response.headers.get("ETag")
132137
refresh_interval_header = response.headers.get("x-ms-onesetinterval")
133138
try:
134-
refresh_interval = int(refresh_interval_header) if refresh_interval_header else refresh_interval
139+
# Note: OneSettings refresh interval is in minutes, convert to seconds
140+
if refresh_interval_header:
141+
refresh_interval = int(refresh_interval_header) * 60
135142
except (ValueError, TypeError):
136143
logger.warning("Invalid refresh interval format: %s", refresh_interval_header)
137144
refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
@@ -162,4 +169,4 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo
162169
elif status_code == 500:
163170
logger.warning("Internal server error from OneSettings: %s", response.content)
164171

165-
return OneSettingsResponse(etag, refresh_interval, settings, version)
172+
return OneSettingsResponse(etag, refresh_interval, settings, version, status_code)

0 commit comments

Comments
 (0)