Skip to content

Commit 19a45cc

Browse files
authored
Merge pull request #31 from release-engineering/clearRetry
Add clearer retry logic with logging [RHELDST-9679]
2 parents 0471367 + d2b47e8 commit 19a45cc

File tree

3 files changed

+94
-23
lines changed

3 files changed

+94
-23
lines changed

fastpurge/_client.py

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
from threading import local, Lock
77

88
import requests
9+
from requests.adapters import HTTPAdapter
10+
from requests.exceptions import RetryError
11+
from urllib3.util import Retry
12+
from http import HTTPStatus
913

1014
try:
1115
from time import monotonic
@@ -32,6 +36,29 @@
3236
])
3337

3438

39+
class LoggingRetry(Retry):
40+
def __init__(self, *args, **kwargs, ):
41+
self._logger = kwargs.pop('logger', None)
42+
super(LoggingRetry, self).__init__(*args, **kwargs)
43+
44+
def new(self, **kw):
45+
kw['logger'] = self._logger
46+
return super(LoggingRetry, self).new(**kw)
47+
48+
def increment(self, method, url, *args, **kwargs):
49+
response = kwargs.get("response")
50+
if response:
51+
self._logger.error("An invalid status code %s was received "
52+
"when trying to %s to %s: %s",
53+
response.status, method, url, response.reason)
54+
else: # pragma: no cover
55+
self._logger.error(
56+
"An unknown error occurred when trying to %s to %s", method,
57+
url)
58+
return super(LoggingRetry, self).increment(method, url, *args,
59+
**kwargs)
60+
61+
3562
class FastPurgeError(RuntimeError):
3663
"""Raised when the Fast Purge API reports an error.
3764
@@ -74,6 +101,11 @@ class FastPurgeClient(object):
74101
# Default network matches Akamai's documented default
75102
DEFAULT_NETWORK = os.environ.get("FAST_PURGE_DEFAULT_NETWORK", "production")
76103

104+
# Max number of retries allowed for HTTP requests, and the backoff used
105+
# to extend the delay between requests.
106+
MAX_RETRIES = int(os.environ.get("FAST_PURGE_MAX_RETRIES", "10"))
107+
108+
RETRY_BACKOFF = float(os.environ.get("FAST_PURGE_RETRY_BACKOFF", "0.15"))
77109
# Default purge type.
78110
# Akamai recommend "invalidate", so why is "delete" our default?
79111
# Here's what Akamai docs have to say:
@@ -197,12 +229,32 @@ def __baseurl(self):
197229

198230
return '{out}:{port}'.format(out=out, port=self.__port)
199231

232+
@property
233+
def __retry_policy(self):
234+
retries = getattr(self.__local, 'retries', None)
235+
if not retries:
236+
retries = LoggingRetry(
237+
total=self.MAX_RETRIES,
238+
backoff_factor=self.RETRY_BACKOFF,
239+
# We strictly require 201 here since that's how the server
240+
# tells us we queued something async, as expected
241+
status_forcelist=[status.value for status in HTTPStatus
242+
if status.value != 201],
243+
allowed_methods={'POST'},
244+
logger=LOG,
245+
)
246+
self.__local.retries = retries
247+
return retries
248+
200249
@property
201250
def __session(self):
202251
session = getattr(self.__local, 'session', None)
203252
if not session:
204253
session = requests.Session()
205254
session.auth = EdgeGridAuth(**self.__auth)
255+
session.mount(self.__baseurl,
256+
HTTPAdapter(max_retries=self.__retry_policy))
257+
206258
self.__local.session = session
207259
return session
208260

@@ -223,21 +275,16 @@ def __get_request_bodies(self, objects):
223275
def __start_purge(self, endpoint, request_body):
224276
headers = {'Content-Type': 'application/json'}
225277
LOG.debug("POST JSON of size %s to %s", len(request_body), endpoint)
226-
227-
response = self.__session.post(endpoint, data=request_body, headers=headers)
228-
229-
# Did it succeed? We strictly require 201 here since that's how the server tells
230-
# us we queued something async, as expected
231-
if response.status_code != 201:
232-
message = "Request to {endpoint} failed: {r.status_code} {r.reason} {text}".\
233-
format(endpoint=endpoint, r=response, text=response.text[0:800])
278+
try:
279+
response = self.__session.post(endpoint, data=request_body, headers=headers)
280+
response_body = response.json()
281+
estimated_seconds = response_body.get('estimatedSeconds', 5)
282+
return Purge(response_body, monotonic() + estimated_seconds)
283+
except RetryError as e:
284+
message = "Request to {endpoint} was unsuccessful after {retries} retries: {reason}". \
285+
format(endpoint=endpoint, retries=self.MAX_RETRIES, reason=e.args[0].reason)
234286
LOG.debug("%s", message)
235-
raise FastPurgeError(message)
236-
237-
response_body = response.json()
238-
estimated_seconds = response_body.get('estimatedSeconds', 5)
239-
240-
return Purge(response_body, monotonic() + estimated_seconds)
287+
raise FastPurgeError(message) from e
241288

242289
def purge_objects(self, object_type, objects, **kwargs):
243290
"""Purge a collection of objects.

test-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ pytest
22
requests-mock
33
mock
44
bandit==1.7.5;python_version > '3'
5+
responses

tests/test_purge.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
import requests_mock
33
import mock
4+
import responses
45

56
try:
67
from time import monotonic
@@ -37,7 +38,7 @@ def requests_mocker():
3738

3839

3940
@pytest.fixture
40-
def no_retries():
41+
def no_thread_retries():
4142
"""Suppress retries for the duration of this fixture."""
4243

4344
with mock.patch('more_executors.retry.ExceptionRetryPolicy') as policy_class:
@@ -131,20 +132,20 @@ def test_scheme_port(client_auth, requests_mocker):
131132
assert future.result()
132133

133134

134-
def test_response_fails(client, requests_mocker, no_retries):
135+
@responses.activate
136+
def test_response_fails(client, no_thread_retries, monkeypatch):
135137
"""Requests fail with a FastPurgeError if API gives unsuccessful response."""
138+
url = 'https://fastpurge.example.com/ccu/v3/delete/cpcode/production'
139+
# Decrease backoff, otherwise the test will run for 5 minutes
140+
monkeypatch.setenv("FAST_PURGE_RETRY_BACKOFF", "0.001")
136141

137-
requests_mocker.register_uri(
138-
method='POST',
139-
url='https://fastpurge.example.com/ccu/v3/delete/cpcode/production',
140-
status_code=503,
141-
reason='simulated internal error')
142-
142+
responses.add(responses.POST, url, status=503,
143+
content_type="application/json", body="Error")
143144
future = client.purge_by_cpcode([1234, 5678])
144145
exception = future.exception()
145146

146147
assert isinstance(exception, FastPurgeError)
147-
assert '503 simulated internal error' in str(exception)
148+
assert 'too many 503 error responses' in str(exception)
148149

149150

150151
def test_split_requests(client, requests_mocker):
@@ -201,3 +202,25 @@ def test_multiple_clients_with_the_same_auth_dict(client_auth):
201202
client2 = FastPurgeClient(auth=client_auth)
202203

203204
assert client1 is not client2
205+
206+
207+
@responses.activate(registry=responses.registries.OrderedRegistry)
208+
def test_retries_on_error(client_auth):
209+
"""Sanity check for the retry functionality"""
210+
url = 'http://fastpurge.example.com:42/ccu/v3/delete/tag/staging'
211+
err_1 = responses.add(responses.POST, url, status=500,
212+
content_type="application/json", body="Error")
213+
err_2 = responses.add(responses.POST, url, status=501,
214+
content_type="application/json", body="Error")
215+
res = responses.add(responses.POST, url, status=201,
216+
content_type="application/json",
217+
json={'estimatedSeconds': 0.1})
218+
219+
client = FastPurgeClient(auth=client_auth, scheme='http', port=42)
220+
221+
future = client.purge_by_tag(['red'], network='staging')
222+
223+
assert future.result()
224+
assert len(err_1.calls) == 1
225+
assert len(err_2.calls) == 1
226+
assert len(res.calls) == 1

0 commit comments

Comments
 (0)