|
| 1 | +"""Implements RFC7009""" |
| 2 | + |
| 3 | +import logging |
| 4 | + |
| 5 | +from idpyoidc.exception import ImproperlyConfigured |
| 6 | +from idpyoidc.message import oauth2 |
| 7 | +from idpyoidc.server.endpoint import Endpoint |
| 8 | +from idpyoidc.server.token.exception import UnknownToken |
| 9 | +from idpyoidc.server.token.exception import WrongTokenClass |
| 10 | +from idpyoidc.util import importer |
| 11 | + |
| 12 | +logger = logging.getLogger(__name__) |
| 13 | + |
| 14 | + |
| 15 | +class TokenRevocation(Endpoint): |
| 16 | + """Implements RFC7009""" |
| 17 | + |
| 18 | + request_cls = oauth2.TokenRevocationRequest |
| 19 | + response_cls = oauth2.TokenRevocationResponse |
| 20 | + error_cls = oauth2.TokenRevocationErrorResponse |
| 21 | + request_format = "urlencoded" |
| 22 | + response_format = "json" |
| 23 | + endpoint_name = "revocation_endpoint" |
| 24 | + name = "token_revocation" |
| 25 | + default_capabilities = { |
| 26 | + "client_authn_method": [ |
| 27 | + "client_secret_basic", |
| 28 | + "client_secret_post", |
| 29 | + "client_secret_jwt", |
| 30 | + "bearer_header", |
| 31 | + "private_key_jwt", |
| 32 | + ] |
| 33 | + } |
| 34 | + |
| 35 | + token_types_supported = ["authorization_code", "access_token", "refresh_token"] |
| 36 | + |
| 37 | + def __init__(self, upstream_get, **kwargs): |
| 38 | + Endpoint.__init__(self, upstream_get, **kwargs) |
| 39 | + self.token_revocation_kwargs = kwargs |
| 40 | + |
| 41 | + def get_client_id_from_token(self, endpoint_context, token, request=None): |
| 42 | + _info = endpoint_context.session_manager.get_session_info_by_token( |
| 43 | + token, handler_key="access_token" |
| 44 | + ) |
| 45 | + return _info["client_id"] |
| 46 | + |
| 47 | + def process_request(self, request=None, **kwargs): |
| 48 | + """ |
| 49 | + :param request: The revocation request as a dictionary |
| 50 | + :param kwargs: |
| 51 | + :return: |
| 52 | + """ |
| 53 | + _revoke_request = self.request_cls(**request) |
| 54 | + if "error" in _revoke_request: |
| 55 | + return _revoke_request |
| 56 | + |
| 57 | + request_token = _revoke_request["token"] |
| 58 | + _resp = self.response_cls() |
| 59 | + _context = self.upstream_get("endpoint_context") |
| 60 | + logger.debug("Token Revocation") |
| 61 | + |
| 62 | + try: |
| 63 | + _session_info = _context.session_manager.get_session_info_by_token( |
| 64 | + request_token, grant=True |
| 65 | + ) |
| 66 | + except (UnknownToken, WrongTokenClass): |
| 67 | + return {"response_args": _resp} |
| 68 | + |
| 69 | + client_id = _session_info["client_id"] |
| 70 | + if client_id != _revoke_request["client_id"]: |
| 71 | + logger.debug("{} owner of token".format(client_id)) |
| 72 | + logger.warning("Client using token it was not given") |
| 73 | + return self.error_cls(error="invalid_grant", error_description="Wrong client") |
| 74 | + |
| 75 | + grant = _session_info["grant"] |
| 76 | + _token = grant.get_token(request_token) |
| 77 | + |
| 78 | + try: |
| 79 | + self.token_types_supported = _context.cdb[client_id]["token_revocation"][ |
| 80 | + "token_types_supported"] |
| 81 | + except Exception: |
| 82 | + self.token_types_supported = self.token_revocation_kwargs.get("token_types_supported", |
| 83 | + self.token_types_supported) |
| 84 | + |
| 85 | + try: |
| 86 | + self.policy = _context.cdb[client_id]["token_revocation"]["policy"] |
| 87 | + except Exception: |
| 88 | + self.policy = self.token_revocation_kwargs.get("policy", { |
| 89 | + "": {"callable": validate_token_revocation_policy}}) |
| 90 | + |
| 91 | + if _token.token_class not in self.token_types_supported: |
| 92 | + desc = ( |
| 93 | + "The authorization server does not support the revocation of " |
| 94 | + "the presented token type. That is, the client tried to revoke an access " |
| 95 | + "token on a server not supporting this feature." |
| 96 | + ) |
| 97 | + return self.error_cls(error="unsupported_token_type", error_description=desc) |
| 98 | + |
| 99 | + return self._revoke(_revoke_request, _session_info) |
| 100 | + |
| 101 | + def _revoke(self, request, session_info): |
| 102 | + _context = self.upstream_get("endpoint_context") |
| 103 | + _mngr = _context.session_manager |
| 104 | + _token = _mngr.find_token(session_info["branch_id"], request["token"]) |
| 105 | + |
| 106 | + _cls = _token.token_class |
| 107 | + if _cls not in self.policy: |
| 108 | + _cls = "" |
| 109 | + |
| 110 | + temp_policy = self.policy[_cls] |
| 111 | + callable = temp_policy["callable"] |
| 112 | + kwargs = temp_policy.get("kwargs", {}) |
| 113 | + |
| 114 | + if isinstance(callable, str): |
| 115 | + try: |
| 116 | + fn = importer(callable) |
| 117 | + except Exception: |
| 118 | + raise ImproperlyConfigured(f"Error importing {callable} policy callable") |
| 119 | + else: |
| 120 | + fn = callable |
| 121 | + |
| 122 | + try: |
| 123 | + return fn(_token, session_info=session_info, **kwargs) |
| 124 | + except Exception as e: |
| 125 | + logger.error(f"Error while executing the {fn} policy callable: {e}") |
| 126 | + return self.error_cls(error="server_error", error_description="Internal server error") |
| 127 | + |
| 128 | + |
| 129 | +def validate_token_revocation_policy(token, session_info, **kwargs): |
| 130 | + _token = token |
| 131 | + _token.revoke() |
| 132 | + |
| 133 | + response_args = {"response_args": {}} |
| 134 | + return oauth2.TokenRevocationResponse(**response_args) |
0 commit comments