|
12 | 12 |
|
13 | 13 | import test_config
|
14 | 14 | from azure.cosmos import _location_cache, _partition_health_tracker
|
15 |
| -from azure.cosmos._partition_health_tracker import HEALTH_STATUS, UNHEALTHY, UNHEALTHY_TENTATIVE |
16 | 15 | from azure.cosmos.aio import CosmosClient
|
17 | 16 | from azure.cosmos.exceptions import CosmosHttpResponseError
|
18 | 17 | from _fault_injection_transport_async import FaultInjectionTransportAsync
|
19 |
| - |
20 |
| -REGION_1 = "West US 3" |
21 |
| -REGION_2 = "West US" |
22 |
| -CHANGE_FEED = "changefeed" |
23 |
| -CHANGE_FEED_PK = "changefeed_pk" |
24 |
| -CHANGE_FEED_EPK = "changefeed_epk" |
25 |
| -READ = "read" |
26 |
| -CREATE = "create" |
27 |
| -READ_ALL_ITEMS = "read_all_items" |
28 |
| -DELETE_ALL_ITEMS_BY_PARTITION_KEY = "delete_all_items_by_partition_key" |
29 |
| -QUERY = "query" |
30 |
| -QUERY_PK = "query_pk" |
31 |
| -BATCH = "batch" |
32 |
| -UPSERT = "upsert" |
33 |
| -REPLACE = "replace" |
34 |
| -PATCH = "patch" |
35 |
| -DELETE = "delete" |
36 |
| -PK_VALUE = "pk1" |
37 |
| - |
| 18 | +from test_per_partition_circuit_breaker_mm import create_doc, read_operations_and_errors, \ |
| 19 | + write_operations_and_errors, operations, REGION_1, REGION_2, CHANGE_FEED, CHANGE_FEED_PK, CHANGE_FEED_EPK, READ, \ |
| 20 | + CREATE, READ_ALL_ITEMS, DELETE_ALL_ITEMS_BY_PARTITION_KEY, QUERY, QUERY_PK, BATCH, UPSERT, REPLACE, PATCH, DELETE, \ |
| 21 | + PK_VALUE, validate_unhealthy_partitions, validate_response_uri |
| 22 | +from test_per_partition_circuit_breaker_mm import validate_stats |
38 | 23 |
|
39 | 24 | COLLECTION = "created_collection"
|
40 | 25 |
|
41 |
| -def create_errors(): |
42 |
| - errors = [] |
43 |
| - error_codes = [408, 500, 502, 503] |
44 |
| - for error_code in error_codes: |
45 |
| - errors.append(CosmosHttpResponseError( |
46 |
| - status_code=error_code, |
47 |
| - message="Some injected error.")) |
48 |
| - errors.append(ServiceResponseError(message="Injected Service Response Error.")) |
49 |
| - return errors |
50 |
| - |
51 |
| -def write_operations_and_errors(): |
52 |
| - write_operations = [CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH] |
53 |
| - errors = create_errors() |
54 |
| - params = [] |
55 |
| - for write_operation in write_operations: |
56 |
| - for error in errors: |
57 |
| - params.append((write_operation, error)) |
58 |
| - |
59 |
| - return params |
60 |
| - |
61 |
| -def create_doc(): |
62 |
| - return {'id': str(uuid.uuid4()), |
63 |
| - 'pk': PK_VALUE, |
64 |
| - 'name': 'sample document', |
65 |
| - 'key': 'value'} |
66 |
| - |
67 |
| -def read_operations_and_errors(): |
68 |
| - read_operations = [READ, QUERY_PK, CHANGE_FEED, CHANGE_FEED_PK, CHANGE_FEED_EPK] |
69 |
| - errors = create_errors() |
70 |
| - params = [] |
71 |
| - for read_operation in read_operations: |
72 |
| - for error in errors: |
73 |
| - params.append((read_operation, error)) |
74 |
| - |
75 |
| - return params |
76 |
| - |
77 |
| -def operations(): |
78 |
| - write_operations = [CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH] |
79 |
| - read_operations = [READ, QUERY_PK, CHANGE_FEED_PK, CHANGE_FEED_EPK] |
80 |
| - operations = [] |
81 |
| - for i, write_operation in enumerate(write_operations): |
82 |
| - operations.append((read_operations[i % len(read_operations)], write_operation)) |
83 |
| - |
84 |
| - return operations |
85 |
| - |
86 |
| -def validate_response_uri(response, expected_uri): |
87 |
| - request = response.get_response_headers()["_request"] |
88 |
| - assert request.url.startswith(expected_uri) |
89 |
| - |
90 | 26 | async def perform_write_operation(operation, container, fault_injection_container, doc_id, pk, expected_uri):
|
91 | 27 | doc = {'id': doc_id,
|
92 | 28 | 'pk': pk,
|
@@ -162,38 +98,6 @@ async def perform_read_operation(operation, container, doc_id, pk, expected_uri)
|
162 | 98 | async for _ in container.read_all_items():
|
163 | 99 | pass
|
164 | 100 |
|
165 |
| -def validate_unhealthy_partitions(global_endpoint_manager, |
166 |
| - expected_unhealthy_partitions): |
167 |
| - health_info_map = global_endpoint_manager.global_partition_endpoint_manager_core.partition_health_tracker.pk_range_wrapper_to_health_info |
168 |
| - unhealthy_partitions = 0 |
169 |
| - for pk_range_wrapper, location_to_health_info in health_info_map.items(): |
170 |
| - for location, health_info in location_to_health_info.items(): |
171 |
| - health_status = health_info.unavailability_info.get(HEALTH_STATUS) |
172 |
| - if health_status == UNHEALTHY_TENTATIVE or health_status == UNHEALTHY: |
173 |
| - unhealthy_partitions += 1 |
174 |
| - else: |
175 |
| - assert health_info.read_consecutive_failure_count < 10 |
176 |
| - assert health_info.write_consecutive_failure_count < 5 |
177 |
| - |
178 |
| - assert unhealthy_partitions == expected_unhealthy_partitions |
179 |
| - |
180 |
| -def validate_stats(global_endpoint_manager, |
181 |
| - expected_write_consecutive_failure_count, |
182 |
| - expected_read_consecutive_failure_count, |
183 |
| - expected_read_failure_count, |
184 |
| - expected_write_failure_count, |
185 |
| - expected_write_success_count, |
186 |
| - expected_read_success_count): |
187 |
| - health_info_map = global_endpoint_manager.global_partition_endpoint_manager_core.partition_health_tracker.pk_range_wrapper_to_health_info |
188 |
| - for pk_range_wrapper, location_to_health_info in health_info_map.items(): |
189 |
| - health_info = location_to_health_info[REGION_1] |
190 |
| - assert health_info.read_consecutive_failure_count == expected_read_consecutive_failure_count |
191 |
| - assert health_info.write_consecutive_failure_count == expected_write_consecutive_failure_count |
192 |
| - assert health_info.read_failure_count == expected_read_failure_count |
193 |
| - assert health_info.write_failure_count == expected_write_failure_count |
194 |
| - assert health_info.read_success_count == expected_read_success_count |
195 |
| - assert health_info.write_success_count == expected_write_success_count |
196 |
| - |
197 | 101 | async def cleanup_method(initialized_objects: List[Dict[str, Any]]):
|
198 | 102 | for obj in initialized_objects:
|
199 | 103 | method_client: CosmosClient = obj["client"]
|
@@ -416,10 +320,10 @@ async def test_read_failure_rate_threshold_async(self, read_operation, error):
|
416 | 320 | validate_unhealthy_partitions(global_endpoint_manager, 0)
|
417 | 321 | # read will fail and retry in other region
|
418 | 322 | await perform_read_operation(read_operation,
|
419 |
| - fault_injection_container, |
420 |
| - doc['id'], |
421 |
| - PK_VALUE, |
422 |
| - expected_uri) |
| 323 | + fault_injection_container, |
| 324 | + doc['id'], |
| 325 | + PK_VALUE, |
| 326 | + expected_uri) |
423 | 327 | if read_operation in (CHANGE_FEED, QUERY, READ_ALL_ITEMS):
|
424 | 328 | # these operations are cross partition so they would mark both partitions as unavailable
|
425 | 329 | expected_unhealthy_partitions = 5
|
@@ -468,7 +372,7 @@ async def test_stat_reset_async(self):
|
468 | 372 | except CosmosHttpResponseError as e:
|
469 | 373 | assert e.status_code == 503
|
470 | 374 | validate_unhealthy_partitions(global_endpoint_manager, 0)
|
471 |
| - validate_stats(global_endpoint_manager, 2, 2, 2, 2, 0, 0) |
| 375 | + validate_stats(global_endpoint_manager, 2, 2, 2, 2, 0, 0) |
472 | 376 | await asyncio.sleep(25)
|
473 | 377 | await perform_read_operation(READ,
|
474 | 378 | fault_injection_container,
|
@@ -507,10 +411,10 @@ async def test_service_request_error_async(self, read_operation, write_operation
|
507 | 411 | custom_transport.faults = []
|
508 | 412 | try:
|
509 | 413 | await perform_read_operation(read_operation,
|
510 |
| - fault_injection_container, |
511 |
| - doc['id'], |
512 |
| - PK_VALUE, |
513 |
| - expected_uri) |
| 414 | + fault_injection_container, |
| 415 | + doc['id'], |
| 416 | + PK_VALUE, |
| 417 | + expected_uri) |
514 | 418 | finally:
|
515 | 419 | _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time
|
516 | 420 | validate_unhealthy_partitions(global_endpoint_manager, 0)
|
|
0 commit comments