Skip to content

Commit 37c99f0

Browse files
tvaron3kushagraThaparFabianMeiswinkelallenkim0129
authored
Per Partition Circuit Breaker (Azure#40302)
* change default read timeout * fix tests * Add read timeout tests for database account calls * fix timeout retry policy * Fixed the timeout logic * Fixed the timeout retry policy * Mock tests for timeout and failover retry policy * Create test_dummy.py * Update test_dummy.py * Update test_dummy.py * Update test_dummy.py * Iterating on fault injection tooling * Refactoring to have FaultInjectionTransport in its own file * Update test_dummy.py * Reafctoring FaultInjectionTransport * Iterating on tests * Prettifying tests * small refactoring * Adding MM topology on Emulator * Adding cross region retry tests * Add Excluded Locations Feature * initial ppcb changes * add missing changes * fix mypy errors * Refactored gem for ppcb and hooked up retryconfigurations with failure tracking * fix use multiple write locations bug * clean up and revert vs env variable changes * remove async await * refactor and fix tests * Fix refactoring * Fix tests * fix tests * add more tests * add more tests * Add tests * fix tests * fix tests * fix tests * fix test * fix test * fix tests * fix async in test * Added multi-region tests * Fix _AddParitionKey to pass options to sub methods * Added initial live tests * Updated live-platform-matrix for multi-region tests * initial sync version of fault injection * add all sync tests * add new error and fix logs * fix test * Add cosmosQuery mark to TestQuery * Correct spelling * Fixed live platform matrix syntax * Changed Multi-regions * first ppcb test * fix test * refactor due to pk range wrapper needing io call and pylint * Added client level ExcludedLocation for async * Update Live test settings * Added Async tests * Add more live tests for all other Python versions * Fix Async test failure * add test for failure_rate threshold * Fix live test failures * fix pylint and cspell * Fix live test failures * fix pylint * Fix live test failures * Add test_delete_all_items_by_partition_key * Remove test_delete_all_items_by_partition_key * fix and add tests * add collection rid to batch * add partition key range id to partition key range to cache * address failures * update tests * Added missing doc for excluded_locations in async client * Remove duplicate functions * add more operations * Fix live tests with multi write locations * Fixed bug with endpoint routing with multi write region partition key API calls * Adding emulator tests for delete_all_items_by_partition_key API * minimized duplicate codes * Added Async emulator tests * Nit: Changed test names * Addressed comments about documents * live tests * fix tests * add container rid * fix mm tests * test improvements * recovering optimizations, lower request timeout, disable in region retries * recovering optimizations, lower request timeout, disable in region retries * fix transitions from a success * Implement exponential backoff * fix pylint * add sync tests * refactor tests * sync changes * sync changes and cleanup * container cache changes * revert change * add extra mapping to container cache * fix emulator tests * add sync single master tests * fix tests * fix tests * fix tests * fix tests * fix tests * test changes * fix some tests * fix tests * Rename test files * fix tests and setup ppcb pipeline * fix tests * fix ci tests * move all ppcb tests to live tests * add logger for ppcb test * fix ci tests * fix tests * fixed resource token bug and tests * fix tests and split up them up across to live tests pipelines * fix tests and cspell * add sync deleta all item partiton key tests * fix tests * fix ci tests * fix tests * fix ci * fix ci * refactor tests, removed print statements, and added async emulator delete all items by partition key tests * fix tests * fix ci tests * fix live tests * fix tests * remove unnecessary line * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * remove unused logger * add regression testing for cross partition queries * react to comments * react to comments * react to async client changes from merge * react to comments * fix tests * react to comments and fix tests * react to comments and fix tests * fix tests --------- Co-authored-by: Kushagra Thapar <[email protected]> Co-authored-by: Kushagra Thapar <[email protected]> Co-authored-by: Fabian Meiswinkel <[email protected]> Co-authored-by: Allen Kim <[email protected]> Co-authored-by: Allen Kim <[email protected]>
1 parent 2da0f5c commit 37c99f0

File tree

58 files changed

+3131
-325
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3131
-325
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
#### Features Added
66
* Added ability to set a user agent suffix at the client level. See [PR 40904](https://github.com/Azure/azure-sdk-for-python/pull/40904)
77
* Added ability to use request level `excluded_locations` on metadata calls, such as getting container properties. See [PR 40905](https://github.com/Azure/azure-sdk-for-python/pull/40905)
8+
* Per partition circuit breaker support. It can be enabled through the environment variable `AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER`. See [PR 40302](https://github.com/Azure/azure-sdk-for-python/pull/40302).
89

910
#### Bugs Fixed
11+
* Fixed how resource tokens are parsed for metadata calls in the lifecycle of a document operation. See [PR 40302](https://github.com/Azure/azure-sdk-for-python/pull/40302).
1012
* Fixed issue where Query Change Feed did not return items if the container uses legacy Hash V1 Partition Keys. This also fixes issues with not being able to change feed query for Specific Partition Key Values for HPK. See [PR 41270](https://github.com/Azure/azure-sdk-for-python/pull/41270/)
1113

1214
#### Other Changes

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -879,8 +879,8 @@ def _format_batch_operations(
879879
return final_operations
880880

881881

882-
def _set_properties_cache(properties: Dict[str, Any]) -> Dict[str, Any]:
882+
def _build_properties_cache(properties: Dict[str, Any], container_link: str) -> Dict[str, Any]:
883883
return {
884884
"_self": properties.get("_self", None), "_rid": properties.get("_rid", None),
885-
"partitionKey": properties.get("partitionKey", None)
885+
"partitionKey": properties.get("partitionKey", None), "container_link": container_link
886886
}

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ class _Constants:
5151
HS_MAX_ITEMS_CONFIG_DEFAULT: int = 1000
5252
MAX_ITEM_BUFFER_VS_CONFIG: str = "AZURE_COSMOS_MAX_ITEM_BUFFER_VECTOR_SEARCH"
5353
MAX_ITEM_BUFFER_VS_CONFIG_DEFAULT: int = 50000
54+
CIRCUIT_BREAKER_ENABLED_CONFIG: str = "AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER"
55+
CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT: str = "False"
56+
# Only applicable when circuit breaker is enabled -------------------------
57+
CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_READ: str = "AZURE_COSMOS_CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_READ"
58+
CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_READ_DEFAULT: int = 10
59+
CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_WRITE: str = "AZURE_COSMOS_CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_WRITE"
60+
CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_WRITE_DEFAULT: int = 5
61+
FAILURE_PERCENTAGE_TOLERATED = "AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED"
62+
FAILURE_PERCENTAGE_TOLERATED_DEFAULT: int = 90
63+
# -------------------------------------------------------------------------
5464

5565
# Error code translations
5666
ERROR_TRANSLATIONS: Dict[int, str] = {

sdk/cosmos/azure-cosmos/azure/cosmos/_container_recreate_retry_policy.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,8 @@ def ShouldRetry(self, exception: Optional[Any]) -> bool:
7272
def __find_container_link_with_rid(self, container_properties_caches: Optional[Dict[str, Any]], rid: str) -> \
7373
Optional[str]:
7474
if container_properties_caches:
75-
for key, inner_dict in container_properties_caches.items():
76-
is_match = next((k for k, v in inner_dict.items() if v == rid), None)
77-
if is_match:
78-
return key
75+
if rid in container_properties_caches:
76+
return container_properties_caches[rid]["container_link"]
7977
# If we cannot get the container link at all it might mean the cache was somehow deleted, this isn't
8078
# a container request so this retry is not needed. Return None.
8179
return None

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
HttpResponse # pylint: disable=no-legacy-azure-core-http-response-import
4949

5050
from . import _base as base
51-
from . import _global_endpoint_manager as global_endpoint_manager
51+
from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker
5252
from . import _query_iterable as query_iterable
5353
from . import _runtime_constants as runtime_constants
5454
from . import _session
@@ -57,7 +57,7 @@
5757
from . import documents
5858
from . import http_constants, exceptions
5959
from ._auth_policy import CosmosBearerTokenCredentialPolicy
60-
from ._base import _set_properties_cache
60+
from ._base import _build_properties_cache
6161
from ._change_feed.change_feed_iterable import ChangeFeedIterable
6262
from ._change_feed.change_feed_state import ChangeFeedState
6363
from ._constants import _Constants as Constants
@@ -168,7 +168,7 @@ def __init__( # pylint: disable=too-many-statements
168168
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
169169

170170
self.UseMultipleWriteLocations = False
171-
self._global_endpoint_manager = global_endpoint_manager._GlobalEndpointManager(self)
171+
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForCircuitBreaker(self)
172172

173173
retry_policy = None
174174
if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy):
@@ -262,6 +262,7 @@ def _set_container_properties_cache(self, container_link: str, properties: Optio
262262
:type properties: Optional[Dict[str, Any]]"""
263263
if properties:
264264
self.__container_properties_cache[container_link] = properties
265+
self.__container_properties_cache[properties["_rid"]] = properties
265266
else:
266267
self.__container_properties_cache[container_link] = {}
267268

@@ -1295,8 +1296,13 @@ def CreateItem(
12951296

12961297
if base.IsItemContainerLink(database_or_container_link):
12971298
options = self._AddPartitionKey(database_or_container_link, document, options)
1298-
return self.Create(document, path, http_constants.ResourceType.Document, collection_id, None,
1299-
options, **kwargs)
1299+
return self.Create(document,
1300+
path,
1301+
http_constants.ResourceType.Document,
1302+
collection_id,
1303+
None,
1304+
options,
1305+
**kwargs)
13001306

13011307
def UpsertItem(
13021308
self,
@@ -1332,8 +1338,13 @@ def UpsertItem(
13321338
collection_id, document, path = self._GetContainerIdWithPathForItem(
13331339
database_or_container_link, document, options
13341340
)
1335-
return self.Upsert(document, path, http_constants.ResourceType.Document, collection_id, None,
1336-
options, **kwargs)
1341+
return self.Upsert(document,
1342+
path,
1343+
http_constants.ResourceType.Document,
1344+
collection_id,
1345+
None,
1346+
options,
1347+
**kwargs)
13371348

13381349
PartitionResolverErrorMessage = (
13391350
"Couldn't find any partition resolvers for the database link provided. "
@@ -2020,8 +2031,13 @@ def ReplaceItem(
20202031
collection_link = base.GetItemContainerLink(document_link)
20212032
options = self._AddPartitionKey(collection_link, new_document, options)
20222033

2023-
return self.Replace(new_document, path, http_constants.ResourceType.Document, document_id, None,
2024-
options, **kwargs)
2034+
return self.Replace(new_document,
2035+
path,
2036+
http_constants.ResourceType.Document,
2037+
document_id,
2038+
None,
2039+
options,
2040+
**kwargs)
20252041

20262042
def PatchItem(
20272043
self,
@@ -2052,7 +2068,9 @@ def PatchItem(
20522068
headers = base.GetHeaders(self, self.default_headers, "patch", path, document_id, resource_type,
20532069
documents._OperationType.Patch, options)
20542070
# Patch will use WriteEndpoint since it uses PUT operation
2055-
request_params = RequestObject(resource_type, documents._OperationType.Patch)
2071+
request_params = RequestObject(resource_type,
2072+
documents._OperationType.Patch,
2073+
headers)
20562074
request_params.set_excluded_location_from_options(options)
20572075
request_data = {}
20582076
if options.get("filterPredicate"):
@@ -2142,7 +2160,9 @@ def _Batch(
21422160
headers = base.GetHeaders(self, initial_headers, "post", path, collection_id,
21432161
http_constants.ResourceType.Document,
21442162
documents._OperationType.Batch, options)
2145-
request_params = RequestObject(http_constants.ResourceType.Document, documents._OperationType.Batch)
2163+
request_params = RequestObject(http_constants.ResourceType.Document,
2164+
documents._OperationType.Batch,
2165+
headers)
21462166
request_params.set_excluded_location_from_options(options)
21472167
return cast(
21482168
Tuple[List[Dict[str, Any]], CaseInsensitiveDict],
@@ -2203,7 +2223,9 @@ def DeleteAllItemsByPartitionKey(
22032223
collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
22042224
headers = base.GetHeaders(self, self.default_headers, "post", path, collection_id,
22052225
http_constants.ResourceType.PartitionKey, documents._OperationType.Delete, options)
2206-
request_params = RequestObject(http_constants.ResourceType.PartitionKey, documents._OperationType.Delete)
2226+
request_params = RequestObject(http_constants.ResourceType.PartitionKey,
2227+
documents._OperationType.Delete,
2228+
headers)
22072229
request_params.set_excluded_location_from_options(options)
22082230
_, last_response_headers = self.__Post(
22092231
path=path,
@@ -2377,7 +2399,7 @@ def ExecuteStoredProcedure(
23772399

23782400
# ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation
23792401
request_params = RequestObject(http_constants.ResourceType.StoredProcedure,
2380-
documents._OperationType.ExecuteJavaScript)
2402+
documents._OperationType.ExecuteJavaScript, headers)
23812403
result, self.last_response_headers = self.__Post(path, request_params, params, headers, **kwargs)
23822404
return result
23832405

@@ -2573,7 +2595,9 @@ def GetDatabaseAccount(
25732595

25742596
headers = base.GetHeaders(self, self.default_headers, "get", "", "", "",
25752597
documents._OperationType.Read,{}, client_id=self.client_id)
2576-
request_params = RequestObject(http_constants.ResourceType.DatabaseAccount, documents._OperationType.Read,
2598+
request_params = RequestObject(http_constants.ResourceType.DatabaseAccount,
2599+
documents._OperationType.Read,
2600+
headers,
25772601
url_connection)
25782602
result, last_response_headers = self.__Get("", request_params, headers, **kwargs)
25792603
self.last_response_headers = last_response_headers
@@ -2623,7 +2647,9 @@ def _GetDatabaseAccountCheck(
26232647

26242648
headers = base.GetHeaders(self, self.default_headers, "get", "", "", "",
26252649
documents._OperationType.Read,{}, client_id=self.client_id)
2626-
request_params = RequestObject(http_constants.ResourceType.DatabaseAccount, documents._OperationType.Read,
2650+
request_params = RequestObject(http_constants.ResourceType.DatabaseAccount,
2651+
documents._OperationType.Read,
2652+
headers,
26272653
url_connection)
26282654
self.__Get("", request_params, headers, **kwargs)
26292655

@@ -2663,7 +2689,7 @@ def Create(
26632689
options)
26642690
# Create will use WriteEndpoint since it uses POST operation
26652691

2666-
request_params = RequestObject(typ, documents._OperationType.Create)
2692+
request_params = RequestObject(typ, documents._OperationType.Create, headers)
26672693
request_params.set_excluded_location_from_options(options)
26682694
result, last_response_headers = self.__Post(path, request_params, body, headers, **kwargs)
26692695
self.last_response_headers = last_response_headers
@@ -2710,7 +2736,7 @@ def Upsert(
27102736
headers[http_constants.HttpHeaders.IsUpsert] = True
27112737

27122738
# Upsert will use WriteEndpoint since it uses POST operation
2713-
request_params = RequestObject(typ, documents._OperationType.Upsert)
2739+
request_params = RequestObject(typ, documents._OperationType.Upsert, headers)
27142740
request_params.set_excluded_location_from_options(options)
27152741
result, last_response_headers = self.__Post(path, request_params, body, headers, **kwargs)
27162742
self.last_response_headers = last_response_headers
@@ -2754,7 +2780,7 @@ def Replace(
27542780
headers = base.GetHeaders(self, initial_headers, "put", path, id, typ, documents._OperationType.Replace,
27552781
options)
27562782
# Replace will use WriteEndpoint since it uses PUT operation
2757-
request_params = RequestObject(typ, documents._OperationType.Replace)
2783+
request_params = RequestObject(typ, documents._OperationType.Replace, headers)
27582784
request_params.set_excluded_location_from_options(options)
27592785
result, last_response_headers = self.__Put(path, request_params, resource, headers, **kwargs)
27602786
self.last_response_headers = last_response_headers
@@ -2796,7 +2822,7 @@ def Read(
27962822
initial_headers = initial_headers or self.default_headers
27972823
headers = base.GetHeaders(self, initial_headers, "get", path, id, typ, documents._OperationType.Read, options)
27982824
# Read will use ReadEndpoint since it uses GET operation
2799-
request_params = RequestObject(typ, documents._OperationType.Read)
2825+
request_params = RequestObject(typ, documents._OperationType.Read, headers)
28002826
request_params.set_excluded_location_from_options(options)
28012827
result, last_response_headers = self.__Get(path, request_params, headers, **kwargs)
28022828
self.last_response_headers = last_response_headers
@@ -2836,7 +2862,7 @@ def DeleteResource(
28362862
headers = base.GetHeaders(self, initial_headers, "delete", path, id, typ, documents._OperationType.Delete,
28372863
options)
28382864
# Delete will use WriteEndpoint since it uses DELETE operation
2839-
request_params = RequestObject(typ, documents._OperationType.Delete)
2865+
request_params = RequestObject(typ, documents._OperationType.Delete, headers)
28402866
request_params.set_excluded_location_from_options(options)
28412867
result, last_response_headers = self.__Delete(path, request_params, headers, **kwargs)
28422868
self.last_response_headers = last_response_headers
@@ -3069,24 +3095,27 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
30693095
initial_headers = self.default_headers.copy()
30703096
# Copy to make sure that default_headers won't be changed.
30713097
if query is None:
3098+
op_typ = documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed
30723099
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
3073-
request_params = RequestObject(
3074-
resource_type,
3075-
documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed
3076-
)
3077-
request_params.set_excluded_location_from_options(options)
30783100
headers = base.GetHeaders(
30793101
self,
30803102
initial_headers,
30813103
"get",
30823104
path,
30833105
resource_id,
30843106
resource_type,
3085-
request_params.operation_type,
3107+
op_typ,
30863108
options,
30873109
partition_key_range_id
30883110
)
30893111

3112+
request_params = RequestObject(
3113+
resource_type,
3114+
op_typ,
3115+
headers
3116+
)
3117+
request_params.set_excluded_location_from_options(options)
3118+
30903119
change_feed_state: Optional[ChangeFeedState] = options.get("changeFeedState")
30913120
if change_feed_state is not None:
30923121
feed_options = {}
@@ -3115,8 +3144,6 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
31153144
raise SystemError("Unexpected query compatibility mode.")
31163145

31173146
# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
3118-
request_params = RequestObject(resource_type, documents._OperationType.SqlQuery)
3119-
request_params.set_excluded_location_from_options(options)
31203147
req_headers = base.GetHeaders(
31213148
self,
31223149
initial_headers,
@@ -3129,6 +3156,9 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
31293156
partition_key_range_id
31303157
)
31313158

3159+
request_params = RequestObject(resource_type, documents._OperationType.SqlQuery, req_headers)
3160+
request_params.set_excluded_location_from_options(options)
3161+
31323162
# check if query has prefix partition key
31333163
isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None)
31343164
if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs:
@@ -3364,7 +3394,7 @@ def _refresh_container_properties_cache(self, container_link: str):
33643394
# If container properties cache is stale, refresh it by reading the container.
33653395
container = self.ReadContainer(container_link, options=None)
33663396
# Only cache Container Properties that will not change in the lifetime of the container
3367-
self._set_container_properties_cache(container_link, _set_properties_cache(container))
3397+
self._set_container_properties_cache(container_link, _build_properties_cache(container, container_link))
33683398

33693399
def _UpdateSessionIfRequired(
33703400
self,
@@ -3407,5 +3437,5 @@ def _get_partition_key_definition(
34073437
else:
34083438
container = self.ReadContainer(collection_link, options)
34093439
partition_key_definition = container.get("partitionKey")
3410-
self.__container_properties_cache[collection_link] = _set_properties_cache(container)
3440+
self._set_container_properties_cache(collection_link, _build_properties_cache(container, collection_link))
34113441
return partition_key_definition

0 commit comments

Comments
 (0)