Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ select = A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,B901,B902,B903,B950
# W503: Mutually exclusive with W504.
ignore = E226,E501,E722,W503
per-file-ignores =
# B011: assert False used for coverage skipping
# S101: Pytest uses assert
tests/*:S101
tests/*:B011,S101

# flake8-import-order
application-import-names = aiohttp_security
Expand Down
15 changes: 12 additions & 3 deletions aiohttp_security/jwt_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

"""

from typing import Optional
from typing import Optional, Tuple, Type

from aiohttp import web

Expand All @@ -11,14 +11,23 @@
try:
import jwt
HAS_JWT = True
_bases_error: Tuple[Type[jwt.exceptions.PyJWTError], ...]
_bases_error = (jwt.exceptions.PyJWTError,)
except ImportError: # pragma: no cover
HAS_JWT = False
_bases_error = ()


AUTH_HEADER_NAME = 'Authorization'
AUTH_SCHEME = 'Bearer '


# This class inherits from ValueError to maintain backward compatibility
# with previous versions of aiohttp-security
class InvalidAuthorizationScheme(ValueError, *_bases_error): # type: ignore[misc]
"""Exception when the auth method can't be read from header."""


class JWTIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, secret: str, algorithm: str = "HS256", key: str = "login"):
if not HAS_JWT:
Expand All @@ -34,8 +43,8 @@ async def identify(self, request: web.Request) -> Optional[str]:
return None

if not header_identity.startswith(AUTH_SCHEME):
raise ValueError("Invalid authorization scheme. "
+ "Should be `{}<token>`".format(AUTH_SCHEME))
raise InvalidAuthorizationScheme("Invalid authorization scheme. "
"Should be `{}<token>`".format(AUTH_SCHEME))

token = header_identity.split(' ')[1].strip()

Expand Down
25 changes: 25 additions & 0 deletions tests/test_jwt_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,28 @@
resp = await client.get('/', headers=headers)
assert 400 == resp.status
assert 'Invalid authorization scheme' in resp.reason


async def test_identify_expired_signature(make_token, aiohttp_client):
kwt_secret_key = "Key" # noqa: S105

token = make_token({"login": "Andrew", "exp": 0}, kwt_secret_key)

async def check(request):
policy = request.app[IDENTITY_KEY]
try:
await policy.identify(request)
except jwt.exceptions.PyJWTError as exc:
raise web.HTTPBadRequest(reason=str(exc))

assert False

Check warning on line 97 in tests/test_jwt_identity.py

View check run for this annotation

Codecov / codecov/patch

tests/test_jwt_identity.py#L97

Added line #L97 was not covered by tests

app = web.Application()
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route("GET", "/", check)

client = await aiohttp_client(app)
headers = {"Authorization": "Bearer {}".format(token)}
resp = await client.get("/", headers=headers)
assert 400 == resp.status
assert "Signature has expired" in resp.reason
Loading