Commit 25b5c8f

Container recreate retry policy support for Cosmos DB Python SDK (#36043)

* Add client cache to hold container properties. This adds a client-level cache for container properties.
* Added tests for the container properties cache.
* Update _cosmos_client_connection_async.py
* Update _cosmos_client_connection_async.py
* Fix typos
* Update container.py
* Update _container.py
* Update cache test
* Update test_container_properties_cache.py
* Update tests
* Update test_container_properties_cache.py
* Update tests: updated tests that were compensating for extra container reads; the extra reads no longer happen now that container properties are cached in the client.
* Test updates
* Mypy fixes
* Mypy fix
* Update
* Cache update: applied recommended changes and added optimizations for the cache, only including needed property info that is unlikely to change.
* Test update
* Update CHANGELOG.md
* Added set_properties_cache method: added a method in base.py to set the properties cache so only one location needs to change if we want to cache additional properties in the future.
* Update _base.py
* Update _cosmos_client_connection.py
* Mypy fixes
* Fix mypy issue
* Update _cosmos_client_connection.py
* Update _cosmos_client_connection.py
* Update _cosmos_client_connection_async.py
* Update _cosmos_client_connection.py
* Update container cache: changed the container cache to be a private attribute, and added tests to verify the contents of the cache are the correct ones.
* Update sdk/cosmos/azure-cosmos/CHANGELOG.md (Co-authored-by: Kushagra Thapar <[email protected]>)
* Additional pylint updates
* More pylint fixes
* Pylint fix
* Update test_container_properties_cache.py
* Container cache updates: fixed broken tests that used the old properties cache; added samples showing how the new properties cache works and the best way to get the container properties.
* Update CHANGELOG.md
* Added support to handle container recreates: adds support to retry on a container recreate, refreshing the cache of container properties.
* Spell check for string literal
* Pylint fixes
* Pylint fixes
* Update _container_recreate_retry_policy.py
* Pylint updates
* Update _container_recreate_retry_policy.py
* Pylint fixes
* Container recreate retry policy update: moves the container properties refresh out of ShouldRetry; handles re-extraction of partition keys from the document definition; the error for getting throughput is now thrown in the retry utils.
* Quick fixes
* Update _cosmos_client_connection_async.py
* Update _retry_utility_async.py
* Update no-offer-throughput error: the 404 from the offer throughput error now carries a client-side substatus code instead of being a generic 404.
* Update _container_recreate_retry_policy.py (pylint fixes)
* Pylint fixes
* Mypy fixes
* More mypy fixes
* Container recreate updates: changes based on feedback; also adds query tests for container recreate.
* Typehint and test updates: added a test for batch operations in container recreate.
* Update _container_recreate_retry_policy.py
* Update exceptions.py
* Add changefeed test cases: updated with changes based on feedback; also added test cases for changefeed retries in container recreate.
* Docstring update: added docstrings

---------

Co-authored-by: Kushagra Thapar <[email protected]>
1 parent e82aaea commit 25b5c8f

17 files changed (+1550 −95 lines)
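The retry semantics introduced by this commit are one-shot: the policy permits a single retry, and only when the stale container RID and its cached link could be identified from the failed request. A minimal standalone sketch of that behavior (hypothetical class and method names, not the SDK's actual implementation):

```python
class ContainerRecreateRetrySketch:
    """Simplified model of the one-shot retry decision: retry exactly once,
    and only when both the container RID and its cached link are known."""

    def __init__(self, container_rid, container_link):
        self.container_rid = container_rid
        self.container_link = container_link
        self._first_attempt = True

    def should_retry(self):
        if self._first_attempt:
            if not self.container_rid or not self.container_link:
                return False  # nothing to refresh; let the error surface
            self._first_attempt = False
            return True
        return False  # already retried once; give up
```

Between the first and second call, the real policy's caller refreshes the container properties cache so the retried request carries the new RID.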

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 4 additions & 0 deletions

@@ -3,6 +3,7 @@
 ### 4.7.1 (Unreleased)
 
 #### Features Added
+* Added Retry Policy for Container Recreate in the Python SDK. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
 
 #### Breaking Changes
 
@@ -11,6 +12,9 @@
 * Fixed SDK regex validation that would not allow for item ids to be longer than 255 characters. See [PR 36569](https://github.com/Azure/azure-sdk-for-python/pull/36569).
 
 #### Other Changes
+* Getting offer throughput when it has not been defined in a container will now give a 404/10004 instead of just a 404. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
+* Incomplete partition key extractions in documents for subpartitioning now give a 400/1001 instead of just a 400. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
+
 
 ### 4.7.0 (2024-05-15)
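Per the changelog entries above, callers can now distinguish a missing throughput offer (404/10004) from a generic 404 by inspecting the sub-status. The sketch below uses a hypothetical stand-in error class so it is self-contained; with the real SDK you would catch `azure.cosmos.exceptions.CosmosResourceNotFoundError` and inspect its sub-status (attribute name assumed from the SDK's `CosmosHttpResponseError`):

```python
# Substatus value taken from the changelog entry above (404/10004).
THROUGHPUT_OFFER_NOT_FOUND = 10004

class NotFoundError(Exception):
    """Hypothetical stand-in for CosmosResourceNotFoundError."""
    def __init__(self, status_code, sub_status=None):
        super().__init__(f"{status_code}/{sub_status}")
        self.status_code = status_code
        self.sub_status = sub_status

def describe_not_found(err):
    # Distinguish "container has no dedicated throughput" from a plain 404.
    if err.sub_status == THROUGHPUT_OFFER_NOT_FOUND:
        return "no offer defined for this container"
    return "resource not found"
```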

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 5 additions & 0 deletions

@@ -319,6 +319,11 @@ def GetHeaders(  # pylint: disable=too-many-statements,too-many-branches
     if options.get("correlatedActivityId"):
         headers[http_constants.HttpHeaders.CorrelatedActivityId] = options["correlatedActivityId"]
 
+    # If it is an operation at the container level, verify the rid of the container to see if the cache needs to be
+    # refreshed.
+    if resource_type != 'dbs' and options.get("containerRID"):
+        headers[http_constants.HttpHeaders.IntendedCollectionRID] = options["containerRID"]
+
     return headers
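The GetHeaders change above can be modeled as a small pure function. The literal header name below is an assumption based on the `http_constants.HttpHeaders.IntendedCollectionRID` constant; the function name is illustrative:

```python
# Assumed value of http_constants.HttpHeaders.IntendedCollectionRID.
INTENDED_COLLECTION_RID = "x-ms-cosmos-intended-collection-rid"

def add_intended_rid_header(headers, resource_type, options):
    """Mirror of the GetHeaders addition: for non-database operations,
    stamp the cached container RID so the service can detect a recreate."""
    if resource_type != 'dbs' and options.get("containerRID"):
        headers[INTENDED_COLLECTION_RID] = options["containerRID"]
    return headers
```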
sdk/cosmos/azure-cosmos/azure/cosmos/_container_recreate_retry_policy.py (new file)

Lines changed: 168 additions & 0 deletions

# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal class for container recreate retry policy implementation in the Azure
Cosmos database service.
"""
import json
from typing import Optional, Dict, Any, List, Union

from azure.core.pipeline.transport._base import HttpRequest

from . import http_constants
from .partition_key import _Empty, _Undefined


# pylint: disable=protected-access


class ContainerRecreateRetryPolicy:
    def __init__(self, client: Optional[Any], container_caches: Optional[Dict[str, Dict[str, Any]]],
                 request: Optional[HttpRequest], *args: Optional[List]):
        self.retry_after_in_milliseconds = 0  # Same as in .net
        self.refresh_container_properties_cache = True
        self.args = args
        self._intended_headers = http_constants.HttpHeaders.IntendedCollectionRID
        self.container_rid = None
        self.container_link = None
        self.link = None
        self._headers = dict(request.headers) if request else {}
        if self._headers and self._intended_headers in self._headers:
            self.container_rid = self._headers[self._intended_headers]
            if container_caches:
                self.container_link = self.__find_container_link_with_rid(container_caches, self.container_rid)
        self.client = client
        self.exception = None

    def ShouldRetry(self, exception: Optional[Any]) -> bool:
        """Returns true if the request should retry based on the passed-in exception.

        :param (exceptions.CosmosHttpResponseError instance) exception:
        :returns: a boolean stating whether the request should be retried
        :rtype: bool
        """
        self.exception = exception  # needed for pylint
        if self.refresh_container_properties_cache:
            if not self.container_rid or not self.container_link:
                return False
            self.refresh_container_properties_cache = False
            return True
        return False

    def __find_container_link_with_rid(self, container_properties_caches: Optional[Dict[str, Any]], rid: str) -> \
            Optional[str]:
        if container_properties_caches:
            for key, inner_dict in container_properties_caches.items():
                is_match = next((k for k, v in inner_dict.items() if v == rid), None)
                if is_match:
                    return key
        # If we cannot get the container link at all, it might mean the cache was somehow deleted; this isn't
        # a container request, so this retry is not needed. Return None.
        return None

    def check_if_rid_different(self, container_link: str,
                               container_properties_caches: Optional[Dict[str, Any]], rid: str) -> bool:
        if container_properties_caches:
            return container_properties_caches[container_link]["_rid"] == rid
        return not rid

    def should_extract_partition_key(self, container_cache: Optional[Dict[str, Any]]) -> bool:
        if self._headers and http_constants.HttpHeaders.PartitionKey in self._headers:
            current_partition_key = self._headers[http_constants.HttpHeaders.PartitionKey]
            partition_key_definition = container_cache["partitionKey"] if container_cache else None
            if partition_key_definition and partition_key_definition["kind"] == "MultiHash":
                # A null in the multihash partition key indicates a failure in extracting partition keys
                # from the document definition
                return 'null' in current_partition_key
            # These values indicate the partition key was not successfully extracted from the document definition
            return current_partition_key in ('[{}]', '[]', [{}], [])
        return False

    def _extract_partition_key(self, client: Optional[Any], container_cache: Optional[Dict[str, Any]], body: str)\
            -> Optional[Union[str, List, Dict]]:
        partition_key_definition = container_cache["partitionKey"] if container_cache else None
        body_dict = self.__str_to_dict(body)
        new_partition_key: Optional[Union[str, List, Dict]] = None
        if body_dict:
            options = client._AddPartitionKey(self.container_link, body_dict, {}) if client else {}
            # If the partitionKey value is Undefined, serialize it as [{}] to be consistent with other SDKs.
            if options and isinstance(options["partitionKey"], _Undefined):
                new_partition_key = [{}]
            # If the partitionKey value is Empty, serialize it as [], which is the equivalent sent for
            # migrated collections
            elif options and isinstance(options["partitionKey"], _Empty):
                new_partition_key = []
            # Else serialize using json.dumps, which apart from regular values will serialize None into null
            elif partition_key_definition and partition_key_definition["kind"] == "MultiHash":
                new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':'))
            else:
                new_partition_key = json.dumps([options["partitionKey"]])
        return new_partition_key

    async def _extract_partition_key_async(self, client: Optional[Any],
                                           container_cache: Optional[Dict[str, Any]],
                                           body: str) -> Optional[Union[str, List, Dict]]:
        partition_key_definition = container_cache["partitionKey"] if container_cache else None
        body_dict = self.__str_to_dict(body)
        new_partition_key: Optional[Union[str, List, Dict]] = None
        if body_dict:
            options = await client._AddPartitionKey(self.container_link, body_dict, {}) if client else {}
            # If the partitionKey value is Undefined, serialize it as [{}] to be consistent with other SDKs.
            if isinstance(options["partitionKey"], _Undefined):
                new_partition_key = [{}]
            # If the partitionKey value is Empty, serialize it as [], which is the equivalent sent for
            # migrated collections
            elif isinstance(options["partitionKey"], _Empty):
                new_partition_key = []
            # Else serialize using json.dumps, which apart from regular values will serialize None into null
            elif partition_key_definition["kind"] == "MultiHash":
                new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':'))
            else:
                new_partition_key = json.dumps([options["partitionKey"]])
        return new_partition_key

    def should_update_throughput_link(self, body: Optional[str], cached_container: Optional[Dict[str, Any]]) -> bool:
        body_dict = self.__str_to_dict(body) if body else None
        if not body_dict:
            return False
        try:
            # If this is a request to get throughput properties then we will update the link
            if body_dict["query"] == "SELECT * FROM root r WHERE r.resource=@link":
                self.link = cached_container["_self"] if cached_container else None
                return True
        except (TypeError, IndexError, KeyError):
            return False
        return False

    def _update_throughput_link(self, body: str) -> str:
        body_dict = self.__str_to_dict(body) if body else None
        if not body_dict:
            return body
        body_dict["parameters"][0]["value"] = self.link
        return json.dumps(body_dict, separators=(',', ':'))

    def __str_to_dict(self, dict_string: str) -> Dict:
        try:
            # Use json.loads() to convert the string to a dictionary
            return json.loads(dict_string)
        except (SyntaxError, ValueError):
            return {}

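The policy's private `__find_container_link_with_rid` performs a reverse lookup from a RID back to the container link that keys the client-side properties cache. A simplified, standalone version of that lookup (hypothetical function name, plain dicts in place of the SDK's cache):

```python
from typing import Any, Dict, Optional

def find_container_link_with_rid(caches: Dict[str, Dict[str, Any]], rid: str) -> Optional[str]:
    """Reverse lookup: map a (possibly stale) container RID back to the
    container link under which its properties are cached."""
    for link, props in caches.items():
        # The real policy scans all cached values; "_rid" is the one that matches.
        if any(value == rid for value in props.values()):
            return link
    # No match: the cache entry may have been evicted, or this wasn't a container request.
    return None
```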
sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 16 additions & 7 deletions

@@ -1173,6 +1173,10 @@ def _QueryChangeFeed(
         collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
 
         def fetch_fn(options: Mapping[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
+            if collection_link in self.__container_properties_cache:
+                new_options = dict(options)
+                new_options["containerRID"] = self.__container_properties_cache[collection_link]["_rid"]
+                options = new_options
             return self.__QueryFeed(
                 path,
                 resource_key,
@@ -3202,7 +3206,6 @@ def _AddPartitionKey(
         options: Mapping[str, Any]
     ) -> Dict[str, Any]:
         collection_link = base.TrimBeginningAndEndingSlashes(collection_link)
-        # TODO: Refresh the cache if partition is extracted automatically and we get a 400.1001
         partitionKeyDefinition = self._get_partition_key_definition(collection_link)
         new_options = dict(options)
         # If the collection doesn't have a partition key definition, skip it as it's a legacy collection
@@ -3218,19 +3221,19 @@ def _ExtractPartitionKey(
         self,
         partitionKeyDefinition: Mapping[str, Any],
         document: Mapping[str, Any]
-    ) -> Union[List[Union[str, float, bool, _Empty]], str, float, bool, _Empty, _Undefined]:
+    ) -> Union[List[Optional[Union[str, float, bool]]], str, float, bool, _Empty, _Undefined]:
         if partitionKeyDefinition["kind"] == "MultiHash":
-            ret = []
+            ret: List[Optional[Union[str, float, bool]]] = []
             for partition_key_level in partitionKeyDefinition["paths"]:
                 # Parses the paths into a list of tokens, each representing a property
                 partition_key_parts = base.ParsePaths([partition_key_level])
                 # Check if the partitionKey is system generated or not
                 is_system_key = partitionKeyDefinition["systemKey"] if "systemKey" in partitionKeyDefinition else False
-
                 # Navigates the document to retrieve the partitionKey specified in the paths
-                val = self._retrieve_partition_key(partition_key_parts, document, is_system_key)
-                if isinstance(val, _Undefined):
-                    break
+                val: Optional[Union[str, float, bool, _Empty, _Undefined]] = self._retrieve_partition_key(
+                    partition_key_parts, document, is_system_key)
+                if isinstance(val, (_Undefined, _Empty)):
+                    val = None
                 ret.append(val)
             return ret
@@ -3272,6 +3275,12 @@ def refresh_routing_map_provider(self) -> None:
         # re-initializes the routing map provider, effectively refreshing the current partition key range cache
         self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
 
+    def _refresh_container_properties_cache(self, container_link: str):
+        # If the container properties cache is stale, refresh it by reading the container.
+        container = self.ReadContainer(container_link, options=None)
+        # Only cache container properties that will not change in the lifetime of the container
+        self._set_container_properties_cache(container_link, _set_properties_cache(container))
+
     def _UpdateSessionIfRequired(
         self,
         request_headers: Mapping[str, Any],
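The `_ExtractPartitionKey` change above matters for the retry path because of how MultiHash keys serialize: `json.dumps` renders a missing component (`None`) as `null`, which `should_extract_partition_key` then reads as a signal that extraction against the stale definition was incomplete. A standalone sketch of that round trip (function names are illustrative, not the SDK's):

```python
import json

def serialize_multi_hash_pk(components):
    """Serialize a MultiHash partition key the way the SDK does:
    compact JSON, with missing components rendered as null."""
    return json.dumps(components, separators=(',', ':'))

def needs_reextraction(header_value):
    # Mirrors should_extract_partition_key's MultiHash branch: a null
    # component means extraction against the cached definition failed.
    return 'null' in header_value
```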

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 52 additions & 3 deletions

@@ -21,13 +21,14 @@
 
 """Internal methods for executing functions in the Azure Cosmos database service.
 """
-
+import json
 import time
 from typing import Optional
 
 from azure.core.exceptions import AzureError, ClientAuthenticationError
 from azure.core.pipeline import PipelineRequest
 from azure.core.pipeline.policies import RetryPolicy
+from azure.core.pipeline.transport._base import HttpRequest
 
 from . import exceptions
 from . import _endpoint_discovery_retry_policy
@@ -36,10 +37,11 @@
 from . import _session_retry_policy
 from . import _gone_retry_policy
 from . import _timeout_failover_retry_policy
+from . import _container_recreate_retry_policy
 from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes
 
 
-# pylint: disable=protected-access
+# pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches
 
 
 def Execute(client, global_endpoint_manager, function, *args, **kwargs):
@@ -76,6 +78,16 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
     timeout_failover_retry_policy = _timeout_failover_retry_policy._TimeoutFailoverRetryPolicy(
         client.connection_policy, global_endpoint_manager, *args
     )
+    # HttpRequest we would need to modify for the Container Recreate Retry Policy
+    request: Optional[HttpRequest] = None
+    if args and len(args) > 3:
+        # Reference the HttpRequest instance in args
+        request = args[3]
+        container_recreate_retry_policy = _container_recreate_retry_policy.ContainerRecreateRetryPolicy(
+            client, client._container_properties_cache, request, *args)
+    else:
+        container_recreate_retry_policy = _container_recreate_retry_policy.ContainerRecreateRetryPolicy(
+            client, client._container_properties_cache, None, *args)
 
     while True:
         client_timeout = kwargs.get('timeout')
@@ -95,7 +107,19 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
                 client.last_response_headers[
                     HttpHeaders.ThrottleRetryWaitTimeInMs
                 ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
-
+            # TODO: It is better to raise exceptions manually in the method related to the request;
+            # a rework of retry would be needed to be able to retry exceptions raised that way.
+            # For now, raising a manual exception here should allow it to be retried.
+            # If the container does not have throughput, results will return an empty list.
+            # We manually raise a 404 here so we can handle it in the retry utilities.
+            if result and isinstance(result[0], dict) and 'Offers' in result[0] and \
+                    not result[0]['Offers'] and request.method == 'POST':
+                # Grab the link used for getting throughput properties to add to the message.
+                link = json.loads(request.body)["parameters"][0]["value"]
+                raise exceptions.CosmosResourceNotFoundError(
+                    status_code=StatusCodes.NOT_FOUND,
+                    message="Could not find ThroughputProperties for container " + link,
+                    sub_status_code=SubStatusCodes.THROUGHPUT_OFFER_NOT_FOUND)
             return result
         except exceptions.CosmosHttpResponseError as e:
             retry_policy = defaultRetry_policy
@@ -112,6 +136,31 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
                 retry_policy = sessionRetry_policy
             elif exceptions._partition_range_is_gone(e):
                 retry_policy = partition_key_range_gone_retry_policy
+            elif exceptions._container_recreate_exception(e):
+                retry_policy = container_recreate_retry_policy
+                # Before we retry with the container recreate policy, we need to refresh the cache of
+                # container properties and pass the new RID in the headers.
+                client._refresh_container_properties_cache(retry_policy.container_link)
+                if e.sub_status != SubStatusCodes.COLLECTION_RID_MISMATCH and retry_policy.check_if_rid_different(
+                        retry_policy.container_link, client._container_properties_cache, retry_policy.container_rid):
+                    retry_policy.refresh_container_properties_cache = False
+                else:
+                    cached_container = client._container_properties_cache[retry_policy.container_link]
+                    # If the partition key value was previously extracted from the document definition,
+                    # reattempt to extract the partition key with the updated partition key definition
+                    if retry_policy.should_extract_partition_key(cached_container):
+                        new_partition_key = retry_policy._extract_partition_key(
+                            client, container_cache=cached_container, body=request.body
+                        )
+                        request.headers[HttpHeaders.PartitionKey] = new_partition_key
+                    # If getting throughput, we have to replace the container link from the stale cache
+                    # with the one from the refreshed cache
+                    if retry_policy.should_update_throughput_link(request.body, cached_container):
+                        new_body = retry_policy._update_throughput_link(request.body)
+                        request.body = new_body
+
+                    retry_policy.container_rid = cached_container["_rid"]
+                    request.headers[retry_policy._intended_headers] = retry_policy.container_rid
             elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
                 retry_policy = timeout_failover_retry_policy

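The throughput-link rewrite in the last hunk can be sketched standalone: when the request body is the offer-lookup query, the stale `@link` parameter is swapped for the refreshed container self-link. This is a simplified free function (the SDK's version keeps the link on the policy instance and pulls it from the cached container's `_self`):

```python
import json

# Offer-lookup query text, as it appears in the policy above.
THROUGHPUT_QUERY = "SELECT * FROM root r WHERE r.resource=@link"

def update_throughput_link(body: str, fresh_self_link: str) -> str:
    """If the body is the offer-lookup query, replace the stale @link
    parameter with the refreshed container self-link; otherwise leave it."""
    body_dict = json.loads(body)
    if body_dict.get("query") != THROUGHPUT_QUERY:
        return body  # not an offer-lookup request; leave the body alone
    body_dict["parameters"][0]["value"] = fresh_self_link
    return json.dumps(body_dict, separators=(',', ':'))
```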