diff --git a/.flake8 b/.flake8 index 4e487c57..76ddaa85 100644 --- a/.flake8 +++ b/.flake8 @@ -10,8 +10,9 @@ select = A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,B901,B902,B903,B950 # W503: Mutually exclusive with W504. ignore = E226,E501,E722,W503 per-file-ignores = + # B011: assert False used for coverage skipping # S101: Pytest uses assert - tests/*:S101 + tests/*:B011,S101 # flake8-import-order application-import-names = aiohttp_security diff --git a/aiohttp_security/jwt_identity.py b/aiohttp_security/jwt_identity.py index 6edcf69e..b322cda9 100644 --- a/aiohttp_security/jwt_identity.py +++ b/aiohttp_security/jwt_identity.py @@ -2,7 +2,7 @@ """ -from typing import Optional +from typing import Optional, Tuple, Type from aiohttp import web @@ -11,14 +11,23 @@ try: import jwt HAS_JWT = True + _bases_error: Tuple[Type[jwt.exceptions.PyJWTError], ...] + _bases_error = (jwt.exceptions.PyJWTError,) except ImportError: # pragma: no cover HAS_JWT = False + _bases_error = () AUTH_HEADER_NAME = 'Authorization' AUTH_SCHEME = 'Bearer ' +# This class inherits from ValueError to maintain backward compatibility +# with previous versions of aiohttp-security +class InvalidAuthorizationScheme(ValueError, *_bases_error): # type: ignore[misc] + """Exception when the auth method can't be read from header.""" + + class JWTIdentityPolicy(AbstractIdentityPolicy): def __init__(self, secret: str, algorithm: str = "HS256", key: str = "login"): if not HAS_JWT: @@ -34,8 +43,8 @@ async def identify(self, request: web.Request) -> Optional[str]: return None if not header_identity.startswith(AUTH_SCHEME): - raise ValueError("Invalid authorization scheme. " - + "Should be `{}`".format(AUTH_SCHEME)) + raise InvalidAuthorizationScheme("Invalid authorization scheme. " + "Should be `{}`".format(AUTH_SCHEME)) token = header_identity.split(' ')[1].strip() diff --git a/tests/test_jwt_identity.py b/tests/test_jwt_identity.py index b4ed430c..0b9cd207 100644 --- a/tests/test_jwt_identity.py +++ b/tests/test_jwt_identity.py @@ -80,3 +80,28 @@ async def check(request): resp = await client.get('/', headers=headers) assert 400 == resp.status assert 'Invalid authorization scheme' in resp.reason + + +async def test_identify_expired_signature(make_token, aiohttp_client): + kwt_secret_key = "Key" # noqa: S105 + + token = make_token({"login": "Andrew", "exp": 0}, kwt_secret_key) + + async def check(request): + policy = request.app[IDENTITY_KEY] + try: + await policy.identify(request) + except jwt.exceptions.PyJWTError as exc: + raise web.HTTPBadRequest(reason=str(exc)) + + assert False + + app = web.Application() + _setup(app, JWTIdentityPolicy(kwt_secret_key), Autz()) + app.router.add_route("GET", "/", check) + + client = await aiohttp_client(app) + headers = {"Authorization": "Bearer {}".format(token)} + resp = await client.get("/", headers=headers) + assert 400 == resp.status + assert "Signature has expired" in resp.reason