Skip to content

Commit 456a00b

Browse files
authored
Implement Onesettings control plane - part 1 (#42360)
1 parent 6af5eaf commit 456a00b

File tree

10 files changed

+1044
-0
lines changed

10 files changed

+1044
-0
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
### Features Added
1616

17+
- Configuration manager/worker fetch via OneSettings part 1
18+
([#42360] https://github.com/Azure/azure-sdk-for-python/pull/42360)
1719
- Added RateLimited Sampler
1820
([#41954](https://github.com/Azure/azure-sdk-for-python/pull/41954))
1921
- Refactored Application Insights Sampler Code
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
from typing import Dict, Optional
4+
import logging
5+
from threading import Lock
6+
7+
from azure.monitor.opentelemetry.exporter._constants import (
8+
_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
9+
_ONE_SETTINGS_PYTHON_KEY,
10+
_ONE_SETTINGS_CHANGE_URL,
11+
)
12+
from azure.monitor.opentelemetry.exporter._configuration._utils import make_onesettings_request
13+
14+
# Set up logger
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class _ConfigurationManager:
19+
"""Singleton class to manage configuration settings."""
20+
21+
_instance = None
22+
_configuration_worker = None
23+
_instance_lock = Lock()
24+
_config_lock = Lock()
25+
_settings_lock = Lock()
26+
_version_lock = Lock()
27+
_etag = None
28+
_refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
29+
_settings_cache: Dict[str, str] = {}
30+
_version_cache = 0
31+
32+
def __new__(cls):
33+
with cls._instance_lock:
34+
if cls._instance is None:
35+
cls._instance = super(_ConfigurationManager, cls).__new__(cls)
36+
# Initialize the instance here to avoid re-initialization
37+
cls._instance._initialize_worker()
38+
return cls._instance
39+
40+
def _initialize_worker(self):
41+
"""Initialize the ConfigurationManager and start the configuration worker."""
42+
# Lazy import to avoid circular import
43+
from azure.monitor.opentelemetry.exporter._configuration._worker import _ConfigurationWorker
44+
self._configuration_worker = _ConfigurationWorker(self._refresh_interval)
45+
46+
def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int:
47+
"""Fetch configuration from OneSettings and update local cache.
48+
49+
This method performs a conditional HTTP request to OneSettings using the
50+
current ETag for efficient caching. It updates the local configuration
51+
cache with any new settings and manages version tracking for change detection.
52+
53+
The method handles version comparison logic:
54+
- Version increase: New configuration available, cache updated
55+
- Version same: No changes, cache remains unchanged
56+
- Version decrease: Unexpected state, logged as warning
57+
58+
:param query_dict: Optional query parameters to include
59+
in the OneSettings request. Commonly used for targeting specific
60+
configuration namespaces or environments.
61+
:type query_dict: Optional[Dict[str, str]]
62+
63+
:return: Updated refresh interval in seconds for the next configuration check.
64+
:rtype: int
65+
66+
Thread Safety:
67+
This method is thread-safe and uses multiple locks to ensure consistent
68+
state across concurrent access to configuration data.
69+
70+
Note:
71+
The method automatically handles ETag-based conditional requests to
72+
minimize unnecessary data transfer when configuration hasn't changed.
73+
"""
74+
query_dict = query_dict or {}
75+
headers = {}
76+
77+
# Prepare headers with current etag and refresh interval
78+
with self._config_lock:
79+
if self._etag:
80+
headers["If-None-Match"] = self._etag
81+
if self._refresh_interval:
82+
headers["x-ms-onesetinterval"] = str(self._refresh_interval)
83+
84+
# Make the OneSettings request
85+
response = make_onesettings_request(_ONE_SETTINGS_CHANGE_URL, query_dict, headers)
86+
87+
# Update configuration state based on response
88+
with self._config_lock:
89+
self._etag = response.etag
90+
self._refresh_interval = response.refresh_interval
91+
92+
# Evaluate CONFIG_VERSION to see if we need to fetch new config
93+
if response.settings:
94+
with self._version_lock:
95+
if response.version is not None:
96+
# New config published successfully, make a call to config endpoint
97+
if response.version > self._version_cache:
98+
# TODO: Call config endpoint to pull new config
99+
# Update latest version
100+
self._version_cache = response.version
101+
elif response.version == self._version_cache:
102+
# No new config has been published, do nothing
103+
pass
104+
else:
105+
# Erroneous state, should not occur under normal circumstances
106+
logger.warning(
107+
"Latest `CHANGE_VERSION` is less than the current stored version," \
108+
" no configurations updated."
109+
)
110+
return self._refresh_interval
111+
112+
def shutdown(self) -> None:
113+
"""Shutdown the configuration worker."""
114+
with self._instance_lock:
115+
if self._configuration_worker:
116+
self._configuration_worker.shutdown()
117+
self._configuration_worker = None
118+
self._instance = None
119+
120+
121+
def _update_configuration_and_get_refresh_interval() -> int:
122+
targeting = {
123+
"namespaces": _ONE_SETTINGS_PYTHON_KEY,
124+
}
125+
return _ConfigurationManager().get_configuration_and_refresh_interval(targeting)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
from typing import Dict, Optional
4+
import json
5+
import logging
6+
import requests
7+
8+
from azure.monitor.opentelemetry.exporter._constants import (
9+
_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
10+
_ONE_SETTINGS_CHANGE_VERSION_KEY,
11+
)
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class OneSettingsResponse:
17+
"""Response object containing OneSettings API response data.
18+
19+
This class encapsulates the parsed response from a OneSettings API call,
20+
including configuration settings, version information, and metadata.
21+
22+
Attributes:
23+
etag (Optional[str]): ETag header value for caching and conditional requests
24+
refresh_interval (int): Interval in seconds for the next configuration refresh
25+
settings (Dict[str, str]): Dictionary of configuration key-value pairs
26+
version (Optional[int]): Configuration version number for change tracking
27+
"""
28+
29+
def __init__(
30+
self,
31+
etag: Optional[str] = None,
32+
refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
33+
settings: Optional[Dict[str, str]] = None,
34+
version: Optional[int] = None
35+
):
36+
"""Initialize OneSettingsResponse with configuration data.
37+
38+
Args:
39+
etag (Optional[str], optional): ETag header value for caching. Defaults to None.
40+
refresh_interval (int, optional): Refresh interval in seconds.
41+
Defaults to _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS.
42+
settings (Optional[Dict[str, str]], optional): Configuration settings dictionary.
43+
Defaults to empty dict if None.
44+
version (Optional[int], optional): Configuration version number. Defaults to None.
45+
"""
46+
self.etag = etag
47+
self.refresh_interval = refresh_interval
48+
self.settings = settings or {}
49+
self.version = version
50+
51+
52+
def make_onesettings_request(url: str, query_dict: Optional[Dict[str, str]] = None,
53+
headers: Optional[Dict[str, str]] = None) -> OneSettingsResponse:
54+
"""Make an HTTP request to the OneSettings API and parse the response.
55+
56+
This function handles the complete OneSettings request lifecycle including:
57+
- Making the HTTP GET request with optional query parameters and headers
58+
- Error handling for network, HTTP, and JSON parsing errors
59+
- Parsing the response into a structured OneSettingsResponse object
60+
61+
:param url: The OneSettings API endpoint URL to request
62+
:type url: str
63+
:param query_dict: Query parameters to include
64+
in the request URL. Defaults to None.
65+
:type query_dict: Optional[Dict[str, str]]
66+
:param headers: HTTP headers to include in the request.
67+
Common headers include 'If-None-Match' for ETag caching. Defaults to None.
68+
:type headers: Optional[Dict[str, str]]
69+
70+
:return: Parsed response containing configuration data and metadata.
71+
Returns a default response object if the request fails.
72+
:rtype: OneSettingsResponse
73+
74+
Raises:
75+
Does not raise exceptions - all errors are caught and logged, returning a
76+
default OneSettingsResponse object.
77+
"""
78+
query_dict = query_dict or {}
79+
headers = headers or {}
80+
81+
try:
82+
result = requests.get(url, params=query_dict, headers=headers, timeout=10)
83+
result.raise_for_status() # Raises an exception for 4XX/5XX responses
84+
85+
return _parse_onesettings_response(result)
86+
except requests.exceptions.RequestException as ex:
87+
logger.warning("Failed to fetch configuration from OneSettings: %s", str(ex))
88+
return OneSettingsResponse()
89+
except json.JSONDecodeError as ex:
90+
logger.warning("Failed to parse OneSettings response: %s", str(ex))
91+
return OneSettingsResponse()
92+
except Exception as ex: # pylint: disable=broad-exception-caught
93+
logger.warning("Unexpected error while fetching configuration: %s", str(ex))
94+
return OneSettingsResponse()
95+
96+
def _parse_onesettings_response(response: requests.Response) -> OneSettingsResponse:
97+
"""Parse an HTTP response from OneSettings into a structured response object.
98+
99+
This function processes the OneSettings API response and extracts:
100+
- HTTP headers (ETag, refresh interval)
101+
- Response body (configuration settings, version)
102+
- Status code handling (200, 304, 4xx, 5xx)
103+
104+
The parser handles different HTTP status codes appropriately:
105+
- 200: New configuration data available, parse settings and version
106+
- 304: Not modified, configuration unchanged (empty settings)
107+
- 400/404/414/500: Various error conditions, logged with warnings
108+
109+
:param response: HTTP response object from the requests library containing
110+
the OneSettings API response with headers, status code, and content.
111+
:type response: requests.Response
112+
113+
:return: Structured response object containing:
114+
- etag: ETag header value for conditional requests
115+
- refresh_interval: Next refresh interval from headers
116+
- settings: Configuration key-value pairs (empty for 304/errors)
117+
- version: Configuration version number for change tracking
118+
:rtype: OneSettingsResponse
119+
Note:
120+
This function logs warnings for various error conditions but does not
121+
raise exceptions, always returning a valid OneSettingsResponse object.
122+
"""
123+
etag = None
124+
refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
125+
settings: Dict[str, str] = {}
126+
status_code = response.status_code
127+
version = None
128+
129+
# Extract headers
130+
if response.headers:
131+
etag = response.headers.get("ETag")
132+
refresh_interval_header = response.headers.get("x-ms-onesetinterval")
133+
try:
134+
refresh_interval = int(refresh_interval_header) if refresh_interval_header else refresh_interval
135+
except (ValueError, TypeError):
136+
logger.warning("Invalid refresh interval format: %s", refresh_interval_header)
137+
refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
138+
139+
# Handle different status codes
140+
if status_code == 304:
141+
# 304 Not Modified - cache stays the same
142+
pass
143+
elif status_code == 200:
144+
# 200 OK - parse new settings
145+
if response.content:
146+
try:
147+
decoded_string = response.content.decode("utf-8")
148+
config = json.loads(decoded_string)
149+
settings = config.get("settings", {})
150+
if settings and settings.get(_ONE_SETTINGS_CHANGE_VERSION_KEY) is not None:
151+
version = int(settings.get(_ONE_SETTINGS_CHANGE_VERSION_KEY)) # type: ignore
152+
except (UnicodeDecodeError, json.JSONDecodeError) as ex:
153+
logger.warning("Failed to decode OneSettings response content: %s", str(ex))
154+
except ValueError as ex:
155+
logger.warning("Failed to parse OneSettings change version: %s", str(ex))
156+
elif status_code == 400:
157+
logger.warning("Bad request to OneSettings: %s", response.content)
158+
elif status_code == 404:
159+
logger.warning("OneSettings configuration not found: %s", response.content)
160+
elif status_code == 414:
161+
logger.warning("OneSettings request URI too long: %s", response.content)
162+
elif status_code == 500:
163+
logger.warning("Internal server error from OneSettings: %s", response.content)
164+
165+
return OneSettingsResponse(etag, refresh_interval, settings, version)

0 commit comments

Comments
 (0)