|
| 1 | +import time |
| 2 | +import binascii |
| 3 | +import base64 |
| 4 | +import uuid |
| 5 | +import logging |
| 6 | + |
| 7 | +import jwt |
| 8 | + |
| 9 | + |
| 10 | +logger = logging.getLogger(__file__) |
| 11 | + |
| 12 | +class Signer(object): |
| 13 | + def sign_assertion( |
| 14 | + self, audience, issuer, subject, expires_at, |
| 15 | + issued_at=None, assertion_id=None, **kwargs): |
| 16 | + # Names are defined in https://tools.ietf.org/html/rfc7521#section-5 |
| 17 | + raise NotImplementedError("Will be implemented by sub-class") |
| 18 | + |
| 19 | + |
| 20 | +class JwtSigner(Signer): |
| 21 | + def __init__(self, key, algorithm, sha1_thumbprint=None, headers=None): |
| 22 | + """Create a signer. |
| 23 | +
|
| 24 | + Args: |
| 25 | +
|
| 26 | + key (str): The key for signing, e.g. a base64 encoded private key. |
| 27 | + algorithm (str): |
| 28 | + "RS256", etc.. See https://pyjwt.readthedocs.io/en/latest/algorithms.html |
| 29 | + RSA and ECDSA algorithms require "pip install cryptography". |
| 30 | + sha1_thumbprint (str): The x5t aka X.509 certificate SHA-1 thumbprint. |
| 31 | + headers (dict): Additional headers, e.g. "kid" or "x5c" etc. |
| 32 | + """ |
| 33 | + self.key = key |
| 34 | + self.algorithm = algorithm |
| 35 | + self.headers = headers or {} |
| 36 | + if sha1_thumbprint: # https://tools.ietf.org/html/rfc7515#section-4.1.7 |
| 37 | + self.headers["x5t"] = base64.urlsafe_b64encode( |
| 38 | + binascii.a2b_hex(sha1_thumbprint)).decode() |
| 39 | + |
| 40 | + def sign_assertion( |
| 41 | + self, audience, issuer, subject=None, expires_at=None, |
| 42 | + issued_at=None, assertion_id=None, not_before=None, |
| 43 | + additional_claims=None, **kwargs): |
| 44 | + """Sign a JWT Assertion. |
| 45 | +
|
| 46 | + Parameters are defined in https://tools.ietf.org/html/rfc7523#section-3 |
| 47 | + Key-value pairs in additional_claims will be added into payload as-is. |
| 48 | + """ |
| 49 | + now = time.time() |
| 50 | + payload = { |
| 51 | + 'aud': audience, |
| 52 | + 'iss': issuer, |
| 53 | + 'sub': subject or issuer, |
| 54 | + 'exp': expires_at or (now + 10*60), # 10 minutes |
| 55 | + 'iat': issued_at or now, |
| 56 | + 'jti': assertion_id or str(uuid.uuid4()), |
| 57 | + } |
| 58 | + if not_before: |
| 59 | + payload['nbf'] = not_before |
| 60 | + payload.update(additional_claims or {}) |
| 61 | + try: |
| 62 | + return jwt.encode( |
| 63 | + payload, self.key, algorithm=self.algorithm, headers=self.headers) |
| 64 | + except: |
| 65 | + if self.algorithm.startswith("RS") or self.algorithm.starswith("ES"): |
| 66 | + logger.exception( |
| 67 | + 'Some algorithms requires "pip install cryptography". ' |
| 68 | + 'See https://pyjwt.readthedocs.io/en/latest/installation.html#cryptographic-dependencies-optional') |
| 69 | + raise |
| 70 | + |
0 commit comments