Skip to content

Commit 9414695

Browse files
authored
[Bugfix] Partition Scoping Incorrect for PPCB (#42751)
* add logs and improve diagnostics * fix logs * fix partitionkey issue * add tests * remove logs and add change log * fix pylint * fix tests
1 parent c6eb450 commit 9414695

11 files changed

+162
-37
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* Fixed bug where during health checks read regions were marked as unavailable for write operations. See [PR 42525](https://github.com/Azure/azure-sdk-for-python/pull/42525).
1313
* Fixed bug where containers named with spaces or special characters using session consistency would fall back to eventual consistency. See [PR 42608](https://github.com/Azure/azure-sdk-for-python/pull/42608)
1414
* Fixed bug where `excluded_locations` was not being honored for some metadata calls. See [PR 42266](https://github.com/Azure/azure-sdk-for-python/pull/42266).
15+
* Fixed partition scoping for per partition circuit breaker. See [PR 42751](https://github.com/Azure/azure-sdk-for-python/pull/42751)
1516

1617
#### Other Changes
1718
* Added session token false progress merge logic. See [42393](https://github.com/Azure/azure-sdk-for-python/pull/42393)

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2122,7 +2122,8 @@ def PatchItem(
21222122
# Patch will use WriteEndpoint since it uses PUT operation
21232123
request_params = RequestObject(resource_type,
21242124
documents._OperationType.Patch,
2125-
headers)
2125+
headers,
2126+
options.get("partitionKey", None))
21262127
request_params.set_excluded_location_from_options(options)
21272128
base.set_session_token_header(self, headers, path, request_params, options)
21282129
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
@@ -2216,7 +2217,8 @@ def _Batch(
22162217
documents._OperationType.Batch, options)
22172218
request_params = RequestObject(http_constants.ResourceType.Document,
22182219
documents._OperationType.Batch,
2219-
headers)
2220+
headers,
2221+
options.get("partitionKey", None))
22202222
request_params.set_excluded_location_from_options(options)
22212223
base.set_session_token_header(self, headers, path, request_params, options)
22222224
return cast(
@@ -2280,7 +2282,8 @@ def DeleteAllItemsByPartitionKey(
22802282
http_constants.ResourceType.PartitionKey, documents._OperationType.Delete, options)
22812283
request_params = RequestObject(http_constants.ResourceType.PartitionKey,
22822284
documents._OperationType.Delete,
2283-
headers)
2285+
headers,
2286+
options.get("partitionKey", None))
22842287
request_params.set_excluded_location_from_options(options)
22852288
_, last_response_headers = self.__Post(
22862289
path=path,
@@ -2654,7 +2657,7 @@ def GetDatabaseAccount(
26542657
request_params = RequestObject(http_constants.ResourceType.DatabaseAccount,
26552658
documents._OperationType.Read,
26562659
headers,
2657-
url_connection)
2660+
endpoint_override=url_connection)
26582661
result, last_response_headers = self.__Get("", request_params, headers, **kwargs)
26592662
self.last_response_headers = last_response_headers
26602663
database_account = DatabaseAccount()
@@ -2706,7 +2709,7 @@ def _GetDatabaseAccountCheck(
27062709
request_params = RequestObject(http_constants.ResourceType.DatabaseAccount,
27072710
documents._OperationType.Read,
27082711
headers,
2709-
url_connection)
2712+
endpoint_override=url_connection)
27102713
self.__Get("", request_params, headers, **kwargs)
27112714

27122715

@@ -2744,7 +2747,10 @@ def Create(
27442747
headers = base.GetHeaders(self, initial_headers, "post", path, id, resource_type,
27452748
documents._OperationType.Create, options)
27462749
# Create will use WriteEndpoint since it uses POST operation
2747-
request_params = RequestObject(resource_type, documents._OperationType.Create, headers)
2750+
request_params = RequestObject(resource_type,
2751+
documents._OperationType.Create,
2752+
headers,
2753+
options.get("partitionKey", None))
27482754
request_params.set_excluded_location_from_options(options)
27492755
base.set_session_token_header(self, headers, path, request_params, options)
27502756
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
@@ -2792,7 +2798,10 @@ def Upsert(
27922798
documents._OperationType.Upsert, options)
27932799
headers[http_constants.HttpHeaders.IsUpsert] = True
27942800
# Upsert will use WriteEndpoint since it uses POST operation
2795-
request_params = RequestObject(resource_type, documents._OperationType.Upsert, headers)
2801+
request_params = RequestObject(resource_type,
2802+
documents._OperationType.Upsert,
2803+
headers,
2804+
options.get("partitionKey", None))
27962805
request_params.set_excluded_location_from_options(options)
27972806
base.set_session_token_header(self, headers, path, request_params, options)
27982807
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
@@ -2838,7 +2847,10 @@ def Replace(
28382847
headers = base.GetHeaders(self, initial_headers, "put", path, id, resource_type,
28392848
documents._OperationType.Replace, options)
28402849
# Replace will use WriteEndpoint since it uses PUT operation
2841-
request_params = RequestObject(resource_type, documents._OperationType.Replace, headers)
2850+
request_params = RequestObject(resource_type,
2851+
documents._OperationType.Replace,
2852+
headers,
2853+
options.get("partitionKey", None))
28422854
request_params.set_excluded_location_from_options(options)
28432855
base.set_session_token_header(self, headers, path, request_params, options)
28442856
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
@@ -2883,7 +2895,10 @@ def Read(
28832895
headers = base.GetHeaders(self, initial_headers, "get", path, id, resource_type,
28842896
documents._OperationType.Read, options)
28852897
# Read will use ReadEndpoint since it uses GET operation
2886-
request_params = RequestObject(resource_type, documents._OperationType.Read, headers)
2898+
request_params = RequestObject(resource_type,
2899+
documents._OperationType.Read,
2900+
headers,
2901+
options.get("partitionKey", None))
28872902
request_params.set_excluded_location_from_options(options)
28882903
base.set_session_token_header(self, headers, path, request_params, options)
28892904
result, last_response_headers = self.__Get(path, request_params, headers, **kwargs)
@@ -2927,7 +2942,10 @@ def DeleteResource(
29272942
headers = base.GetHeaders(self, initial_headers, "delete", path, id, resource_type,
29282943
documents._OperationType.Delete, options)
29292944
# Delete will use WriteEndpoint since it uses DELETE operation
2930-
request_params = RequestObject(resource_type, documents._OperationType.Delete, headers)
2945+
request_params = RequestObject(resource_type,
2946+
documents._OperationType.Delete,
2947+
headers,
2948+
options.get("partitionKey", None))
29312949
base.set_session_token_header(self, headers, path, request_params, options)
29322950
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
29332951
request_params.set_excluded_location_from_options(options)
@@ -3178,7 +3196,8 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
31783196
request_params = RequestObject(
31793197
resource_type,
31803198
op_type,
3181-
headers
3199+
headers,
3200+
options.get("partitionKey", None)
31823201
)
31833202
request_params.set_excluded_location_from_options(options)
31843203
base.set_session_token_header(self, headers, path, request_params, options, partition_key_range_id)
@@ -3219,7 +3238,10 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
32193238
options,
32203239
partition_key_range_id
32213240
)
3222-
request_params = RequestObject(resource_type, documents._OperationType.SqlQuery, req_headers)
3241+
request_params = RequestObject(resource_type,
3242+
documents._OperationType.SqlQuery,
3243+
req_headers,
3244+
options.get("partitionKey", None))
32233245
request_params.set_excluded_location_from_options(options)
32243246
if not is_query_plan:
32253247
req_headers[http_constants.HttpHeaders.IsQuery] = "true"

sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK
6868
options = {}
6969
if request.excluded_locations:
7070
options[_Constants.Kwargs.EXCLUDED_LOCATIONS] = request.excluded_locations
71-
if HttpHeaders.PartitionKey in request.headers:
72-
partition_key_value = request.headers[HttpHeaders.PartitionKey]
71+
if request.pk_val:
72+
partition_key_value = request.pk_val
7373
# get the partition key range for the given partition key
7474
epk_range = [partition_key._get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
7575
partition_ranges = (self.client._routing_map_provider # pylint: disable=protected-access

sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ def check_stale_partition_info(
172172
# only one request should be used to recover
173173
with self.stale_partition_lock:
174174
if _should_mark_healthy_tentative(partition_health_info, current_time):
175+
logger.debug("Attempting recovery for %s in %s where health info is %s.",
176+
pk_range_wrapper,
177+
location,
178+
partition_health_info)
175179
# this will trigger one attempt to recover
176180
partition_health_info.transition_health_status(UNHEALTHY, current_time)
177181
request.healthy_tentative_location = location
@@ -235,6 +239,11 @@ def add_failure(
235239

236240
# Retrieve the consecutive failure threshold from the environment.
237241
consecutive_failure_threshold = int(os.environ.get(env_key, default_consecutive_threshold))
242+
# log the current stats
243+
logger.debug("Failure for partition %s in location %s has %s",
244+
pk_range_wrapper,
245+
location,
246+
self.pk_range_wrapper_to_health_info[pk_range_wrapper][location])
238247

239248
# Call the threshold checker with the current stats.
240249
self._check_thresholds(

sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
resource_type: str,
3333
operation_type: str,
3434
headers: Dict[str, Any],
35+
pk_val: Optional[Any] = None,
3536
endpoint_override: Optional[str] = None,
3637
) -> None:
3738
self.resource_type = resource_type
@@ -46,6 +47,7 @@ def __init__(
4647
self.excluded_locations: Optional[List[str]] = None
4748
self.excluded_locations_circuit_breaker: List[str] = []
4849
self.healthy_tentative_location: Optional[str] = None
50+
self.pk_val = pk_val
4951
self.retry_write: bool = False
5052

5153
def route_to_location_with_preferred_location_flag( # pylint: disable=name-too-long

0 commit comments

Comments
 (0)