Skip to content
Open
7 changes: 5 additions & 2 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ Line wrapping and indentation
x=1,
y=2)

The one exception to this rule are logging method invocations and calls to
reject() and require()::
The exception to this rule are logging method invocations, calls to
reject(), require(), or uses of TestCase.subTest() ::

logger.info('Waiting up to %s seconds for %s queues to %s ...',
timeout, len(queues), 'empty' if empty else 'not be empty')

reject(spline not in reticulated_splines,
'Unreticulated splines cause discombobulation.')

with self.subTest('manifest', catalog=catalog, format=format,
fetch=fetch, curl=curl, wait=wait):

Only if the second and subsequent arguments won't fit on one line, do we
wrap all arguments, one line per argument.

Expand Down
197 changes: 138 additions & 59 deletions lambdas/service/app.py

Large diffs are not rendered by default.

1,473 changes: 1,459 additions & 14 deletions lambdas/service/openapi.json

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions src/azul/chalice.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import mimetypes
import os
import pathlib
import time
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -883,3 +884,27 @@ def lambda_context(self) -> LambdaContext:
def current_request(self) -> AzulRequest:
assert self.app.current_request is not None
return self.app.current_request

def server_side_sleep(self, max_seconds: float) -> float:
"""
Sleep in the current Lambda function.

:param max_seconds: The requested amount of time to sleep in seconds.
The actual time slept will be less if the requested
amount would cause the Lambda function to exceed its
execution timeout.

:return: The actual amount of time slept in seconds
"""
assert isinstance(max_seconds, float), max_seconds
assert 0 <= max_seconds <= config.api_gateway_lambda_timeout, max_seconds
remaining_time = self.lambda_context.get_remaining_time_in_millis() / 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add validation of type and range of the argument.

# A small buffer is subtracted from the Lambda's remaining time to
# ensure that it wakes up before it runs out of execution time (and
# before API Gateway times out) so it gets a chance to return a response
# to the client.
actual_seconds = min(max_seconds,
remaining_time - config.api_gateway_timeout_padding - 3)
log.debug('Sleeping for %.3f seconds', actual_seconds)
time.sleep(actual_seconds)
return actual_seconds
6 changes: 3 additions & 3 deletions src/azul/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ClassVar,
)

import attr
import attrs
from botocore.exceptions import (
ClientError,
)
Expand Down Expand Up @@ -91,7 +91,7 @@ def description(self):
return self.fget.__doc__


@attr.s(frozen=True, kw_only=True, auto_attribs=True)
@attrs.frozen(kw_only=True, slots=False)
class HealthController(AppController):
lambda_name: str

Expand Down Expand Up @@ -168,7 +168,7 @@ def _make_response(self, body: JSON) -> Response:
return Response(body=json.dumps(body), status_code=status)


@attr.s(frozen=True, kw_only=True, auto_attribs=True)
@attrs.frozen(kw_only=True, slots=False)
class Health:
"""
Encapsulates information about the health status of an Azul deployment. All
Expand Down
6 changes: 3 additions & 3 deletions src/azul/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
get_args,
)

import attr
import attrs
from chalice import (
ForbiddenError,
)
Expand Down Expand Up @@ -318,7 +318,7 @@ def key(v):
return cast(FiltersJSON, filters)


@attr.s(auto_attribs=True, kw_only=True, frozen=True)
@attrs.frozen(kw_only=True)
class Filters:
explicit: FiltersJSON
source_ids: set[str]
Expand All @@ -341,7 +341,7 @@ def to_json(self) -> JSON:
}

def update(self, filters: FiltersJSON) -> Self:
return attr.evolve(self, explicit={**self.explicit, **filters})
return attrs.evolve(self, explicit={**self.explicit, **filters})

def reify(self,
plugin: MetadataPlugin,
Expand Down
10 changes: 8 additions & 2 deletions src/azul/service/app_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Mapping,
)

import attr
import attrs
from chalice import (
BadRequestError as BRE,
BadRequestError,
Expand All @@ -29,7 +29,7 @@
)


@attr.s(auto_attribs=True, frozen=True, kw_only=True)
@attrs.frozen(kw_only=True)
class ServiceAppController(AppController):
file_url_func: FileUrlFunc

Expand Down Expand Up @@ -57,6 +57,12 @@ def validate_catalog(catalog):
f'Must be one of {set(config.catalogs)}.')


def validate_wait(wait: str | None):
valid_values = ['0', '1']
if wait not in [None, *valid_values]:
raise BRE(f'Invalid wait value `{wait}`. Must be one of {valid_values}')


class Mandatory:
"""
Validation wrapper signifying that a parameter is mandatory.
Expand Down
4 changes: 2 additions & 2 deletions src/azul/service/catalog_controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import attr
import attrs

from azul import (
CatalogName,
Expand Down Expand Up @@ -55,7 +55,7 @@ def list_catalogs(self) -> schema.object(
'atlas': catalog.atlas,
'plugins': {
plugin_type: {
**attr.asdict(plugin),
**attrs.asdict(plugin),
**self._plugin_config(plugin_type, catalog.name)
}
for plugin_type, plugin in catalog.plugins.items()
Expand Down
6 changes: 1 addition & 5 deletions src/azul/service/drs_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from datetime import (
datetime,
)
import time
import urllib.parse

from chalice import (
Expand Down Expand Up @@ -184,10 +183,7 @@ def _dos_gs_url(self, file_uuid, version) -> mutable_furl:
return url
elif dss_response.status_code == 301:
url = dss_response.next.url
remaining_lambda_seconds = self.lambda_context.get_remaining_time_in_millis() / 1000
server_side_sleep = min(1,
max(remaining_lambda_seconds - config.api_gateway_timeout_padding - 3, 0))
time.sleep(server_side_sleep)
self.server_side_sleep(1.0)
else:
raise ChaliceViewError({
'msg': f'Received {dss_response.status_code} from DSS. Could not get file'
Expand Down
23 changes: 20 additions & 3 deletions src/azul/service/manifest_controller.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from collections.abc import (
Mapping,
)
from math import (
ceil,
)
from typing import (
TypedDict,
cast,
get_type_hints,
)

import attr
import attrs
from chalice import (
BadRequestError,
ChaliceViewError,
Expand Down Expand Up @@ -76,7 +79,7 @@ class ManifestGenerationState(TypedDict, total=False):
assert manifest_state_key in get_type_hints(ManifestGenerationState)


@attr.s(frozen=True, auto_attribs=True, kw_only=True)
@attrs.frozen(kw_only=True)
class ManifestController(SourceController):
manifest_url_func: ManifestUrlFunc

Expand Down Expand Up @@ -240,10 +243,13 @@ def get_manifest_async(self,
assert False, token_or_result

body: dict[str, int | str | FlatJSON]
wait = query_params.get('wait')

if manifest is None:
assert token is not None
url = self.manifest_url_func(fetch=fetch, token_or_key=token.encode())
url = self.manifest_url_func(fetch=fetch,
token_or_key=token.encode(),
**({} if wait is None else {'wait': wait}))
body = {
'Status': 301,
'Location': str(url),
Expand Down Expand Up @@ -281,6 +287,17 @@ def get_manifest_async(self,
'CommandLine': self.service.command_lines(manifest, url, authentication)
}

if wait is not None:
if wait == '0':
pass
elif wait == '1':
retry_after = body.get('Retry-After')
if retry_after is not None:
time_slept = self.server_side_sleep(float(retry_after))
body['Retry-After'] = ceil(retry_after - time_slept)
else:
assert False, wait

# Note: Response objects returned without a 'Content-Type' header will
# be given one of type 'application/json' as default by Chalice.
# https://aws.github.io/chalice/tutorials/basicrestapi.html#customizing-the-http-response
Expand Down
24 changes: 8 additions & 16 deletions src/azul/service/repository_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
)
import json
import logging
import time
from math import (
ceil,
)
from typing import (
Any,
Callable,
cast,
)

import attr
import attrs
from chalice import (
BadRequestError,
Expand Down Expand Up @@ -58,6 +59,7 @@
Mandatory,
validate_catalog,
validate_params,
validate_wait,
)
from azul.service.elasticsearch_service import (
IndexNotFoundError,
Expand Down Expand Up @@ -220,7 +222,7 @@ def download_file(self,
validate_params(query_params,
catalog=str,
requestIndex=int,
wait=self._validate_wait,
wait=validate_wait,
replica=self._validate_replica,
token=str,
**self._file_param_validators(catalog, request_index))
Expand All @@ -240,7 +242,7 @@ def download_file(self,
if file is None:
raise NotFoundError(f'Unable to find file {file_uuid!r}, '
f'version {file_version!r} in catalog {catalog!r}')
file = attr.evolve(file, **adict(name=file_name, drs_uri=drs_uri))
file = attrs.evolve(file, **adict(name=file_name, drs_uri=drs_uri))
else:
file = self._file_from_request(catalog, file_uuid, query_params)

Expand Down Expand Up @@ -298,14 +300,8 @@ def download_file(self,
if wait == '0':
pass
elif wait == '1':
# Sleep in the lambda but ensure that we wake up before it
# runs out of execution time (and before API Gateway times
# out) so we get a chance to return a response to the client
remaining_time = self.lambda_context.get_remaining_time_in_millis() / 1000
server_side_sleep = min(float(retry_after),
remaining_time - config.api_gateway_timeout_padding - 3)
time.sleep(server_side_sleep)
retry_after = round(retry_after - server_side_sleep)
time_slept = self.server_side_sleep(float(retry_after))
retry_after = ceil(retry_after - time_slept)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change from round to ceil is currently in a commit labeled "Add a default value for the /repository/files wait parameter". Please explain why going from round to ceil is related to adding a default or isolate that change in its own commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved change to commit Fix rounding of /repository/file retry-after value

else:
assert False, wait
query_params = self._file_to_request(download.file) | adict(
Expand Down Expand Up @@ -363,10 +359,6 @@ def field_types(self, catalog: CatalogName) -> Mapping[str, FieldType]:
result[accessible] = pass_thru_bool
return result

def _validate_wait(self, wait: str | None):
if wait not in ('0', '1', None):
raise ValueError

def _validate_replica(self, replica: str):
if replica not in ('aws', 'gcp'):
raise ValueError
Expand Down
34 changes: 28 additions & 6 deletions test/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
BytesIO,
TextIOWrapper,
)
from itertools import (
chain,
product,
)
import json
import os
from pathlib import (
Expand Down Expand Up @@ -558,13 +562,24 @@ def _test_other_endpoints(self):
def _test_manifest(self, catalog: CatalogName):
supported_formats = self.metadata_plugin(catalog).manifest_formats
assert supported_formats
for format in [None, *supported_formats]:
for curl, wait, format in chain(
product([False], [None], [None, *supported_formats]),
product([True], [None, 0, 1], [None, *supported_formats])
):
filters = self._manifest_filters(catalog)
execution_ids = set()
coin_flip = bool(self.random.getrandbits(1))
for i, fetch in enumerate([coin_flip, coin_flip, not coin_flip]):
with self.subTest('manifest', catalog=catalog, format=format, i=i, fetch=fetch):
args = dict(catalog=catalog, filters=json.dumps(filters))
if curl:
coin_flip = False
fetch_modes = [coin_flip]
else:
coin_flip = bool(self.random.getrandbits(1))
fetch_modes = [coin_flip, coin_flip, not coin_flip]
for i, fetch in enumerate(fetch_modes):
with self.subTest('manifest', catalog=catalog, format=format,
i=i, fetch=fetch, curl=curl, wait=wait):
args = dict(catalog=catalog,
filters=json.dumps(filters),
**({} if wait is None else {'wait': wait}))
if format is None:
format = first(supported_formats)
else:
Expand All @@ -591,7 +606,10 @@ def get_url(*args, **kwargs):
# resilience against DOS attacks.

def worker(_):
response = self._check_endpoint(PUT, '/manifest/files', args=args, fetch=fetch)
response = self._check_endpoint(POST if curl else PUT,
'/manifest/files',
args=args,
fetch=fetch)
self._manifest_validators[format](catalog, response)

# FIXME: Set number of workers back to 3
Expand Down Expand Up @@ -891,6 +909,10 @@ def _get_url_content(self, method: str, url: furl) -> bytes:
retry_after = response.headers.get('Retry-After')
if retry_after is not None:
retry_after = float(retry_after)
if url.args.get('wait') == 1:
# The wait should have happened server-side and been
# subtracted from the retry-after that was returned.
self.assertEqual(0.0, retry_after)
log.info('Sleeping %.3fs to honor Retry-After header', retry_after)
time.sleep(retry_after)
url = furl(response.headers['Location'])
Expand Down
Loading