Skip to content

Commit 859cdcb

Browse files
authored
Consecutive failures forever (#41405)
* update consecutive failures to be tracked forever for partition health
* Add tests
* react to comments
* react to comments
* add comment
* add comment
* rename other variables
1 parent 407f7a3 commit 859cdcb

8 files changed

+289
-69
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
9191
# set location-based routing directive based on retry count
9292
# simulating single master writes by ensuring usePreferredLocations
9393
# is set to false
94+
# the reasoning is that a 403.3 is only expected during a write region failover in a single-writer account
95+
# and we must rely on account locations as they are the source of truth
9496
self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False)
9597

9698
return True

sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
from ._constants import _Constants as Constants
3333

3434
MINIMUM_REQUESTS_FOR_FAILURE_RATE = 100
35-
MAX_UNAVAILABLE_TIME = 1200 * 1000 # milliseconds
36-
REFRESH_INTERVAL = 60 * 1000 # milliseconds
37-
INITIAL_UNAVAILABLE_TIME = 60 * 1000 # milliseconds
35+
MAX_UNAVAILABLE_TIME_MS = 1200 * 1000 # 20 minutes in milliseconds
36+
REFRESH_INTERVAL_MS = 60 * 1000 # 1 minute in milliseconds
37+
INITIAL_UNAVAILABLE_TIME_MS = 60 * 1000 # 1 minute in milliseconds
3838
# partition is unhealthy if sdk tried to recover and failed
3939
UNHEALTHY = "unhealthy"
4040
# partition is unhealthy tentative when it is initially marked unavailable
@@ -58,27 +58,25 @@ def __init__(self) -> None:
5858
self.write_consecutive_failure_count: int = 0
5959
self.unavailability_info: Dict[str, Any] = {}
6060

61-
def reset_health_stats(self) -> None:
61+
def reset_failure_rate_health_stats(self) -> None:
6262
self.write_failure_count = 0
6363
self.read_failure_count = 0
6464
self.write_success_count = 0
6565
self.read_success_count = 0
66-
self.read_consecutive_failure_count = 0
67-
self.write_consecutive_failure_count = 0
6866

6967
def transition_health_status(self, target_health_status: str, curr_time: int) -> None:
7068
if target_health_status == UNHEALTHY :
7169
self.unavailability_info[HEALTH_STATUS] = UNHEALTHY
7270
# reset the last unavailability check time stamp
7371
self.unavailability_info[UNAVAILABLE_INTERVAL] = \
7472
min(self.unavailability_info[UNAVAILABLE_INTERVAL] * 2,
75-
MAX_UNAVAILABLE_TIME)
73+
MAX_UNAVAILABLE_TIME_MS)
7674
self.unavailability_info[LAST_UNAVAILABILITY_CHECK_TIME_STAMP] \
7775
= curr_time
7876
elif target_health_status == UNHEALTHY_TENTATIVE :
7977
self.unavailability_info = {
8078
LAST_UNAVAILABILITY_CHECK_TIME_STAMP: curr_time,
81-
UNAVAILABLE_INTERVAL: INITIAL_UNAVAILABLE_TIME,
79+
UNAVAILABLE_INTERVAL: INITIAL_UNAVAILABLE_TIME_MS,
8280
HEALTH_STATUS: UNHEALTHY_TENTATIVE
8381
}
8482

@@ -108,7 +106,7 @@ def _should_mark_healthy_tentative(partition_health_info: _PartitionHealthInfo,
108106
stale_partition_unavailability_check = partition_health_info.unavailability_info[UNAVAILABLE_INTERVAL]
109107
# check if the partition key range is still unavailable
110108
return ((current_health_status == UNHEALTHY and elapsed_time > stale_partition_unavailability_check)
111-
or (current_health_status == UNHEALTHY_TENTATIVE and elapsed_time > INITIAL_UNAVAILABLE_TIME))
109+
or (current_health_status == UNHEALTHY_TENTATIVE and elapsed_time > INITIAL_UNAVAILABLE_TIME_MS))
112110

113111
logger = logging.getLogger("azure.cosmos._PartitionHealthTracker")
114112

@@ -178,9 +176,10 @@ def check_stale_partition_info(
178176
partition_health_info.transition_health_status(UNHEALTHY, current_time)
179177
request.healthy_tentative_location = location
180178

181-
if current_time - self.last_refresh > REFRESH_INTERVAL:
179+
if current_time - self.last_refresh > REFRESH_INTERVAL_MS:
182180
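# failure-rate stats (read/write success and failure counts) for all partitions are reset once per
# refresh interval (REFRESH_INTERVAL_MS); consecutive failure counts are not reset here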
183181
self._reset_partition_health_tracker_stats()
182+
self.last_refresh = current_time
184183

185184

186185
def get_unhealthy_locations(
@@ -290,4 +289,4 @@ def add_success(self, pk_range_wrapper: PartitionKeyRangeWrapper, operation_type
290289
def _reset_partition_health_tracker_stats(self) -> None:
291290
for locations in self.pk_range_wrapper_to_health_info.values():
292291
for health_info in locations.values():
293-
health_info.reset_health_stats()
292+
health_info.reset_failure_rate_health_stats()

sdk/cosmos/azure-cosmos/tests/test_circuit_breaker_emulator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ def test_write_consecutive_failure_threshold_delete_all_items_by_pk_mm(self, set
160160

161161
validate_unhealthy_partitions_mm(global_endpoint_manager, 1)
162162
# remove faults and reduce initial recover time and perform a write
163-
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME
164-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = 1
163+
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS
164+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = 1
165165
custom_transport.faults = []
166166
try:
167167
perform_write_operation(DELETE_ALL_ITEMS_BY_PARTITION_KEY,
@@ -171,7 +171,7 @@ def test_write_consecutive_failure_threshold_delete_all_items_by_pk_mm(self, set
171171
PK_VALUE,
172172
uri_down)
173173
finally:
174-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = original_unavailable_time
174+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time
175175
validate_unhealthy_partitions_mm(global_endpoint_manager, 0)
176176

177177

sdk/cosmos/azure-cosmos/tests/test_circuit_breaker_emulator_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,8 @@ async def test_write_consecutive_failure_threshold_delete_all_items_by_pk_mm_asy
163163

164164
validate_unhealthy_partitions_mm(global_endpoint_manager, 1)
165165
# remove faults and reduce initial recover time and perform a write
166-
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME
167-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = 1
166+
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS
167+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = 1
168168
custom_transport.faults = []
169169
try:
170170
await perform_write_operation(DELETE_ALL_ITEMS_BY_PARTITION_KEY,
@@ -174,7 +174,7 @@ async def test_write_consecutive_failure_threshold_delete_all_items_by_pk_mm_asy
174174
PK_VALUE,
175175
uri_down)
176176
finally:
177-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = original_unavailable_time
177+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time
178178
validate_unhealthy_partitions_mm(global_endpoint_manager, 0)
179179
await cleanup_method([custom_setup, setup])
180180

sdk/cosmos/azure-cosmos/tests/test_per_partition_circuit_breaker_mm.py

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
from azure.cosmos import CosmosClient
1414
from azure.cosmos.exceptions import CosmosHttpResponseError
1515
from _fault_injection_transport import FaultInjectionTransport
16-
from test_per_partition_circuit_breaker_mm_async import DELETE, CREATE, UPSERT, REPLACE, PATCH, BATCH, validate_response_uri, READ, \
16+
from test_per_partition_circuit_breaker_mm_async import DELETE, CREATE, UPSERT, REPLACE, PATCH, BATCH, \
17+
validate_response_uri, READ, \
1718
QUERY_PK, QUERY, CHANGE_FEED, CHANGE_FEED_PK, CHANGE_FEED_EPK, READ_ALL_ITEMS, REGION_1, REGION_2, \
1819
write_operations_and_errors, validate_unhealthy_partitions, read_operations_and_errors, PK_VALUE, operations, \
19-
create_doc
20+
create_doc, validate_stats
2021
from test_per_partition_circuit_breaker_mm_async import DELETE_ALL_ITEMS_BY_PARTITION_KEY
2122

2223
def perform_write_operation(operation, container, fault_injection_container, doc_id, pk, expected_uri):
@@ -99,15 +100,18 @@ class TestPerPartitionCircuitBreakerMM:
99100
host = test_config.TestConfig.host
100101
master_key = test_config.TestConfig.masterKey
101102
TEST_DATABASE_ID = test_config.TestConfig.TEST_DATABASE_ID
102-
TEST_CONTAINER_SINGLE_PARTITION_ID = test_config.TestConfig.TEST_MULTI_PARTITION_CONTAINER_ID
103+
TEST_CONTAINER_MULTI_PARTITION_ID = test_config.TestConfig.TEST_MULTI_PARTITION_CONTAINER_ID
103104

104105
def setup_method_with_custom_transport(self, custom_transport, default_endpoint=host, **kwargs):
106+
container_id = kwargs.pop("container_id", None)
107+
if not container_id:
108+
container_id = self.TEST_CONTAINER_MULTI_PARTITION_ID
105109
client = CosmosClient(default_endpoint, self.master_key,
106110
preferred_locations=[REGION_1, REGION_2],
107111
multiple_write_locations=True,
108112
transport=custom_transport, **kwargs)
109113
db = client.get_database_client(self.TEST_DATABASE_ID)
110-
container = db.get_container_client(self.TEST_CONTAINER_SINGLE_PARTITION_ID)
114+
container = db.get_container_client(container_id)
111115
return {"client": client, "db": db, "col": container}
112116

113117
@pytest.mark.parametrize("write_operation, error", write_operations_and_errors())
@@ -151,8 +155,8 @@ def test_write_consecutive_failure_threshold(self, write_operation, error):
151155

152156
validate_unhealthy_partitions(global_endpoint_manager, 1)
153157
# remove faults and reduce initial recover time and perform a write
154-
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME
155-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = 1
158+
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS
159+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = 1
156160
custom_transport.faults = []
157161
try:
158162
perform_write_operation(write_operation,
@@ -162,7 +166,7 @@ def test_write_consecutive_failure_threshold(self, write_operation, error):
162166
PK_VALUE,
163167
uri_down)
164168
finally:
165-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = original_unavailable_time
169+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time
166170
validate_unhealthy_partitions(global_endpoint_manager, 0)
167171

168172
@pytest.mark.cosmosCircuitBreakerMultiRegion
@@ -203,8 +207,8 @@ def test_read_consecutive_failure_threshold(self, read_operation, error):
203207
expected_unhealthy_partitions = 1
204208
validate_unhealthy_partitions(global_endpoint_manager, expected_unhealthy_partitions)
205209
# remove faults and reduce initial recover time and perform a read
206-
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME
207-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = 1
210+
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS
211+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = 1
208212
custom_transport.faults = []
209213
try:
210214
perform_read_operation(read_operation,
@@ -213,7 +217,7 @@ def test_read_consecutive_failure_threshold(self, read_operation, error):
213217
doc['pk'],
214218
uri_down)
215219
finally:
216-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = original_unavailable_time
220+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time
217221
validate_unhealthy_partitions(global_endpoint_manager, 0)
218222

219223
@pytest.mark.parametrize("write_operation, error", write_operations_and_errors())
@@ -297,7 +301,7 @@ def test_read_failure_rate_threshold(self, read_operation, error):
297301
# restore minimum requests
298302
_partition_health_tracker.MINIMUM_REQUESTS_FOR_FAILURE_RATE = 100
299303

300-
def setup_info(self, error):
304+
def setup_info(self, error, **kwargs):
301305
expected_uri = _location_cache.LocationCache.GetLocationalEndpoint(self.host, REGION_2)
302306
uri_down = _location_cache.LocationCache.GetLocationalEndpoint(self.host, REGION_1)
303307
custom_transport = FaultInjectionTransport()
@@ -307,12 +311,57 @@ def setup_info(self, error):
307311
FaultInjectionTransport.predicate_targets_region(r, uri_down))
308312
custom_transport.add_fault(predicate,
309313
error)
310-
custom_setup = self.setup_method_with_custom_transport(custom_transport, default_endpoint=self.host)
314+
custom_setup = self.setup_method_with_custom_transport(custom_transport, default_endpoint=self.host, **kwargs)
311315
fault_injection_container = custom_setup['col']
312-
setup = self.setup_method_with_custom_transport(None, default_endpoint=self.host)
316+
setup = self.setup_method_with_custom_transport(None, default_endpoint=self.host, **kwargs)
313317
container = setup['col']
314318
return container, doc, expected_uri, uri_down, fault_injection_container, custom_transport, predicate
315319

320+
def test_stat_reset(self):
321+
error_lambda = lambda r: FaultInjectionTransport.error_after_delay(
322+
0,
323+
CosmosHttpResponseError(
324+
status_code=503,
325+
message="Some injected error.")
326+
)
327+
container, doc, expected_uri, uri_down, fault_injection_container, custom_transport, predicate = \
328+
self.setup_info(error_lambda, container_id=test_config.TestConfig.TEST_SINGLE_PARTITION_CONTAINER_ID)
329+
container.upsert_item(body=doc)
330+
sleep(1)
331+
global_endpoint_manager = fault_injection_container.client_connection._global_endpoint_manager
332+
# lower refresh interval for testing
333+
_partition_health_tracker.REFRESH_INTERVAL_MS = 10 * 1000
334+
try:
335+
for i in range(2):
336+
validate_unhealthy_partitions(global_endpoint_manager, 0)
337+
# read will fail and retry in other region
338+
perform_read_operation(READ,
339+
fault_injection_container,
340+
doc['id'],
341+
PK_VALUE,
342+
expected_uri)
343+
try:
344+
perform_write_operation(CREATE,
345+
container,
346+
fault_injection_container,
347+
str(uuid.uuid4()),
348+
PK_VALUE,
349+
expected_uri)
350+
except CosmosHttpResponseError as e:
351+
assert e.status_code == 503
352+
validate_unhealthy_partitions(global_endpoint_manager, 0)
353+
validate_stats(global_endpoint_manager, 2, 2, 2, 2, 0, 0)
354+
sleep(25)
355+
perform_read_operation(READ,
356+
fault_injection_container,
357+
doc['id'],
358+
PK_VALUE,
359+
expected_uri)
360+
361+
validate_stats(global_endpoint_manager, 2, 3, 1, 0, 0, 0)
362+
finally:
363+
_partition_health_tracker.REFRESH_INTERVAL_MS = 60 * 1000
364+
316365
@pytest.mark.parametrize("read_operation, write_operation", operations())
317366
def test_service_request_error(self, read_operation, write_operation):
318367
# the region should be tried 4 times before failing over and marking the partition as unavailable
@@ -333,8 +382,8 @@ def test_service_request_error(self, read_operation, write_operation):
333382

334383
# recover partition
335384
# remove faults and reduce initial recover time and perform a write
336-
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME
337-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = 1
385+
original_unavailable_time = _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS
386+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = 1
338387
custom_transport.faults = []
339388
try:
340389
perform_read_operation(read_operation,
@@ -343,7 +392,7 @@ def test_service_request_error(self, read_operation, write_operation):
343392
PK_VALUE,
344393
expected_uri)
345394
finally:
346-
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME = original_unavailable_time
395+
_partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time
347396
validate_unhealthy_partitions(global_endpoint_manager, 0)
348397

349398
custom_transport.add_fault(predicate,

0 commit comments

Comments
 (0)