Skip to content

Commit 032ee78

Browse files
retry s3 calls
1 parent 80e97ff commit 032ee78

File tree

4 files changed

+202
-160
lines changed

4 files changed

+202
-160
lines changed

gen3workflow/app.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,12 @@ async def get_status():
6767
generate_unique_id_function=generate_unique_route_id,
6868
)
6969

70-
# `async_client` is used to hit the TES API, the Arborist service and AWS S3.
71-
# Calls to S3 tend to timeout when uploading large files (and we might also be rate-limited).
72-
# AsyncHTTPTransport supports retrying on httpx.ConnectError or httpx.ConnectTimeout.
70+
# `async_client` is used to hit the TES API and AWS S3.
71+
# Calls to AWS S3 tend to timeout when uploading large files, so we increase the timeout.
72+
# We may also be rate-limited, so calls are retried (see routes/s3.py).
7373
# The `httpx_client` parameter is not meant to be used in production. It allows mocking
7474
# external calls when testing.
75-
app.async_client = httpx_client or httpx.AsyncClient(
76-
transport=httpx.AsyncHTTPTransport(retries=3), timeout=120
77-
)
75+
app.async_client = httpx_client or httpx.AsyncClient(timeout=120)
7876

7977
app.include_router(ga4gh_tes_router, tags=["GA4GH TES"])
8078
app.include_router(s3_router, tags=["S3"])

gen3workflow/routes/s3.py

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import asyncio
12
from datetime import datetime, timezone
23
import hashlib
3-
import traceback
4+
import random
45
import urllib.parse
56

67
import boto3
@@ -14,18 +15,23 @@
1415
HTTP_400_BAD_REQUEST,
1516
HTTP_401_UNAUTHORIZED,
1617
HTTP_403_FORBIDDEN,
18+
HTTP_404_NOT_FOUND,
1719
)
1820

1921
from gen3workflow import aws_utils, logger
2022
from gen3workflow.auth import Auth
2123
from gen3workflow.config import config
22-
from gen3workflow.routes.system import get_status
2324

2425

2526
s3_root_router = APIRouter(include_in_schema=False)
2627
s3_router = APIRouter(prefix="/s3")
2728

2829

30+
s3_max_retries = 3
31+
s3_retry_base_delay = 0.5
32+
s3_retry_backoff_factor = 2
33+
34+
2935
async def set_access_token_and_get_user_id(auth: Auth, headers: Headers) -> str:
3036
"""
3137
Extract the user's access token and (in some cases) the user's ID, which should have been
@@ -336,28 +342,66 @@ async def s3_endpoint(path: str, request: Request):
336342
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
337343
).hexdigest()
338344

339-
# construct the Authorization header from the credentials and the signature, and forward the
340-
# call to AWS S3 with the new Authorization header
345+
# construct the Authorization header from the credentials and the signature
341346
headers["authorization"] = (
342347
f"AWS4-HMAC-SHA256 Credential={credentials.access_key}/{date}/{region}/{service}/aws4_request, SignedHeaders={signed_headers}, Signature={signature}"
343348
)
344349
s3_api_url = f"https://{user_bucket}.s3.{region}.amazonaws.com/{api_endpoint}"
345350
logger.debug(f"Outgoing S3 request: '{request.method} {s3_api_url}'")
346-
response = await request.app.async_client.request(
347-
method=request.method,
348-
url=s3_api_url,
349-
headers=headers,
350-
params=query_params,
351-
data=body,
352-
)
353351

354-
if response.status_code >= 300:
355-
logger.debug(f"Received a failure status code from AWS: {response.status_code}")
356-
# no need to log 404 errors except in debug mode: they are expected when running
357-
# workflows (e.g. for Nextflow workflows, error output files may not be present when there
358-
# were no errors)
359-
if response.status_code != 404:
360-
logger.error(f"Error from AWS: {response.status_code} {response.text}")
352+
# forward the call to AWS S3 with the new Authorization header.
353+
# this call is retried with exponential backoff in case of unexpected error from S3.
354+
attempt = 1
355+
while True:
356+
proceed = True
357+
exception = None
358+
try:
359+
response = await request.app.async_client.request(
360+
method=request.method,
361+
url=s3_api_url,
362+
headers=headers,
363+
params=query_params,
364+
data=body,
365+
)
366+
367+
if response.status_code >= 300:
368+
# no need to log details (unless in debug mode) or retry in the case of a 404
369+
# error: 404s are expected when running workflows (e.g. for Nextflow workflows,
370+
# error output files may not be present when there were no errors)
371+
if response.status_code != HTTP_404_NOT_FOUND:
372+
logger.error(
373+
f"Error from S3: {response.status_code} {response.text}"
374+
)
375+
# do not retry in the case of a 403 error: authentication is done internally by
376+
# this function, so 403 errors are internal service errors
377+
if response.status_code != HTTP_403_FORBIDDEN:
378+
proceed = False
379+
else:
380+
logger.debug(f"Error from S3: {response.status_code}")
381+
except Exception as e:
382+
proceed = False
383+
exception = e
384+
385+
# exit if the call succeeded or should not be retried, or we reached the max number of
386+
# retries
387+
if proceed:
388+
break
389+
if attempt == s3_max_retries:
390+
logger.error(
391+
f"Outgoing S3 request failed (attempt {attempt}/{s3_max_retries}). Giving up"
392+
)
393+
if exception:
394+
raise exception
395+
break
396+
397+
# retry with exponential backoff
398+
delay = s3_retry_base_delay * (s3_retry_backoff_factor**attempt)
399+
delay += delay * 0.1 * random.uniform(-1, 1) # add jitter
400+
logger.warning(
401+
f"Outgoing S3 request failed (attempt {attempt}/{s3_max_retries}). Retrying in {delay:.2f} seconds"
402+
)
403+
await asyncio.sleep(delay)
404+
attempt += 1
361405

362406
# return the response from AWS S3.
363407
# - mask the details of 403 errors from the end user: authentication is done internally by this

0 commit comments

Comments
 (0)