Skip to content

Commit f17312e

Browse files
authored
Discovery Async (Azure#37061)
* fixing header bug * Async Discover + Failover * Fixing Test, mypy, and pylint issues * Fix test async issue * review comments * more spelling fixes * Trying to fix async test issue * try to fix async issue * Update test_configuration_async_client_manager.py * async discovery * Rename, add sync lock, add type check * removing unneeded lock. * client base * Added missing lock
1 parent d6c2c23 commit f17312e

14 files changed

+1406
-375
lines changed

sdk/appconfiguration/azure-appconfiguration-provider/azure/appconfiguration/provider/_azureappconfigurationprovider.py

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,9 @@ def load(*args, **kwargs) -> "AzureAppConfigurationProvider":
231231
kwargs.pop("headers", {}),
232232
"Startup",
233233
provider._replica_client_manager.get_client_count() - 1, # pylint:disable=protected-access
234-
kwargs.pop("uses_feature_flags", False),
235-
kwargs.pop("feature_filters_used", {}),
236-
uses_key_vault,
234+
provider._feature_flag_enabled, # pylint:disable=protected-access
235+
provider._feature_filter_usage, # pylint:disable=protected-access
236+
provider._uses_key_vault, # pylint:disable=protected-access
237237
)
238238

239239
try:
@@ -522,25 +522,24 @@ def refresh(self, **kwargs) -> None:
522522
success = False
523523
need_refresh = False
524524
error_message = """
525-
Failed to refresh configuration settings. No Azure App Configuration stores successfully
526-
refreshed.
525+
Failed to refresh configuration settings from Azure App Configuration.
527526
"""
528527
exception: Exception = RuntimeError(error_message)
529528
try:
530529
self._replica_client_manager.refresh_clients()
531530
active_clients = self._replica_client_manager.get_active_clients()
532531

532+
headers = _get_headers(
533+
kwargs.pop("headers", {}),
534+
"Watch",
535+
self._replica_client_manager.get_client_count() - 1,
536+
self._feature_flag_enabled,
537+
self._feature_filter_usage,
538+
self._uses_key_vault,
539+
)
533540
for client in active_clients:
534541
try:
535542
if self._refresh_on:
536-
headers = _get_headers(
537-
kwargs.pop("headers", {}),
538-
"Watch",
539-
self._replica_client_manager.get_client_count() - 1,
540-
self._feature_flag_enabled,
541-
self._feature_filter_usage,
542-
self._uses_key_vault,
543-
)
544543
need_refresh, self._refresh_on, configuration_settings = client.refresh_configuration_settings(
545544
self._selects, self._refresh_on, headers, **kwargs
546545
)
@@ -556,14 +555,6 @@ def refresh(self, **kwargs) -> None:
556555
if need_refresh:
557556
self._dict = configuration_settings_processed
558557
if self._feature_flag_refresh_enabled:
559-
headers = _get_headers(
560-
kwargs.pop("headers", {}),
561-
"Watch",
562-
self._replica_client_manager.get_client_count() - 1,
563-
self._feature_flag_enabled,
564-
self._feature_filter_usage,
565-
self._uses_key_vault,
566-
)
567558
need_ff_refresh, self._refresh_on_feature_flags, feature_flags, filters_used = (
568559
client.refresh_feature_flags(
569560
self._refresh_on_feature_flags, self._feature_flag_selectors, headers, **kwargs
@@ -670,7 +661,8 @@ def __getitem__(self, key: str) -> Any:
670661
"""
671662
Returns the value of the specified key.
672663
"""
673-
return self._dict[key]
664+
with self._update_lock:
665+
return self._dict[key]
674666

675667
def __iter__(self) -> Iterator[str]:
676668
return self._dict.__iter__()

sdk/appconfiguration/azure-appconfiguration-provider/azure/appconfiguration/provider/_client_manager.py

Lines changed: 31 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from logging import getLogger
77
import json
88
import time
9-
import random
109
from dataclasses import dataclass
1110
from typing import Tuple, Union, Dict, List, Any, Optional, Mapping
1211
from typing_extensions import Self
@@ -19,26 +18,19 @@
1918
AzureAppConfigurationClient,
2019
FeatureFlagConfigurationSetting,
2120
)
22-
from ._models import SettingSelector
23-
from ._constants import (
24-
FEATURE_FLAG_PREFIX,
25-
PERCENTAGE_FILTER_NAMES,
26-
TIME_WINDOW_FILTER_NAMES,
27-
TARGETING_FILTER_NAMES,
28-
CUSTOM_FILTER_KEY,
29-
PERCENTAGE_FILTER_KEY,
30-
TIME_WINDOW_FILTER_KEY,
31-
TARGETING_FILTER_KEY,
21+
from ._client_manager_base import (
22+
_ConfigurationClientWrapperBase,
23+
ConfigurationClientManagerBase,
24+
FALLBACK_CLIENT_REFRESH_EXPIRED_INTERVAL,
25+
MINIMAL_CLIENT_REFRESH_INTERVAL,
3226
)
27+
from ._models import SettingSelector
28+
from ._constants import FEATURE_FLAG_PREFIX
3329
from ._discovery import find_auto_failover_endpoints
3430

35-
FALLBACK_CLIENT_REFRESH_EXPIRED_INTERVAL = 3600 # 1 hour in seconds
36-
MINIMAL_CLIENT_REFRESH_INTERVAL = 30 # 30 seconds
37-
3831

3932
@dataclass
40-
class _ConfigurationClientWrapper:
41-
endpoint: str
33+
class _ConfigurationClientWrapper(_ConfigurationClientWrapperBase):
4234
_client: AzureAppConfigurationClient
4335
backoff_end_time: float = 0
4436
failed_attempts: int = 0
@@ -171,26 +163,21 @@ def load_feature_flags(
171163
loaded_feature_flags = []
172164
# Needs to be removed unknown keyword argument for list_configuration_settings
173165
kwargs.pop("sentinel_keys", None)
174-
filters_used = {}
166+
filters_used: Dict[str, bool] = {}
175167
for select in feature_flag_selectors:
176168
feature_flags = self._client.list_configuration_settings(
177169
key_filter=FEATURE_FLAG_PREFIX + select.key_filter, label_filter=select.label_filter, **kwargs
178170
)
179171
for feature_flag in feature_flags:
180172
loaded_feature_flags.append(json.loads(feature_flag.value))
173+
if not isinstance(feature_flag, FeatureFlagConfigurationSetting):
174+
# If the feature flag is not a FeatureFlagConfigurationSetting, it means it was selected by
175+
# mistake, so we should ignore it.
176+
continue
181177

182178
if feature_flag_refresh_enabled:
183179
feature_flag_sentinel_keys[(feature_flag.key, feature_flag.label)] = feature_flag.etag
184-
if feature_flag.filters:
185-
for filter in feature_flag.filters:
186-
if filter.get("name") in PERCENTAGE_FILTER_NAMES:
187-
filters_used[PERCENTAGE_FILTER_KEY] = True
188-
elif filter.get("name") in TIME_WINDOW_FILTER_NAMES:
189-
filters_used[TIME_WINDOW_FILTER_KEY] = True
190-
elif filter.get("name") in TARGETING_FILTER_NAMES:
191-
filters_used[TARGETING_FILTER_KEY] = True
192-
else:
193-
filters_used[CUSTOM_FILTER_KEY] = True
180+
self._feature_flag_telemetry(feature_flag, filters_used)
194181
return loaded_feature_flags, feature_flag_sentinel_keys, filters_used
195182

196183
@distributed_trace
@@ -248,7 +235,7 @@ def refresh_feature_flags(
248235
changed = self._check_configuration_setting(key=key, label=label, etag=etag, headers=headers, **kwargs)
249236
if changed:
250237
feature_flags, feature_flag_sentinel_keys, filters_used = self.load_feature_flags(
251-
feature_flag_selectors, True, **kwargs
238+
feature_flag_selectors, True, headers=headers, **kwargs
252239
)
253240
return True, feature_flag_sentinel_keys, feature_flags, filters_used
254241
return False, None, None, {}
@@ -288,7 +275,7 @@ def __exit__(self, *args):
288275
self._client.__exit__(*args)
289276

290277

291-
class ConfigurationClientManager: # pylint:disable=too-many-instance-attributes
278+
class ConfigurationClientManager(ConfigurationClientManagerBase): # pylint:disable=too-many-instance-attributes
292279
def __init__(
293280
self,
294281
connection_string: Optional[str],
@@ -302,51 +289,34 @@ def __init__(
302289
max_backoff_sec,
303290
**kwargs
304291
):
305-
self._replica_clients = []
306-
self._original_endpoint = endpoint
292+
super(ConfigurationClientManager, self).__init__(
293+
endpoint,
294+
user_agent,
295+
retry_total,
296+
retry_backoff_max,
297+
replica_discovery_enabled,
298+
min_backoff_sec,
299+
max_backoff_sec,
300+
**kwargs
301+
)
307302
self._original_connection_string = connection_string
308303
self._credential = credential
309-
self._user_agent = user_agent
310-
self._retry_total = retry_total
311-
self._retry_backoff_max = retry_backoff_max
312-
self._replica_discovery_enabled = replica_discovery_enabled
313-
self._next_update_time = time.time() + MINIMAL_CLIENT_REFRESH_INTERVAL
314-
self._args = dict(kwargs)
315-
self._min_backoff_sec = min_backoff_sec
316-
self._max_backoff_sec = max_backoff_sec
317304

318-
failover_endpoints = find_auto_failover_endpoints(endpoint, replica_discovery_enabled)
319305
if connection_string and endpoint:
320306
self._replica_clients.append(
321307
_ConfigurationClientWrapper.from_connection_string(
322308
endpoint, connection_string, user_agent, retry_total, retry_backoff_max, **self._args
323309
)
324310
)
325-
for failover_endpoint in failover_endpoints:
326-
failover_connection_string = connection_string.replace(endpoint, failover_endpoint)
327-
self._replica_clients.append(
328-
_ConfigurationClientWrapper.from_connection_string(
329-
failover_endpoint,
330-
failover_connection_string,
331-
user_agent,
332-
retry_total,
333-
retry_backoff_max,
334-
**self._args
335-
)
336-
)
311+
self._setup_failover_endpoints()
337312
return
338313
if endpoint and credential:
339314
self._replica_clients.append(
340315
_ConfigurationClientWrapper.from_credential(
341316
endpoint, credential, user_agent, retry_total, retry_backoff_max, **self._args
342317
)
343318
)
344-
for failover_endpoint in failover_endpoints:
345-
self._replica_clients.append(
346-
_ConfigurationClientWrapper.from_credential(
347-
failover_endpoint, credential, user_agent, retry_total, retry_backoff_max, **self._args
348-
)
349-
)
319+
self._setup_failover_endpoints()
350320
return
351321
raise ValueError("Please pass either endpoint and credential, or a connection string with a value.")
352322

@@ -355,6 +325,9 @@ def refresh_clients(self):
355325
return
356326
if self._next_update_time > time.time():
357327
return
328+
self._setup_failover_endpoints()
329+
330+
def _setup_failover_endpoints(self):
358331
failover_endpoints = find_auto_failover_endpoints(self._original_endpoint, self._replica_discovery_enabled)
359332

360333
if failover_endpoints is None:
@@ -404,40 +377,11 @@ def refresh_clients(self):
404377
self._replica_clients = new_clients
405378
self._next_update_time = time.time() + MINIMAL_CLIENT_REFRESH_INTERVAL
406379

407-
def get_active_clients(self):
408-
active_clients = []
409-
for client in self._replica_clients:
410-
if client.is_active():
411-
active_clients.append(client)
412-
return active_clients
413-
414380
def backoff(self, client: _ConfigurationClientWrapper):
415381
client.failed_attempts += 1
416382
backoff_time = self._calculate_backoff(client.failed_attempts)
417383
client.backoff_end_time = (time.time() * 1000) + backoff_time
418384

419-
def get_client_count(self) -> int:
420-
return len(self._replica_clients)
421-
422-
def _calculate_backoff(self, attempts: int) -> float:
423-
max_attempts = 63
424-
ms_per_second = 1000 # 1 Second in milliseconds
425-
426-
min_backoff_milliseconds = self._min_backoff_sec * ms_per_second
427-
max_backoff_milliseconds = self._max_backoff_sec * ms_per_second
428-
429-
if self._max_backoff_sec <= self._min_backoff_sec:
430-
return min_backoff_milliseconds
431-
432-
calculated_milliseconds = max(1, min_backoff_milliseconds) * (1 << min(attempts, max_attempts))
433-
434-
if calculated_milliseconds > max_backoff_milliseconds or calculated_milliseconds <= 0:
435-
calculated_milliseconds = max_backoff_milliseconds
436-
437-
return min_backoff_milliseconds + (
438-
random.uniform(0.0, 1.0) * (calculated_milliseconds - min_backoff_milliseconds)
439-
)
440-
441385
def __eq__(self, other):
442386
if len(self._replica_clients) != len(other._replica_clients):
443387
return False
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# ------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for
4+
# license information.
5+
# -------------------------------------------------------------------------
6+
import time
7+
import random
8+
from dataclasses import dataclass
9+
from typing import Dict, List
10+
from azure.appconfiguration import ( # type:ignore # pylint:disable=no-name-in-module
11+
FeatureFlagConfigurationSetting,
12+
)
13+
from ._constants import (
14+
PERCENTAGE_FILTER_NAMES,
15+
TIME_WINDOW_FILTER_NAMES,
16+
TARGETING_FILTER_NAMES,
17+
CUSTOM_FILTER_KEY,
18+
PERCENTAGE_FILTER_KEY,
19+
TIME_WINDOW_FILTER_KEY,
20+
TARGETING_FILTER_KEY,
21+
)
22+
23+
FALLBACK_CLIENT_REFRESH_EXPIRED_INTERVAL = 3600 # 1 hour in seconds
24+
MINIMAL_CLIENT_REFRESH_INTERVAL = 30 # 30 seconds
25+
26+
27+
@dataclass
28+
class _ConfigurationClientWrapperBase:
29+
endpoint: str
30+
31+
def _feature_flag_telemetry(self, feature_flag: FeatureFlagConfigurationSetting, filters_used: Dict[str, bool]):
32+
if feature_flag.filters:
33+
for filter in feature_flag.filters:
34+
if filter.get("name") in PERCENTAGE_FILTER_NAMES:
35+
filters_used[PERCENTAGE_FILTER_KEY] = True
36+
elif filter.get("name") in TIME_WINDOW_FILTER_NAMES:
37+
filters_used[TIME_WINDOW_FILTER_KEY] = True
38+
elif filter.get("name") in TARGETING_FILTER_NAMES:
39+
filters_used[TARGETING_FILTER_KEY] = True
40+
else:
41+
filters_used[CUSTOM_FILTER_KEY] = True
42+
43+
44+
class ConfigurationClientManagerBase: # pylint:disable=too-many-instance-attributes
45+
def __init__(
46+
self,
47+
endpoint: str,
48+
user_agent: str,
49+
retry_total,
50+
retry_backoff_max,
51+
replica_discovery_enabled,
52+
min_backoff_sec,
53+
max_backoff_sec,
54+
**kwargs
55+
):
56+
self._replica_clients: List[_ConfigurationClientWrapperBase] = []
57+
self._original_endpoint = endpoint
58+
self._user_agent = user_agent
59+
self._retry_total = retry_total
60+
self._retry_backoff_max = retry_backoff_max
61+
self._replica_discovery_enabled = replica_discovery_enabled
62+
self._next_update_time = time.time() + MINIMAL_CLIENT_REFRESH_INTERVAL
63+
self._args = dict(kwargs)
64+
self._min_backoff_sec = min_backoff_sec
65+
self._max_backoff_sec = max_backoff_sec
66+
67+
def get_active_clients(self):
68+
active_clients = []
69+
for client in self._replica_clients:
70+
if client.is_active():
71+
active_clients.append(client)
72+
return active_clients
73+
74+
def get_client_count(self) -> int:
75+
return len(self._replica_clients)
76+
77+
def _calculate_backoff(self, attempts: int) -> float:
78+
max_attempts = 63
79+
ms_per_second = 1000 # 1 Second in milliseconds
80+
81+
min_backoff_milliseconds = self._min_backoff_sec * ms_per_second
82+
max_backoff_milliseconds = self._max_backoff_sec * ms_per_second
83+
84+
if self._max_backoff_sec <= self._min_backoff_sec:
85+
return min_backoff_milliseconds
86+
87+
calculated_milliseconds = max(1, min_backoff_milliseconds) * (1 << min(attempts, max_attempts))
88+
89+
if calculated_milliseconds > max_backoff_milliseconds or calculated_milliseconds <= 0:
90+
calculated_milliseconds = max_backoff_milliseconds
91+
92+
return min_backoff_milliseconds + (
93+
random.uniform(0.0, 1.0) * (calculated_milliseconds - min_backoff_milliseconds)
94+
)
95+
96+
def __eq__(self, other):
97+
if len(self._replica_clients) != len(other._replica_clients):
98+
return False
99+
for i in range(len(self._replica_clients)): # pylint:disable=consider-using-enumerate
100+
if self._replica_clients[i] != other._replica_clients[i]:
101+
return False
102+
return True

0 commit comments

Comments
 (0)