|
| 1 | +from typing import List |
1 | 2 | from typing import Optional
|
2 | 3 | from typing import Tuple
|
3 | 4 | from typing import Union
|
4 | 5 |
|
5 | 6 | from fastapi.security.utils import get_authorization_scheme_param
|
6 |
| -from starlette.authentication import AuthCredentials |
7 | 7 | from starlette.authentication import AuthenticationBackend
|
8 | 8 | from starlette.middleware.authentication import AuthenticationMiddleware
|
9 | 9 | from starlette.requests import Request
|
|
16 | 16 | from .utils import jwt_decode
|
17 | 17 |
|
18 | 18 |
|
| 19 | +class Auth: |
| 20 | + scopes: List[str] |
| 21 | + |
| 22 | + def __init__(self, scopes: Optional[List[str]] = None) -> None: |
| 23 | + self.scopes = scopes or [] |
| 24 | + |
| 25 | + |
| 26 | +class User(dict): |
| 27 | + is_authenticated: bool |
| 28 | + |
| 29 | + def __init__(self, seq: Optional[dict] = None, **kwargs) -> None: |
| 30 | + self.is_authenticated = seq is not None |
| 31 | + super().__init__(seq or {}, **kwargs) |
| 32 | + |
| 33 | + |
19 | 34 | class OAuth2Backend(AuthenticationBackend):
|
20 |
| - async def authenticate(self, request: Request) -> Optional[Tuple["AuthCredentials", Optional[dict]]]: |
| 35 | + async def authenticate(self, request: Request) -> Optional[Tuple["Auth", "User"]]: |
21 | 36 | authorization = request.cookies.get("Authorization")
|
22 | 37 | scheme, param = get_authorization_scheme_param(authorization)
|
23 | 38 |
|
24 | 39 | if not scheme or not param:
|
25 |
| - return AuthCredentials(), None |
| 40 | + return Auth(), User() |
26 | 41 |
|
27 |
| - access_token = jwt_decode(param) |
28 |
| - scope = access_token.pop("scope") |
29 |
| - return AuthCredentials(scope), access_token |
| 42 | + user = jwt_decode(param) |
| 43 | + scopes = user.pop("scope") |
| 44 | + return Auth(scopes), User(user) |
30 | 45 |
|
31 | 46 |
|
32 | 47 | class OAuth2Middleware:
|
|
0 commit comments