Skip to content

Commit 6f78dc7

Browse files
authored
Bug Retrying Writes (Azure#40672)
* No retries for writes * update changelog * fix pylint * fix test
1 parent 88a96e1 commit 6f78dc7

File tree

8 files changed

+146
-26
lines changed

8 files changed

+146
-26
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@
44

55
#### Features Added
66
* Added ability to set `throughput_bucket` header at the client level and for all requests. See [PR 40340](https://github.com/Azure/azure-sdk-for-python/pull/40340).
7-
* Added ability to use Filters from Logging module on Diagnostics Logging based on Http request/response related attributes. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897)
7+
* Added ability to use Filters from Logging module on Diagnostics Logging based on Http request/response related attributes. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897).
88

99
#### Breaking Changes
1010

1111
#### Bugs Fixed
1212
* Fixed how the environment variables in the sdk are parsed. See [PR 40303](https://github.com/Azure/azure-sdk-for-python/pull/40303).
1313
* Fixed health check to check the first write region when it is not specified in the preferred regions. See [PR 40588](https://github.com/Azure/azure-sdk-for-python/pull/40588).
14+
* Fixed bug where writes were being retried for 5xx status codes for patch and replace. See [PR 40672](https://github.com/Azure/azure-sdk-for-python/pull/40672).
1415

1516
#### Other Changes
16-
* Optimized Diagnostics Logging by reducing time spent on logging. Logged Errors are more readable and formatted. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897)
17+
* Optimized Diagnostics Logging by reducing time spent on logging. Logged Errors are more readable and formatted. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897).
1718
* Health checks are now done concurrently and for all regions for async apis. See [PR 40588](https://github.com/Azure/azure-sdk-for-python/pull/40588).
1819

1920

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from . import _timeout_failover_retry_policy
4040
from . import exceptions
4141
from .documents import _OperationType
42+
from .exceptions import CosmosHttpResponseError
4243
from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes, ResourceType
4344

4445

@@ -337,11 +338,13 @@ def send(self, request):
337338
self.sleep(retry_settings, request.context.transport)
338339
continue
339340
raise err
341+
except CosmosHttpResponseError as err:
342+
raise err
340343
except AzureError as err:
341344
retry_error = err
342345
if _has_database_account_header(request.http_request.headers):
343346
raise err
344-
if self._is_method_retryable(retry_settings, request.http_request):
347+
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
345348
retry_active = self.increment(retry_settings, response=request, error=err)
346349
if retry_active:
347350
self.sleep(retry_settings, request.context.transport)

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from .._retry_utility import (_configure_timeout, _has_read_retryable_headers,
4141
_handle_service_response_retries, _handle_service_request_retries,
4242
_has_database_account_header)
43+
from ..exceptions import CosmosHttpResponseError
4344
from ..http_constants import HttpHeaders, StatusCodes, SubStatusCodes
4445

4546

@@ -305,11 +306,13 @@ async def send(self, request):
305306
except ImportError:
306307
raise err # pylint: disable=raise-missing-from
307308
raise err
309+
except CosmosHttpResponseError as err:
310+
raise err
308311
except AzureError as err:
309312
retry_error = err
310313
if _has_database_account_header(request.http_request.headers):
311314
raise err
312-
if self._is_method_retryable(retry_settings, request.http_request):
315+
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
313316
retry_active = self.increment(retry_settings, response=request, error=err)
314317
if retry_active:
315318
await self.sleep(retry_settings, request.context.transport)

sdk/cosmos/azure-cosmos/tests/_fault_injection_transport.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
from azure.cosmos.exceptions import CosmosHttpResponseError
3939
from azure.core.exceptions import ServiceRequestError, ServiceResponseError
4040

41+
from azure.cosmos.http_constants import ResourceType, HttpHeaders
42+
4143
class FaultInjectionTransport(RequestsTransport):
4244
logger = logging.getLogger('azure.cosmos.fault_injection_transport')
4345
logger.setLevel(logging.DEBUG)
@@ -130,21 +132,26 @@ def predicate_req_for_document_with_id(r: HttpRequest, id_value: str) -> bool:
130132

131133
@staticmethod
132134
def predicate_is_database_account_call(r: HttpRequest) -> bool:
133-
is_db_account_read = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'databaseaccount'
134-
and r.headers.get('x-ms-thinclient-proxy-operation-type') == 'Read')
135+
is_db_account_read = (r.headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.DatabaseAccount
136+
and r.headers.get(HttpHeaders.ThinClientProxyOperationType) == documents._OperationType.Read)
135137

136138
return is_db_account_read
137139

138140
@staticmethod
139141
def predicate_is_document_operation(r: HttpRequest) -> bool:
140-
is_document_operation = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'docs')
141-
142+
is_document_operation = r.headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.Document
142143
return is_document_operation
143144

145+
@staticmethod
146+
def predicate_is_operation_type(r: HttpRequest, operation_type: str) -> bool:
147+
is_operation_type = r.headers.get(HttpHeaders.ThinClientProxyOperationType) == operation_type
148+
149+
return is_operation_type
150+
144151
@staticmethod
145152
def predicate_is_write_operation(r: HttpRequest, uri_prefix: str) -> bool:
146153
is_write_document_operation = documents._OperationType.IsWriteOperation(
147-
str(r.headers.get('x-ms-thinclient-proxy-operation-type')))
154+
str(r.headers.get(HttpHeaders.ThinClientProxyOperationType)),)
148155

149156
return is_write_document_operation and uri_prefix in r.url
150157

sdk/cosmos/azure-cosmos/tests/_fault_injection_transport_async.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
from azure.cosmos.exceptions import CosmosHttpResponseError
3737
from azure.core.exceptions import ServiceRequestError, ServiceResponseError
3838

39+
from azure.cosmos.http_constants import ResourceType, HttpHeaders
40+
3941
class FaultInjectionTransportAsync(AioHttpTransport):
4042
logger = logging.getLogger('azure.cosmos.fault_injection_transport_async')
4143
logger.setLevel(logging.DEBUG)
@@ -128,17 +130,23 @@ def predicate_req_for_document_with_id(r: HttpRequest, id_value: str) -> bool:
128130

129131
@staticmethod
130132
def predicate_is_database_account_call(r: HttpRequest) -> bool:
131-
is_db_account_read = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'databaseaccount'
132-
and r.headers.get('x-ms-thinclient-proxy-operation-type') == 'Read')
133+
is_db_account_read = (r.headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.DatabaseAccount
134+
and r.headers.get(HttpHeaders.ThinClientProxyOperationType) == documents._OperationType.Read)
133135

134136
return is_db_account_read
135137

136138
@staticmethod
137139
def predicate_is_document_operation(r: HttpRequest) -> bool:
138-
is_document_operation = (r.headers.get('x-ms-thinclient-proxy-resource-type') == 'docs')
140+
is_document_operation = (r.headers.get(HttpHeaders.ThinClientProxyResourceType) ==
141+
ResourceType.Document)
139142

140143
return is_document_operation
141144

145+
@staticmethod
146+
def predicate_is_operation_type(r: HttpRequest, operation_type: str) -> bool:
147+
is_operation_type = r.headers.get(HttpHeaders.ThinClientProxyOperationType) == operation_type
148+
return is_operation_type
149+
142150
@staticmethod
143151
def predicate_is_write_operation(r: HttpRequest, uri_prefix: str) -> bool:
144152
is_write_document_operation = documents._OperationType.IsWriteOperation(

sdk/cosmos/azure-cosmos/tests/test_config.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
import unittest
88
import uuid
99

10+
from azure.cosmos._retry_utility import _has_database_account_header, _has_read_retryable_headers
1011
from azure.cosmos.cosmos_client import CosmosClient
12+
from azure.cosmos.exceptions import CosmosHttpResponseError
1113
from azure.cosmos.http_constants import StatusCodes
1214
from azure.cosmos.partition_key import PartitionKey
13-
from azure.cosmos import (ContainerProxy, DatabaseProxy, documents, exceptions, ConnectionRetryPolicy,
15+
from azure.cosmos import (ContainerProxy, DatabaseProxy, documents, exceptions,
1416
http_constants, _retry_utility)
1517
from azure.cosmos.aio import _retry_utility_async
1618
from azure.core.exceptions import AzureError, ServiceRequestError, ServiceResponseError
@@ -289,7 +291,7 @@ def body(self):
289291

290292

291293
class MockConnectionRetryPolicy(RetryPolicy):
292-
def __init__(self, resource_type, error, **kwargs):
294+
def __init__(self, resource_type, error=None, **kwargs):
293295
self.resource_type = resource_type
294296
self.error = error
295297
self.counter = 0
@@ -310,7 +312,8 @@ def send(self, request):
310312
# raise the passed in exception for the passed in resource + operation combination
311313
if request.http_request.headers.get(http_constants.HttpHeaders.ThinClientProxyResourceType) == self.resource_type:
312314
self.request_endpoints.append(request.http_request.url)
313-
raise self.error
315+
if self.error:
316+
raise self.error
314317
response = self.next.send(request)
315318
break
316319
except ServiceRequestError as err:
@@ -336,8 +339,14 @@ def send(self, request):
336339
self.sleep(retry_settings, request.context.transport)
337340
continue
338341
raise err
342+
except CosmosHttpResponseError as err:
343+
raise err
339344
except AzureError as err:
340-
if self._is_method_retryable(retry_settings, request.http_request):
345+
retry_error = err
346+
if _has_database_account_header(request.http_request.headers):
347+
raise err
348+
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
349+
self.counter += 1
341350
retry_active = self.increment(retry_settings, response=request, error=err)
342351
if retry_active:
343352
self.sleep(retry_settings, request.context.transport)
@@ -353,7 +362,7 @@ def send(self, request):
353362

354363
class MockConnectionRetryPolicyAsync(AsyncRetryPolicy):
355364

356-
def __init__(self, resource_type, error, **kwargs):
365+
def __init__(self, resource_type, error = None, **kwargs):
357366
self.resource_type = resource_type
358367
self.error = error
359368
self.counter = 0
@@ -387,7 +396,8 @@ async def send(self, request):
387396
if request.http_request.headers.get(
388397
http_constants.HttpHeaders.ThinClientProxyResourceType) == self.resource_type:
389398
self.request_endpoints.append(request.http_request.url)
390-
raise self.error
399+
if self.error:
400+
raise self.error
391401
_retry_utility._configure_timeout(request, absolute_timeout, per_request_timeout)
392402
response = await self.next.send(request)
393403
break
@@ -403,7 +413,6 @@ async def send(self, request):
403413
if retry_settings['connect'] > 0:
404414
self.counter += 1
405415
retry_active = self.increment(retry_settings, response=request, error=err)
406-
print("Basic Retry in retry utility: ", retry_active)
407416
if retry_active:
408417
await self.sleep(retry_settings, request.context.transport)
409418
continue
@@ -422,18 +431,19 @@ async def send(self, request):
422431
await self.sleep(retry_settings, request.context.transport)
423432
continue
424433
raise err
434+
except CosmosHttpResponseError as err:
435+
raise err
425436
except AzureError as err:
426437
retry_error = err
427-
if self._is_method_retryable(retry_settings, request.http_request):
438+
if _has_database_account_header(request.http_request.headers):
439+
raise err
440+
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
428441
retry_active = self.increment(retry_settings, response=request, error=err)
442+
self.counter += 1
429443
if retry_active:
430444
await self.sleep(retry_settings, request.context.transport)
431445
continue
432446
raise err
433-
finally:
434-
end_time = time.time()
435-
if absolute_timeout:
436-
absolute_timeout -= (end_time - start_time)
437447

438448
self.update_context(response.context, retry_settings)
439449
return response

sdk/cosmos/azure-cosmos/tests/test_retry_policy.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,20 @@
1010
import azure.cosmos.cosmos_client as cosmos_client
1111
import azure.cosmos.exceptions as exceptions
1212
import test_config
13-
from azure.cosmos import _retry_utility, PartitionKey
13+
from azure.cosmos import _retry_utility, PartitionKey, documents
1414
from azure.cosmos.http_constants import HttpHeaders, StatusCodes
15+
from _fault_injection_transport import FaultInjectionTransport
16+
17+
18+
def setup_method_with_custom_transport(
19+
custom_transport,
20+
**kwargs):
21+
connection_retry_policy = test_config.MockConnectionRetryPolicy(resource_type="docs")
22+
client = cosmos_client.CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey,
23+
transport=custom_transport, connection_retry_policy=connection_retry_policy, **kwargs)
24+
db = client.get_database_client(test_config.TestConfig.TEST_DATABASE_ID)
25+
container = db.get_container_client(test_config.TestConfig.TEST_SINGLE_PARTITION_CONTAINER_ID)
26+
return {"client": client, "db": db, "col": container, "retry_policy": connection_retry_policy}
1527

1628

1729
@pytest.mark.cosmosEmulator
@@ -442,6 +454,35 @@ def test_resource_throttle_and_connection_retry_total_retry_with_max_backoff(sel
442454
finally:
443455
_retry_utility.ExecuteFunction = self.original_execute_function
444456

457+
def test_patch_replace_no_retry(self):
458+
doc = {'id': str(uuid.uuid4()),
459+
'pk': str(uuid.uuid4()),
460+
'name': 'sample document',
461+
'key': 'value'}
462+
custom_transport = FaultInjectionTransport()
463+
predicate = lambda r: (FaultInjectionTransport.predicate_is_operation_type(r, documents._OperationType.Patch)
464+
or FaultInjectionTransport.predicate_is_operation_type(r, documents._OperationType.Replace))
465+
custom_transport.add_fault(predicate, lambda r: FaultInjectionTransport.error_after_delay(
466+
0,
467+
exceptions.CosmosHttpResponseError(
468+
status_code=502,
469+
message="Some random reverse proxy error.")))
470+
471+
initialized_objects = setup_method_with_custom_transport(
472+
custom_transport,
473+
)
474+
container = initialized_objects["col"]
475+
connection_retry_policy = initialized_objects["retry_policy"]
476+
container.create_item(body=doc)
477+
operations = [{"op": "incr", "path": "/company", "value": 3}]
478+
with self.assertRaises(exceptions.CosmosHttpResponseError):
479+
container.patch_item(item=doc['id'], partition_key=doc['pk'], patch_operations=operations)
480+
assert connection_retry_policy.counter == 0
481+
with self.assertRaises(exceptions.CosmosHttpResponseError):
482+
doc['name'] = "something else"
483+
container.replace_item(item=doc['id'], body=doc)
484+
assert connection_retry_policy.counter == 0
485+
445486
def _MockExecuteFunction(self, function, *args, **kwargs):
446487
response = test_config.FakeResponse({HttpHeaders.RetryAfterInMilliseconds: self.retry_after_in_milliseconds})
447488
raise exceptions.CosmosHttpResponseError(

sdk/cosmos/azure-cosmos/tests/test_retry_policy_async.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# The MIT License (MIT)
22
# Copyright (c) Microsoft Corporation. All rights reserved.
3-
3+
import asyncio
44
import unittest
55
import uuid
66

77
import pytest
88

9+
from azure.cosmos import documents
10+
911
import azure.cosmos._retry_options as retry_options
1012
import azure.cosmos.exceptions as exceptions
1113
import test_config
@@ -15,6 +17,8 @@
1517
from azure.cosmos.aio import DatabaseProxy, ContainerProxy
1618
import azure.cosmos.aio._retry_utility_async as _retry_utility
1719
from azure.cosmos._retry_options import RetryOptions
20+
from _fault_injection_transport_async import FaultInjectionTransportAsync
21+
1822

1923
class ConnectionMode:
2024
"""Represents the connection mode to be used by the client."""
@@ -49,6 +53,18 @@ def __init__(self) -> None:
4953
self.ConnectionRetryConfiguration: Optional["ConnectionRetryPolicy"] = None
5054
self.ResponsePayloadOnWriteDisabled: bool = False
5155

56+
async def setup_method_with_custom_transport(
57+
custom_transport,
58+
**kwargs):
59+
connection_policy = ConnectionPolicy()
60+
connection_retry_policy = test_config.MockConnectionRetryPolicyAsync(resource_type="docs")
61+
connection_policy.ConnectionRetryConfiguration = connection_retry_policy
62+
client = CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey,
63+
transport=custom_transport, connection_policy=connection_policy, **kwargs)
64+
db = client.get_database_client(test_config.TestConfig.TEST_DATABASE_ID)
65+
container = db.get_container_client(test_config.TestConfig.TEST_SINGLE_PARTITION_CONTAINER_ID)
66+
return {"client": client, "db": db, "col": container, "retry_policy": connection_retry_policy}
67+
5268
@pytest.mark.cosmosEmulator
5369
class TestRetryPolicyAsync(unittest.IsolatedAsyncioTestCase):
5470
created_database: DatabaseProxy = None
@@ -490,6 +506,37 @@ async def test_resource_throttle_and_connection_retry_total_retry_with_max_backo
490506
finally:
491507
_retry_utility.ExecuteFunctionAsync = self.original_execute_function
492508

509+
async def test_patch_replace_no_retry_async(self):
510+
doc = {'id': str(uuid.uuid4()),
511+
'pk': str(uuid.uuid4()),
512+
'name': 'sample document',
513+
'key': 'value'}
514+
custom_transport = FaultInjectionTransportAsync()
515+
predicate = lambda r: (FaultInjectionTransportAsync.predicate_is_operation_type(r, documents._OperationType.Patch)
516+
or FaultInjectionTransportAsync.predicate_is_operation_type(r, documents._OperationType.Replace))
517+
custom_transport.add_fault(predicate, lambda r: asyncio.create_task(FaultInjectionTransportAsync.error_after_delay(
518+
0,
519+
exceptions.CosmosHttpResponseError(
520+
status_code=502,
521+
message="Some random reverse proxy error."))))
522+
523+
initialized_objects = await setup_method_with_custom_transport(
524+
custom_transport,
525+
)
526+
container = initialized_objects["col"]
527+
connection_retry_policy = initialized_objects["retry_policy"]
528+
await container.create_item(body=doc)
529+
operations = [{"op": "incr", "path": "/company", "value": 3}]
530+
with self.assertRaises(exceptions.CosmosHttpResponseError):
531+
await container.patch_item(item=doc['id'], partition_key=doc['pk'], patch_operations=operations)
532+
assert connection_retry_policy.counter == 0
533+
with self.assertRaises(exceptions.CosmosHttpResponseError):
534+
doc['name'] = "something else"
535+
await container.replace_item(item=doc['id'], body=doc)
536+
assert connection_retry_policy.counter == 0
537+
# Cleanup
538+
await initialized_objects["client"].close()
539+
493540
async def _MockExecuteFunction(self, function, *args, **kwargs):
494541
response = test_config.FakeResponse({HttpHeaders.RetryAfterInMilliseconds: self.retry_after_in_milliseconds})
495542
raise exceptions.CosmosHttpResponseError(

0 commit comments

Comments
 (0)