|
12 | 12 |
|
13 | 13 | import test_config |
14 | 14 | from azure.cosmos import _location_cache, _partition_health_tracker |
15 | | -from azure.cosmos._partition_health_tracker import HEALTH_STATUS, UNHEALTHY, UNHEALTHY_TENTATIVE |
16 | 15 | from azure.cosmos.aio import CosmosClient |
17 | 16 | from azure.cosmos.exceptions import CosmosHttpResponseError |
18 | 17 | from _fault_injection_transport_async import FaultInjectionTransportAsync |
19 | | - |
20 | | -REGION_1 = "West US 3" |
21 | | -REGION_2 = "West US" |
22 | | -CHANGE_FEED = "changefeed" |
23 | | -CHANGE_FEED_PK = "changefeed_pk" |
24 | | -CHANGE_FEED_EPK = "changefeed_epk" |
25 | | -READ = "read" |
26 | | -CREATE = "create" |
27 | | -READ_ALL_ITEMS = "read_all_items" |
28 | | -DELETE_ALL_ITEMS_BY_PARTITION_KEY = "delete_all_items_by_partition_key" |
29 | | -QUERY = "query" |
30 | | -QUERY_PK = "query_pk" |
31 | | -BATCH = "batch" |
32 | | -UPSERT = "upsert" |
33 | | -REPLACE = "replace" |
34 | | -PATCH = "patch" |
35 | | -DELETE = "delete" |
36 | | -PK_VALUE = "pk1" |
37 | | - |
| 18 | +from test_per_partition_circuit_breaker_mm import create_doc, read_operations_and_errors, \ |
| 19 | + write_operations_and_errors, operations, REGION_1, REGION_2, CHANGE_FEED, CHANGE_FEED_PK, CHANGE_FEED_EPK, READ, \ |
| 20 | + CREATE, READ_ALL_ITEMS, DELETE_ALL_ITEMS_BY_PARTITION_KEY, QUERY, QUERY_PK, BATCH, UPSERT, REPLACE, PATCH, DELETE, \ |
| 21 | + PK_VALUE, validate_unhealthy_partitions, validate_response_uri |
| 22 | +from test_per_partition_circuit_breaker_mm import validate_stats |
38 | 23 |
|
39 | 24 | COLLECTION = "created_collection" |
40 | 25 |
|
41 | | -def create_errors(): |
42 | | - errors = [] |
43 | | - error_codes = [408, 500, 502, 503] |
44 | | - for error_code in error_codes: |
45 | | - errors.append(CosmosHttpResponseError( |
46 | | - status_code=error_code, |
47 | | - message="Some injected error.")) |
48 | | - errors.append(ServiceResponseError(message="Injected Service Response Error.")) |
49 | | - return errors |
50 | | - |
51 | | -def write_operations_and_errors(): |
52 | | - write_operations = [CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH] |
53 | | - errors = create_errors() |
54 | | - params = [] |
55 | | - for write_operation in write_operations: |
56 | | - for error in errors: |
57 | | - params.append((write_operation, error)) |
58 | | - |
59 | | - return params |
60 | | - |
61 | | -def create_doc(): |
62 | | - return {'id': str(uuid.uuid4()), |
63 | | - 'pk': PK_VALUE, |
64 | | - 'name': 'sample document', |
65 | | - 'key': 'value'} |
66 | | - |
67 | | -def read_operations_and_errors(): |
68 | | - read_operations = [READ, QUERY_PK, CHANGE_FEED, CHANGE_FEED_PK, CHANGE_FEED_EPK] |
69 | | - errors = create_errors() |
70 | | - params = [] |
71 | | - for read_operation in read_operations: |
72 | | - for error in errors: |
73 | | - params.append((read_operation, error)) |
74 | | - |
75 | | - return params |
76 | | - |
77 | | -def operations(): |
78 | | - write_operations = [CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH] |
79 | | - read_operations = [READ, QUERY_PK, CHANGE_FEED_PK, CHANGE_FEED_EPK] |
80 | | - operations = [] |
81 | | - for i, write_operation in enumerate(write_operations): |
82 | | - operations.append((read_operations[i % len(read_operations)], write_operation)) |
83 | | - |
84 | | - return operations |
85 | | - |
86 | | -def validate_response_uri(response, expected_uri): |
87 | | - request = response.get_response_headers()["_request"] |
88 | | - assert request.url.startswith(expected_uri) |
89 | | - |
90 | 26 | async def perform_write_operation(operation, container, fault_injection_container, doc_id, pk, expected_uri): |
91 | 27 | doc = {'id': doc_id, |
92 | 28 | 'pk': pk, |
@@ -162,38 +98,6 @@ async def perform_read_operation(operation, container, doc_id, pk, expected_uri) |
162 | 98 | async for _ in container.read_all_items(): |
163 | 99 | pass |
164 | 100 |
|
165 | | -def validate_unhealthy_partitions(global_endpoint_manager, |
166 | | - expected_unhealthy_partitions): |
167 | | - health_info_map = global_endpoint_manager.global_partition_endpoint_manager_core.partition_health_tracker.pk_range_wrapper_to_health_info |
168 | | - unhealthy_partitions = 0 |
169 | | - for pk_range_wrapper, location_to_health_info in health_info_map.items(): |
170 | | - for location, health_info in location_to_health_info.items(): |
171 | | - health_status = health_info.unavailability_info.get(HEALTH_STATUS) |
172 | | - if health_status == UNHEALTHY_TENTATIVE or health_status == UNHEALTHY: |
173 | | - unhealthy_partitions += 1 |
174 | | - else: |
175 | | - assert health_info.read_consecutive_failure_count < 10 |
176 | | - assert health_info.write_consecutive_failure_count < 5 |
177 | | - |
178 | | - assert unhealthy_partitions == expected_unhealthy_partitions |
179 | | - |
180 | | -def validate_stats(global_endpoint_manager, |
181 | | - expected_write_consecutive_failure_count, |
182 | | - expected_read_consecutive_failure_count, |
183 | | - expected_read_failure_count, |
184 | | - expected_write_failure_count, |
185 | | - expected_write_success_count, |
186 | | - expected_read_success_count): |
187 | | - health_info_map = global_endpoint_manager.global_partition_endpoint_manager_core.partition_health_tracker.pk_range_wrapper_to_health_info |
188 | | - for pk_range_wrapper, location_to_health_info in health_info_map.items(): |
189 | | - health_info = location_to_health_info[REGION_1] |
190 | | - assert health_info.read_consecutive_failure_count == expected_read_consecutive_failure_count |
191 | | - assert health_info.write_consecutive_failure_count == expected_write_consecutive_failure_count |
192 | | - assert health_info.read_failure_count == expected_read_failure_count |
193 | | - assert health_info.write_failure_count == expected_write_failure_count |
194 | | - assert health_info.read_success_count == expected_read_success_count |
195 | | - assert health_info.write_success_count == expected_write_success_count |
196 | | - |
197 | 101 | async def cleanup_method(initialized_objects: List[Dict[str, Any]]): |
198 | 102 | for obj in initialized_objects: |
199 | 103 | method_client: CosmosClient = obj["client"] |
@@ -416,10 +320,10 @@ async def test_read_failure_rate_threshold_async(self, read_operation, error): |
416 | 320 | validate_unhealthy_partitions(global_endpoint_manager, 0) |
417 | 321 | # read will fail and retry in other region |
418 | 322 | await perform_read_operation(read_operation, |
419 | | - fault_injection_container, |
420 | | - doc['id'], |
421 | | - PK_VALUE, |
422 | | - expected_uri) |
| 323 | + fault_injection_container, |
| 324 | + doc['id'], |
| 325 | + PK_VALUE, |
| 326 | + expected_uri) |
423 | 327 | if read_operation in (CHANGE_FEED, QUERY, READ_ALL_ITEMS): |
424 | 328 | # these operations are cross partition so they would mark both partitions as unavailable |
425 | 329 | expected_unhealthy_partitions = 5 |
@@ -468,7 +372,7 @@ async def test_stat_reset_async(self): |
468 | 372 | except CosmosHttpResponseError as e: |
469 | 373 | assert e.status_code == 503 |
470 | 374 | validate_unhealthy_partitions(global_endpoint_manager, 0) |
471 | | - validate_stats(global_endpoint_manager, 2, 2, 2, 2, 0, 0) |
| 375 | + validate_stats(global_endpoint_manager, 2, 2, 2, 2, 0, 0) |
472 | 376 | await asyncio.sleep(25) |
473 | 377 | await perform_read_operation(READ, |
474 | 378 | fault_injection_container, |
@@ -507,10 +411,10 @@ async def test_service_request_error_async(self, read_operation, write_operation |
507 | 411 | custom_transport.faults = [] |
508 | 412 | try: |
509 | 413 | await perform_read_operation(read_operation, |
510 | | - fault_injection_container, |
511 | | - doc['id'], |
512 | | - PK_VALUE, |
513 | | - expected_uri) |
| 414 | + fault_injection_container, |
| 415 | + doc['id'], |
| 416 | + PK_VALUE, |
| 417 | + expected_uri) |
514 | 418 | finally: |
515 | 419 | _partition_health_tracker.INITIAL_UNAVAILABLE_TIME_MS = original_unavailable_time |
516 | 420 | validate_unhealthy_partitions(global_endpoint_manager, 0) |
|
0 commit comments