Skip to content

Commit f8ab118

Browse files
authored
[Cosmos] Resiliency and Documentation Improvements (Azure#36514)
* 403.3 loop fix, regional routing fix, improvement on service request errors functional code, missing tests now * Update ErrorCodesAndRetries.md * Update TimeoutAndRetriesConfig.md * Update http_constants.py * Update CHANGELOG.md * test improvements for 403 retry * fix emulator tests * Update test_globaldb.py * Update test_globaldb.py * add ServiceRequestError test and doc update * addressing comments * Update test_globaldb.py * Update test_globaldb.py * Update test_globaldb.py * Update test_globaldb.py * move policy * revert * fixes * Update test_globaldb.py * Update test_globaldb.py * Update test_globaldb.py * Update test_globaldb.py * Update test_globaldb.py * Update CHANGELOG.md * 503 retries * align readme with changelog * forceful db account refresh * remove premature locational endpoint * make GEM refresh every 5 mins as it should have * Delete drz3-drill.txt * Update CHANGELOG.md * Update test_location_cache.py * Update _global_endpoint_manager.py * ensure only one initial database account call * Delete dr-zdrill-005.txt * Update test_location_cache.py * Update test_location_cache.py * Update test_location_cache.py * overhaul location_cache tests
1 parent 886139f commit f8ab118

18 files changed

+391
-534
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,23 @@
33
### 4.7.1 (Unreleased)
44

55
#### Features Added
6+
* SDK will now retry all ServiceRequestErrors (failing outgoing requests) before failing. Default number of retries is 3. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
67
* Added Retry Policy for Container Recreate in the Python SDK. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
78
* Added option to disable write payload on writes. See [PR 37365](https://github.com/Azure/azure-sdk-for-python/pull/37365)
89
* Added get feed ranges API. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
910
* Added feed range support in `query_items_change_feed`. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
1011

11-
#### Breaking Changes
12-
1312
#### Bugs Fixed
14-
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731)
13+
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731).
14+
* Fixed bug with client hangs when running into WriteForbidden exceptions. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
15+
* Added retry handling logic for DatabaseAccountNotFound exceptions. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
1516
* Fixed SDK regex validation that would not allow for item ids to be longer than 255 characters. See [PR 36569](https://github.com/Azure/azure-sdk-for-python/pull/36569).
1617
* Fixed issue where 'NoneType' object has no attribute error was raised when a session retry happened during a query. See [PR 37578](https://github.com/Azure/azure-sdk-for-python/pull/37578).
1718

1819
#### Other Changes
1920
* Getting offer thoughput when it has not been defined in a container will now give a 404/10004 instead of just a 404. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
2021
* Incomplete Partition Key Extractions in documents for Subpartitioning now gives 400/1001 instead of just a 400. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
21-
22+
* SDK will now make database account calls every 5 minutes to refresh location cache. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
2223

2324
### 4.7.0 (2024-05-15)
2425

sdk/cosmos/azure-cosmos/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ New releases of this SDK won't support Python 2.x starting January 1st, 2022. Pl
3131

3232
* Azure subscription - [Create a free account][azure_sub]
3333
* Azure [Cosmos DB account][cosmos_account] - SQL API
34-
* [Python 3.6+][python]
34+
* [Python 3.8+][python]
3535

3636
If you need a Cosmos DB SQL API account, you can create one with this [Azure CLI][azure_cli] command:
3737

sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,4 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
9696
# is set to false
9797
self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False)
9898

99-
# Resolve the endpoint for the request and pin the resolution to the resolved endpoint
100-
# This enables marking the endpoint unavailability on endpoint failover/unreachability
101-
self.location_endpoint = self.global_endpoint_manager.resolve_service_endpoint(self.request)
102-
self.request.route_to_location(self.location_endpoint)
10399
return True

sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from . import exceptions
3232
from ._location_cache import LocationCache
3333

34+
3435
# pylint: disable=protected-access
3536

3637

@@ -89,29 +90,29 @@ def force_refresh(self, database_account):
8990
self.refresh_endpoint_list(database_account)
9091

9192
def refresh_endpoint_list(self, database_account, **kwargs):
92-
with self.refresh_lock:
93-
# if refresh is not needed or refresh is already taking place, return
94-
if not self.refresh_needed:
95-
return
96-
try:
97-
self._refresh_endpoint_list_private(database_account, **kwargs)
98-
except Exception as e:
99-
raise e
93+
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
94+
self.refresh_needed = True
95+
if self.refresh_needed:
96+
with self.refresh_lock:
97+
# if refresh is not needed or refresh is already taking place, return
98+
if not self.refresh_needed:
99+
return
100+
try:
101+
self._refresh_endpoint_list_private(database_account, **kwargs)
102+
except Exception as e:
103+
raise e
100104

101105
def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
102106
if database_account:
103107
self.location_cache.perform_on_database_account_read(database_account)
104108
self.refresh_needed = False
105-
106-
if (
107-
self.location_cache.should_refresh_endpoints()
108-
and self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms
109-
):
110-
if not database_account:
109+
self.last_refresh_time = self.location_cache.current_time_millis()
110+
else:
111+
if self.location_cache.should_refresh_endpoints() or self.refresh_needed:
112+
self.refresh_needed = False
113+
self.last_refresh_time = self.location_cache.current_time_millis()
111114
database_account = self._GetDatabaseAccount(**kwargs)
112115
self.location_cache.perform_on_database_account_read(database_account)
113-
self.last_refresh_time = self.location_cache.current_time_millis()
114-
self.refresh_needed = False
115116

116117
def _GetDatabaseAccount(self, **kwargs):
117118
"""Gets the database account.

sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,17 +200,17 @@ def clear_stale_endpoint_unavailability_info(self):
200200

201201
self.location_unavailability_info_by_endpoint = new_location_unavailability_info
202202

203-
def is_endpoint_unavailable(self, endpoint, expected_available_operations):
203+
def is_endpoint_unavailable(self, endpoint: str, expected_available_operation: str):
204204
unavailability_info = (
205205
self.location_unavailability_info_by_endpoint[endpoint]
206206
if endpoint in self.location_unavailability_info_by_endpoint
207207
else None
208208
)
209209

210210
if (
211-
expected_available_operations == EndpointOperationType.NoneType
211+
expected_available_operation == EndpointOperationType.NoneType
212212
or not unavailability_info
213-
or expected_available_operations not in unavailability_info["operationType"]
213+
or expected_available_operation not in unavailability_info["operationType"]
214214
):
215215
return False
216216

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import time
2626
from typing import Optional
2727

28-
from azure.core.exceptions import AzureError, ClientAuthenticationError
28+
from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
2929
from azure.core.pipeline import PipelineRequest
3030
from azure.core.pipeline.policies import RetryPolicy
3131
from azure.core.pipeline.transport._base import HttpRequest
@@ -124,7 +124,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
124124
except exceptions.CosmosHttpResponseError as e:
125125
retry_policy = defaultRetry_policy
126126
# Re-assign retry policy based on error code
127-
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status == SubStatusCodes.WRITE_FORBIDDEN:
127+
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in\
128+
[SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
128129
retry_policy = endpointDiscovery_retry_policy
129130
elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
130131
retry_policy = resourceThrottle_retry_policy
@@ -161,7 +162,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
161162

162163
retry_policy.container_rid = cached_container["_rid"]
163164
request.headers[retry_policy._intended_headers] = retry_policy.container_rid
164-
elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
165+
elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
165166
retry_policy = timeout_failover_retry_policy
166167

167168
# If none of the retry policies applies or there is no retry needed, set the
@@ -259,6 +260,15 @@ def send(self, request):
259260
timeout_error.response = response
260261
timeout_error.history = retry_settings['history']
261262
raise
263+
except ServiceRequestError as err:
264+
# the request ran into a socket timeout or failed to establish a new connection
265+
# since request wasn't sent, we retry up to however many connection retries are configured (default 3)
266+
if retry_settings['connect'] > 0:
267+
retry_active = self.increment(retry_settings, response=request, error=err)
268+
if retry_active:
269+
self.sleep(retry_settings, request.context.transport)
270+
continue
271+
raise err
262272
except AzureError as err:
263273
retry_error = err
264274
if self._is_method_retryable(retry_settings, request.http_request):

sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ def _request_body_from_data(data):
5959
if data is None or isinstance(data, str) or _is_readable_stream(data):
6060
return data
6161
if isinstance(data, (dict, list, tuple)):
62-
6362
json_dumped = json.dumps(data, separators=(",", ":"))
6463

6564
return json_dumped
@@ -70,9 +69,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
7069
"""Makes one http request using the requests module.
7170
7271
:param _GlobalEndpointManager global_endpoint_manager:
73-
:param dict request_params:
74-
contains the resourceType, operationType, endpointOverride,
75-
useWriteEndpoint, useAlternateWriteEndpoint information
72+
:param ~azure.cosmos._request_object.RequestObject request_params:
73+
contains information for the request, like the resource_type, operation_type, and endpoint_override
7674
:param documents.ConnectionPolicy connection_policy:
7775
:param azure.core.PipelineClient pipeline_client:
7876
Pipeline client to process the request
@@ -90,7 +88,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
9088
# Every request tries to perform a refresh
9189
client_timeout = kwargs.get('timeout')
9290
start_time = time.time()
93-
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
91+
if request_params.resource_type != http_constants.ResourceType.DatabaseAccount:
92+
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
9493
if client_timeout is not None:
9594
kwargs['timeout'] = client_timeout - (time.time() - start_time)
9695
if kwargs['timeout'] <= 0:
@@ -100,8 +99,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
10099
base_url = request_params.endpoint_override
101100
else:
102101
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
103-
if base_url != pipeline_client._base_url:
104-
request.url = request.url.replace(pipeline_client._base_url, base_url)
102+
if not request.url.startswith(base_url):
103+
request.url = _replace_url_prefix(request.url, base_url)
105104

106105
parse_result = urlparse(request.url)
107106

@@ -167,20 +166,31 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
167166
return result, headers
168167

169168

169+
def _replace_url_prefix(original_url, new_prefix):
170+
parts = original_url.split('/', 3)
171+
172+
if not new_prefix.endswith('/'):
173+
new_prefix += '/'
174+
175+
new_url = new_prefix + parts[3] if len(parts) > 3 else new_prefix
176+
177+
return new_url
178+
179+
170180
def _PipelineRunFunction(pipeline_client, request, **kwargs):
171181
# pylint: disable=protected-access
172182

173183
return pipeline_client._pipeline.run(request, **kwargs)
174184

175185
def SynchronizedRequest(
176-
client,
177-
request_params,
178-
global_endpoint_manager,
179-
connection_policy,
180-
pipeline_client,
181-
request,
182-
request_data,
183-
**kwargs
186+
client,
187+
request_params,
188+
global_endpoint_manager,
189+
connection_policy,
190+
pipeline_client,
191+
request,
192+
request_data,
193+
**kwargs
184194
):
185195
"""Performs one synchronized http request according to the parameters.
186196

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,15 @@
3131
from .. import exceptions
3232
from .. import http_constants
3333
from . import _retry_utility_async
34-
from .._synchronized_request import _request_body_from_data
34+
from .._synchronized_request import _request_body_from_data, _replace_url_prefix
3535

3636

3737
async def _Request(global_endpoint_manager, request_params, connection_policy, pipeline_client, request, **kwargs):
3838
"""Makes one http request using the requests module.
3939
4040
:param _GlobalEndpointManager global_endpoint_manager:
41-
:param dict request_params:
42-
contains the resourceType, operationType, endpointOverride,
43-
useWriteEndpoint, useAlternateWriteEndpoint information
41+
:param ~azure.cosmos._request_object.RequestObject request_params:
42+
contains information for the request, like the resource_type, operation_type, and endpoint_override
4443
:param documents.ConnectionPolicy connection_policy:
4544
:param azure.core.PipelineClient pipeline_client:
4645
Pipeline client to process the request
@@ -58,7 +57,8 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
5857
# Every request tries to perform a refresh
5958
client_timeout = kwargs.get('timeout')
6059
start_time = time.time()
61-
await global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
60+
if request_params.resource_type != http_constants.ResourceType.DatabaseAccount:
61+
await global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
6262
if client_timeout is not None:
6363
kwargs['timeout'] = client_timeout - (time.time() - start_time)
6464
if kwargs['timeout'] <= 0:
@@ -68,8 +68,8 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
6868
base_url = request_params.endpoint_override
6969
else:
7070
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
71-
if base_url != pipeline_client._base_url:
72-
request.url = request.url.replace(pipeline_client._base_url, base_url)
71+
if not request.url.startswith(base_url):
72+
request.url = _replace_url_prefix(request.url, base_url)
7373

7474
parse_result = urlparse(request.url)
7575

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from .. import exceptions
3030
from .._location_cache import LocationCache
3131

32+
3233
# pylint: disable=protected-access
3334

3435
class _GlobalEndpointManager(object):
@@ -83,6 +84,8 @@ async def force_refresh(self, database_account):
8384
await self.refresh_endpoint_list(database_account)
8485

8586
async def refresh_endpoint_list(self, database_account, **kwargs):
87+
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
88+
self.refresh_needed = True
8689
if self.refresh_needed:
8790
async with self.refresh_lock:
8891
# if refresh is not needed or refresh is already taking place, return
@@ -94,18 +97,16 @@ async def refresh_endpoint_list(self, database_account, **kwargs):
9497
raise e
9598

9699
async def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
97-
self.refresh_needed = False
98100
if database_account:
99101
self.location_cache.perform_on_database_account_read(database_account)
102+
self.refresh_needed = False
103+
self.last_refresh_time = self.location_cache.current_time_millis()
100104
else:
101-
if (
102-
self.location_cache.should_refresh_endpoints()
103-
and
104-
self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms
105-
):
105+
if self.location_cache.should_refresh_endpoints() or self.refresh_needed:
106+
self.refresh_needed = False
107+
self.last_refresh_time = self.location_cache.current_time_millis()
106108
database_account = await self._GetDatabaseAccount(**kwargs)
107109
self.location_cache.perform_on_database_account_read(database_account)
108-
self.last_refresh_time = self.location_cache.current_time_millis()
109110

110111
async def _GetDatabaseAccount(self, **kwargs):
111112
"""Gets the database account.

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import asyncio
2727
from typing import Optional
2828

29-
from azure.core.exceptions import AzureError, ClientAuthenticationError
29+
from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
3030
from azure.core.pipeline.policies import AsyncRetryPolicy
3131
from azure.core.pipeline.transport._base import HttpRequest
3232

@@ -123,7 +123,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
123123
return result
124124
except exceptions.CosmosHttpResponseError as e:
125125
retry_policy = None
126-
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status == SubStatusCodes.WRITE_FORBIDDEN:
126+
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in \
127+
[SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
127128
retry_policy = endpointDiscovery_retry_policy
128129
elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
129130
retry_policy = resourceThrottle_retry_policy
@@ -160,7 +161,7 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
160161

161162
retry_policy.container_rid = cached_container["_rid"]
162163
request.headers[retry_policy._intended_headers] = retry_policy.container_rid
163-
elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
164+
elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
164165
retry_policy = timeout_failover_retry_policy
165166
else:
166167
retry_policy = defaultRetry_policy
@@ -245,6 +246,15 @@ async def send(self, request):
245246
timeout_error.response = response
246247
timeout_error.history = retry_settings['history']
247248
raise
249+
except ServiceRequestError as err:
250+
# the request ran into a socket timeout or failed to establish a new connection
251+
# since request wasn't sent, we retry up to however many connection retries are configured (default 3)
252+
if retry_settings['connect'] > 0:
253+
retry_active = self.increment(retry_settings, response=request, error=err)
254+
if retry_active:
255+
await self.sleep(retry_settings, request.context.transport)
256+
continue
257+
raise err
248258
except AzureError as err:
249259
retry_error = err
250260
if self._is_method_retryable(retry_settings, request.http_request):

0 commit comments

Comments
 (0)