Skip to content

Commit b0512f5

Browse files
Retry calls to S3 (#105)
1 parent 80e97ff commit b0512f5

File tree

5 files changed

+203
-160
lines changed

5 files changed

+203
-160
lines changed

gen3workflow/app.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,12 @@ async def get_status():
6767
generate_unique_id_function=generate_unique_route_id,
6868
)
6969

70-
# `async_client` is used to hit the TES API, the Arborist service and AWS S3.
71-
# Calls to S3 tend to timeout when uploading large files (and we might also be rate-limited).
72-
# AsyncHTTPTransport supports retrying on httpx.ConnectError or httpx.ConnectTimeout.
70+
# `async_client` is used to hit the TES API and AWS S3.
71+
# Calls to AWS S3 tend to timeout when uploading large files, so we increase the timeout.
72+
# We may also be rate-limited, so calls are retried (see routes/s3.py).
7373
# The `httpx_client` parameter is not meant to be used in production. It allows mocking
7474
# external calls when testing.
75-
app.async_client = httpx_client or httpx.AsyncClient(
76-
transport=httpx.AsyncHTTPTransport(retries=3), timeout=120
77-
)
75+
app.async_client = httpx_client or httpx.AsyncClient(timeout=120)
7876

7977
app.include_router(ga4gh_tes_router, tags=["GA4GH TES"])
8078
app.include_router(s3_router, tags=["S3"])

gen3workflow/routes/ga4gh_tes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ def apply_view_to_task(view: str, task: dict) -> dict:
208208
task["inputs"][i].pop("content", None)
209209
for i in range(len(task.get("logs", []))):
210210
task["logs"][i].pop("system_logs", None)
211+
# TODO if the TES server returns a SYSTEM_ERROR, we may want to keep the system_logs, or at
212+
# least log them before removing them
211213

212214
return task
213215

gen3workflow/routes/s3.py

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import asyncio
12
from datetime import datetime, timezone
23
import hashlib
3-
import traceback
4+
import random
45
import urllib.parse
56

67
import boto3
@@ -14,18 +15,23 @@
1415
HTTP_400_BAD_REQUEST,
1516
HTTP_401_UNAUTHORIZED,
1617
HTTP_403_FORBIDDEN,
18+
HTTP_404_NOT_FOUND,
1719
)
1820

1921
from gen3workflow import aws_utils, logger
2022
from gen3workflow.auth import Auth
2123
from gen3workflow.config import config
22-
from gen3workflow.routes.system import get_status
2324

2425

2526
s3_root_router = APIRouter(include_in_schema=False)
2627
s3_router = APIRouter(prefix="/s3")
2728

2829

30+
S3_MAX_RETRIES = 3
31+
S3_RETRY_BASE_DELAY = 0.5
32+
S3_RETRY_BACKOFF_FACTOR = 2
33+
34+
2935
async def set_access_token_and_get_user_id(auth: Auth, headers: Headers) -> str:
3036
"""
3137
Extract the user's access token and (in some cases) the user's ID, which should have been
@@ -336,28 +342,65 @@ async def s3_endpoint(path: str, request: Request):
336342
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
337343
).hexdigest()
338344

339-
# construct the Authorization header from the credentials and the signature, and forward the
340-
# call to AWS S3 with the new Authorization header
345+
# construct the Authorization header from the credentials and the signature
341346
headers["authorization"] = (
342347
f"AWS4-HMAC-SHA256 Credential={credentials.access_key}/{date}/{region}/{service}/aws4_request, SignedHeaders={signed_headers}, Signature={signature}"
343348
)
344349
s3_api_url = f"https://{user_bucket}.s3.{region}.amazonaws.com/{api_endpoint}"
345350
logger.debug(f"Outgoing S3 request: '{request.method} {s3_api_url}'")
346-
response = await request.app.async_client.request(
347-
method=request.method,
348-
url=s3_api_url,
349-
headers=headers,
350-
params=query_params,
351-
data=body,
352-
)
353351

354-
if response.status_code >= 300:
355-
logger.debug(f"Received a failure status code from AWS: {response.status_code}")
356-
# no need to log 404 errors except in debug mode: they are expected when running
357-
# workflows (e.g. for Nextflow workflows, error output files may not be present when there
358-
# were no errors)
359-
if response.status_code != 404:
360-
logger.error(f"Error from AWS: {response.status_code} {response.text}")
352+
# forward the call to AWS S3 with the new Authorization header.
353+
# this call is retried with exponential backoff in case of unexpected error from S3.
354+
for attempt in range(1, S3_MAX_RETRIES + 1):
355+
proceed = True
356+
exception = None
357+
try:
358+
response = await request.app.async_client.request(
359+
method=request.method,
360+
url=s3_api_url,
361+
headers=headers,
362+
params=query_params,
363+
data=body,
364+
)
365+
366+
if response.status_code >= 300:
367+
# no need to log details (unless in debug mode) or retry in the case of a 404
368+
# error: 404s are expected when running workflows (e.g. for Nextflow workflows,
369+
# stderr output files may not be present when there were no errors)
370+
if response.status_code != HTTP_404_NOT_FOUND:
371+
logger.error(
372+
f"Error from S3: {response.status_code} {response.text}"
373+
)
374+
# do not retry in the case of a 403 error: authentication is done internally by
375+
# this function, so 403 errors are internal service errors
376+
if response.status_code != HTTP_403_FORBIDDEN:
377+
proceed = False
378+
else:
379+
logger.debug(f"Error from S3: {response.status_code}")
380+
except Exception as e:
381+
logger.error(f"Exception while attempting to make a call to S3: {e}")
382+
proceed = False
383+
exception = e
384+
385+
# exit if the call succeeded or should not be retried, or we reached the max number of
386+
# retries
387+
if proceed:
388+
break
389+
if attempt == S3_MAX_RETRIES:
390+
logger.error(
391+
f"Outgoing S3 request failed (attempt {attempt}/{S3_MAX_RETRIES}). Giving up"
392+
)
393+
if exception:
394+
raise exception
395+
break
396+
397+
# retry with exponential backoff
398+
delay = S3_RETRY_BASE_DELAY * (S3_RETRY_BACKOFF_FACTOR**attempt)
399+
delay += delay * 0.1 * random.uniform(-1, 1) # add jitter
400+
logger.warning(
401+
f"Outgoing S3 request failed (attempt {attempt}/{S3_MAX_RETRIES}). Retrying in {delay:.2f} seconds"
402+
)
403+
await asyncio.sleep(delay)
361404

362405
# return the response from AWS S3.
363406
# - mask the details of 403 errors from the end user: authentication is done internally by this

0 commit comments

Comments
 (0)