diff --git a/google/auth/_service_account_info.py b/google/auth/_service_account_info.py index 6b64adcae..c432080a9 100644 --- a/google/auth/_service_account_info.py +++ b/google/auth/_service_account_info.py @@ -56,7 +56,7 @@ def from_dict(data, require=None, use_rsa_signer=True): if use_rsa_signer: signer = crypt.RSASigner.from_service_account_info(data) else: - signer = crypt.ES256Signer.from_service_account_info(data) + signer = crypt.EsSigner.from_service_account_info(data) return signer diff --git a/google/auth/crypt/__init__.py b/google/auth/crypt/__init__.py index 6d147e706..59519b475 100644 --- a/google/auth/crypt/__init__.py +++ b/google/auth/crypt/__init__.py @@ -40,13 +40,19 @@ from google.auth.crypt import base from google.auth.crypt import rsa +# google.auth.crypt.es depends on the crytpography module which may not be +# successfully imported depending on the system. try: + from google.auth.crypt import es from google.auth.crypt import es256 except ImportError: # pragma: NO COVER + es = None # type: ignore es256 = None # type: ignore -if es256 is not None: # pragma: NO COVER +if es is not None and es256 is not None: # pragma: NO COVER __all__ = [ + "EsSigner", + "EsVerifier", "ES256Signer", "ES256Verifier", "RSASigner", @@ -54,6 +60,11 @@ "Signer", "Verifier", ] + + EsSigner = es.EsSigner + EsVerifier = es.EsVerifier + ES256Signer = es256.ES256Signer + ES256Verifier = es256.ES256Verifier else: # pragma: NO COVER __all__ = ["RSASigner", "RSAVerifier", "Signer", "Verifier"] @@ -65,10 +76,6 @@ RSASigner = rsa.RSASigner RSAVerifier = rsa.RSAVerifier -if es256 is not None: # pragma: NO COVER - ES256Signer = es256.ES256Signer - ES256Verifier = es256.ES256Verifier - def verify_signature(message, signature, certs, verifier_cls=rsa.RSAVerifier): """Verify an RSA or ECDSA cryptographic signature. diff --git a/google/auth/crypt/es.py b/google/auth/crypt/es.py new file mode 100644 index 000000000..f9466af3c --- /dev/null +++ b/google/auth/crypt/es.py @@ -0,0 +1,221 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ECDSA verifier and signer that use the ``cryptography`` library. +""" + +from dataclasses import dataclass +from typing import Any, Dict, Optional, Union + +import cryptography.exceptions +from cryptography.hazmat import backends +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature +from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature +import cryptography.x509 + +from google.auth import _helpers +from google.auth.crypt import base + + +_CERTIFICATE_MARKER = b"-----BEGIN CERTIFICATE-----" +_BACKEND = backends.default_backend() +_PADDING = padding.PKCS1v15() + + +@dataclass +class _ESAttributes: + """A class that models ECDSA attributes. + + Attributes: + rs_size (int): Size for ASN.1 r and s size. + sha_algo (hashes.HashAlgorithm): Hash algorithm. + algorithm (str): Algorithm name. + """ + + rs_size: int + sha_algo: hashes.HashAlgorithm + algorithm: str + + @classmethod + def from_key( + cls, key: Union[ec.EllipticCurvePublicKey, ec.EllipticCurvePrivateKey] + ): + return cls.from_curve(key.curve) + + @classmethod + def from_curve(cls, curve: ec.EllipticCurve): + # ECDSA raw signature has (r||s) format where r,s are two + # integers of size 32 bytes for P-256 curve and 48 bytes + # for P-384 curve. For P-256 curve, we use SHA256 hash algo, + # and for P-384 curve we use SHA384 algo. + if isinstance(curve, ec.SECP384R1): + return cls(48, hashes.SHA384(), "ES384") + else: + # default to ES256 + return cls(32, hashes.SHA256(), "ES256") + + +class EsVerifier(base.Verifier): + """Verifies ECDSA cryptographic signatures using public keys. + + Args: + public_key ( + cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey): + The public key used to verify signatures. + """ + + def __init__(self, public_key: ec.EllipticCurvePublicKey) -> None: + self._pubkey = public_key + self._attributes = _ESAttributes.from_key(public_key) + + @_helpers.copy_docstring(base.Verifier) + def verify(self, message: bytes, signature: bytes) -> bool: + # First convert (r||s) raw signature to ASN1 encoded signature. + sig_bytes = _helpers.to_bytes(signature) + if len(sig_bytes) != self._attributes.rs_size * 2: + return False + r = int.from_bytes(sig_bytes[: self._attributes.rs_size], byteorder="big") + s = int.from_bytes(sig_bytes[self._attributes.rs_size :], byteorder="big") + asn1_sig = encode_dss_signature(r, s) + + message = _helpers.to_bytes(message) + try: + self._pubkey.verify(asn1_sig, message, ec.ECDSA(self._attributes.sha_algo)) + return True + except (ValueError, cryptography.exceptions.InvalidSignature): + return False + + @classmethod + def from_string(cls, public_key: Union[str, bytes]) -> "EsVerifier": + """Construct an Verifier instance from a public key or public + certificate string. + + Args: + public_key (Union[str, bytes]): The public key in PEM format or the + x509 public key certificate. + + Returns: + Verifier: The constructed verifier. + + Raises: + ValueError: If the public key can't be parsed. + """ + public_key_data = _helpers.to_bytes(public_key) + + if _CERTIFICATE_MARKER in public_key_data: + cert = cryptography.x509.load_pem_x509_certificate( + public_key_data, _BACKEND + ) + pubkey = cert.public_key() # type: Any + + else: + pubkey = serialization.load_pem_public_key(public_key_data, _BACKEND) + + if not isinstance(pubkey, ec.EllipticCurvePublicKey): + raise TypeError("Expected public key of type EllipticCurvePublicKey") + + return cls(pubkey) + + +class EsSigner(base.Signer, base.FromServiceAccountMixin): + """Signs messages with an ECDSA private key. + + Args: + private_key ( + cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): + The private key to sign with. + key_id (str): Optional key ID used to identify this private key. This + can be useful to associate the private key with its associated + public key or certificate. + """ + + def __init__( + self, private_key: ec.EllipticCurvePrivateKey, key_id: Optional[str] = None + ) -> None: + self._key = private_key + self._key_id = key_id + self._attributes = _ESAttributes.from_key(private_key) + + @property + def algorithm(self) -> str: + """Name of the algorithm used to sign messages. + Returns: + str: The algorithm name. + """ + return self._attributes.algorithm + + @property # type: ignore + @_helpers.copy_docstring(base.Signer) + def key_id(self) -> Optional[str]: + return self._key_id + + @_helpers.copy_docstring(base.Signer) + def sign(self, message: bytes) -> bytes: + message = _helpers.to_bytes(message) + asn1_signature = self._key.sign(message, ec.ECDSA(self._attributes.sha_algo)) + + # Convert ASN1 encoded signature to (r||s) raw signature. + (r, s) = decode_dss_signature(asn1_signature) + return r.to_bytes(self._attributes.rs_size, byteorder="big") + s.to_bytes( + self._attributes.rs_size, byteorder="big" + ) + + @classmethod + def from_string( + cls, key: Union[bytes, str], key_id: Optional[str] = None + ) -> "EsSigner": + """Construct a RSASigner from a private key in PEM format. + + Args: + key (Union[bytes, str]): Private key in PEM format. + key_id (str): An optional key id used to identify the private key. + + Returns: + google.auth.crypt._cryptography_rsa.RSASigner: The + constructed signer. + + Raises: + ValueError: If ``key`` is not ``bytes`` or ``str`` (unicode). + UnicodeDecodeError: If ``key`` is ``bytes`` but cannot be decoded + into a UTF-8 ``str``. + ValueError: If ``cryptography`` "Could not deserialize key data." + """ + key_bytes = _helpers.to_bytes(key) + private_key = serialization.load_pem_private_key( + key_bytes, password=None, backend=_BACKEND + ) + + if not isinstance(private_key, ec.EllipticCurvePrivateKey): + raise TypeError("Expected private key of type EllipticCurvePrivateKey") + + return cls(private_key, key_id=key_id) + + def __getstate__(self) -> Dict[str, Any]: + """Pickle helper that serializes the _key attribute.""" + state = self.__dict__.copy() + state["_key"] = self._key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + return state + + def __setstate__(self, state: Dict[str, Any]) -> None: + """Pickle helper that deserializes the _key attribute.""" + state["_key"] = serialization.load_pem_private_key(state["_key"], None) + self.__dict__.update(state) diff --git a/google/auth/crypt/es256.py b/google/auth/crypt/es256.py index 820e4becc..e7bda5d3f 100644 --- a/google/auth/crypt/es256.py +++ b/google/auth/crypt/es256.py @@ -15,93 +15,22 @@ """ECDSA (ES256) verifier and signer that use the ``cryptography`` library. """ -from cryptography import utils # type: ignore -import cryptography.exceptions -from cryptography.hazmat import backends -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature -from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature -import cryptography.x509 +from google.auth.crypt.es import EsSigner +from google.auth.crypt.es import EsVerifier -from google.auth import _helpers -from google.auth.crypt import base - -_CERTIFICATE_MARKER = b"-----BEGIN CERTIFICATE-----" -_BACKEND = backends.default_backend() -_PADDING = padding.PKCS1v15() - - -class ES256Verifier(base.Verifier): +class ES256Verifier(EsVerifier): """Verifies ECDSA cryptographic signatures using public keys. Args: - public_key ( - cryptography.hazmat.primitives.asymmetric.ec.ECDSAPublicKey): - The public key used to verify signatures. + public_key (cryptography.hazmat.primitives.asymmetric.ec.ECDSAPublicKey): The public key used to verify + signatures. """ - def __init__(self, public_key): - self._pubkey = public_key - - @_helpers.copy_docstring(base.Verifier) - def verify(self, message, signature): - # First convert (r||s) raw signature to ASN1 encoded signature. - sig_bytes = _helpers.to_bytes(signature) - if len(sig_bytes) != 64: - return False - r = ( - int.from_bytes(sig_bytes[:32], byteorder="big") - if _helpers.is_python_3() - else utils.int_from_bytes(sig_bytes[:32], byteorder="big") - ) - s = ( - int.from_bytes(sig_bytes[32:], byteorder="big") - if _helpers.is_python_3() - else utils.int_from_bytes(sig_bytes[32:], byteorder="big") - ) - asn1_sig = encode_dss_signature(r, s) - - message = _helpers.to_bytes(message) - try: - self._pubkey.verify(asn1_sig, message, ec.ECDSA(hashes.SHA256())) - return True - except (ValueError, cryptography.exceptions.InvalidSignature): - return False - - @classmethod - def from_string(cls, public_key): - """Construct an Verifier instance from a public key or public - certificate string. - - Args: - public_key (Union[str, bytes]): The public key in PEM format or the - x509 public key certificate. - - Returns: - Verifier: The constructed verifier. - - Raises: - ValueError: If the public key can't be parsed. - """ - public_key_data = _helpers.to_bytes(public_key) - - if _CERTIFICATE_MARKER in public_key_data: - cert = cryptography.x509.load_pem_x509_certificate( - public_key_data, _BACKEND - ) - pubkey = cert.public_key() - - else: - pubkey = serialization.load_pem_public_key(public_key_data, _BACKEND) + pass - return cls(pubkey) - -class ES256Signer(base.Signer, base.FromServiceAccountMixin): +class ES256Signer(EsSigner): """Signs messages with an ECDSA private key. Args: @@ -113,63 +42,4 @@ class ES256Signer(base.Signer, base.FromServiceAccountMixin): public key or certificate. """ - def __init__(self, private_key, key_id=None): - self._key = private_key - self._key_id = key_id - - @property # type: ignore - @_helpers.copy_docstring(base.Signer) - def key_id(self): - return self._key_id - - @_helpers.copy_docstring(base.Signer) - def sign(self, message): - message = _helpers.to_bytes(message) - asn1_signature = self._key.sign(message, ec.ECDSA(hashes.SHA256())) - - # Convert ASN1 encoded signature to (r||s) raw signature. - (r, s) = decode_dss_signature(asn1_signature) - return ( - (r.to_bytes(32, byteorder="big") + s.to_bytes(32, byteorder="big")) - if _helpers.is_python_3() - else (utils.int_to_bytes(r, 32) + utils.int_to_bytes(s, 32)) - ) - - @classmethod - def from_string(cls, key, key_id=None): - """Construct a RSASigner from a private key in PEM format. - - Args: - key (Union[bytes, str]): Private key in PEM format. - key_id (str): An optional key id used to identify the private key. - - Returns: - google.auth.crypt._cryptography_rsa.RSASigner: The - constructed signer. - - Raises: - ValueError: If ``key`` is not ``bytes`` or ``str`` (unicode). - UnicodeDecodeError: If ``key`` is ``bytes`` but cannot be decoded - into a UTF-8 ``str``. - ValueError: If ``cryptography`` "Could not deserialize key data." - """ - key = _helpers.to_bytes(key) - private_key = serialization.load_pem_private_key( - key, password=None, backend=_BACKEND - ) - return cls(private_key, key_id=key_id) - - def __getstate__(self): - """Pickle helper that serializes the _key attribute.""" - state = self.__dict__.copy() - state["_key"] = self._key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) - return state - - def __setstate__(self, state): - """Pickle helper that deserializes the _key attribute.""" - state["_key"] = serialization.load_pem_private_key(state["_key"], None) - self.__dict__.update(state) + pass diff --git a/google/auth/jwt.py b/google/auth/jwt.py index 1ebd565d4..9b79f173b 100644 --- a/google/auth/jwt.py +++ b/google/auth/jwt.py @@ -59,17 +59,18 @@ import google.auth.credentials try: - from google.auth.crypt import es256 + from google.auth.crypt import es except ImportError: # pragma: NO COVER - es256 = None # type: ignore + es = None # type: ignore _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _DEFAULT_MAX_CACHE_SIZE = 10 _ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier} -_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256"]) +_CRYPTOGRAPHY_BASED_ALGORITHMS = frozenset(["ES256", "ES384"]) -if es256 is not None: # pragma: NO COVER - _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier # type: ignore +if es is not None: # pragma: NO COVER + _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es.EsVerifier # type: ignore + _ALGORITHM_TO_VERIFIER_CLASS["ES384"] = es.EsVerifier # type: ignore def encode(signer, payload, header=None, key_id=None): @@ -95,8 +96,8 @@ def encode(signer, payload, header=None, key_id=None): header.update({"typ": "JWT"}) if "alg" not in header: - if es256 is not None and isinstance(signer, es256.ES256Signer): - header.update({"alg": "ES256"}) + if es is not None and isinstance(signer, es.EsSigner): + header.update({"alg": signer.algorithm}) else: header.update({"alg": "RS256"}) diff --git a/tests/crypt/test_es.py b/tests/crypt/test_es.py new file mode 100644 index 000000000..3a62c1413 --- /dev/null +++ b/tests/crypt/test_es.py @@ -0,0 +1,173 @@ +# Copyright 2016 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import json +import os +import pickle + +from cryptography.hazmat.primitives.asymmetric import ec +import pytest # type: ignore + +from google.auth import _helpers +from google.auth.crypt import base +from google.auth.crypt import es + + +DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") + +# To generate es384_privatekey.pem, es384_privatekey.pub, and +# es384_public_cert.pem: +# $ openssl ecparam -genkey -name secp384r1 -noout -out es384_privatekey.pem +# $ openssl ec -in es384_privatekey.pem -pubout -out es384_publickey.pem +# $ openssl req -new -x509 -key es384_privatekey.pem -out \ +# > es384_public_cert.pem + +with open(os.path.join(DATA_DIR, "es384_privatekey.pem"), "rb") as fh: + PRIVATE_KEY_BYTES = fh.read() + PKCS1_KEY_BYTES = PRIVATE_KEY_BYTES + +with open(os.path.join(DATA_DIR, "es384_publickey.pem"), "rb") as fh: + PUBLIC_KEY_BYTES = fh.read() + +with open(os.path.join(DATA_DIR, "es384_public_cert.pem"), "rb") as fh: + PUBLIC_CERT_BYTES = fh.read() + +# RSA keys used to test for type errors in EsVerifier and EsSigner. +with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: + RSA_PRIVATE_KEY_BYTES = fh.read() + RSA_PKCS1_KEY_BYTES = RSA_PRIVATE_KEY_BYTES + +with open(os.path.join(DATA_DIR, "privatekey.pub"), "rb") as fh: + RSA_PUBLIC_KEY_BYTES = fh.read() + +SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "es384_service_account.json") + +with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh: + SERVICE_ACCOUNT_INFO = json.load(fh) + + +class TestEsVerifier(object): + def test_verify_success(self): + to_sign = b"foo" + signer = es.EsSigner.from_string(PRIVATE_KEY_BYTES) + actual_signature = signer.sign(to_sign) + + verifier = es.EsVerifier.from_string(PUBLIC_KEY_BYTES) + assert verifier.verify(to_sign, actual_signature) + + def test_verify_unicode_success(self): + to_sign = u"foo" + signer = es.EsSigner.from_string(PRIVATE_KEY_BYTES) + actual_signature = signer.sign(to_sign) + + verifier = es.EsVerifier.from_string(PUBLIC_KEY_BYTES) + assert verifier.verify(to_sign, actual_signature) + + def test_verify_failure(self): + verifier = es.EsVerifier.from_string(PUBLIC_KEY_BYTES) + bad_signature1 = b"" + assert not verifier.verify(b"foo", bad_signature1) + bad_signature2 = b"a" + assert not verifier.verify(b"foo", bad_signature2) + + def test_verify_failure_with_wrong_raw_signature(self): + to_sign = b"foo" + + # This signature has a wrong "r" value in the "(r,s)" raw signature. + wrong_signature = base64.urlsafe_b64decode( + b"m7oaRxUDeYqjZ8qiMwo0PZLTMZWKJLFQREpqce1StMIa_yXQQ-C5WgeIRHW7OqlYSDL0XbUrj_uAw9i-QhfOJQ==" + ) + + verifier = es.EsVerifier.from_string(PUBLIC_KEY_BYTES) + assert not verifier.verify(to_sign, wrong_signature) + + def test_from_string_pub_key(self): + verifier = es.EsVerifier.from_string(PUBLIC_KEY_BYTES) + assert isinstance(verifier, es.EsVerifier) + assert isinstance(verifier._pubkey, ec.EllipticCurvePublicKey) + + def test_from_string_pub_key_unicode(self): + public_key = _helpers.from_bytes(PUBLIC_KEY_BYTES) + verifier = es.EsVerifier.from_string(public_key) + assert isinstance(verifier, es.EsVerifier) + assert isinstance(verifier._pubkey, ec.EllipticCurvePublicKey) + + def test_from_string_pub_cert(self): + verifier = es.EsVerifier.from_string(PUBLIC_CERT_BYTES) + assert isinstance(verifier, es.EsVerifier) + assert isinstance(verifier._pubkey, ec.EllipticCurvePublicKey) + + def test_from_string_pub_cert_unicode(self): + public_cert = _helpers.from_bytes(PUBLIC_CERT_BYTES) + verifier = es.EsVerifier.from_string(public_cert) + assert isinstance(verifier, es.EsVerifier) + assert isinstance(verifier._pubkey, ec.EllipticCurvePublicKey) + + def test_from_string_type_error(self): + with pytest.raises(TypeError): + es.EsVerifier.from_string(RSA_PUBLIC_KEY_BYTES) + + +class TestEsSigner(object): + def test_from_string_pkcs1(self): + signer = es.EsSigner.from_string(PKCS1_KEY_BYTES) + assert isinstance(signer, es.EsSigner) + assert isinstance(signer._key, ec.EllipticCurvePrivateKey) + + def test_from_string_pkcs1_unicode(self): + key_bytes = _helpers.from_bytes(PKCS1_KEY_BYTES) + signer = es.EsSigner.from_string(key_bytes) + assert isinstance(signer, es.EsSigner) + assert isinstance(signer._key, ec.EllipticCurvePrivateKey) + + def test_from_string_bogus_key(self): + key_bytes = "bogus-key" + with pytest.raises(ValueError): + es.EsSigner.from_string(key_bytes) + + def test_from_string_type_error(self): + key_bytes = _helpers.from_bytes(RSA_PKCS1_KEY_BYTES) + with pytest.raises(TypeError): + es.EsSigner.from_string(key_bytes) + + def test_from_service_account_info(self): + signer = es.EsSigner.from_service_account_info(SERVICE_ACCOUNT_INFO) + + assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID] + assert isinstance(signer._key, ec.EllipticCurvePrivateKey) + + def test_from_service_account_info_missing_key(self): + with pytest.raises(ValueError) as excinfo: + es.EsSigner.from_service_account_info({}) + + assert excinfo.match(base._JSON_FILE_PRIVATE_KEY) + + def test_from_service_account_file(self): + signer = es.EsSigner.from_service_account_file(SERVICE_ACCOUNT_JSON_FILE) + + assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID] + assert isinstance(signer._key, ec.EllipticCurvePrivateKey) + + def test_pickle(self): + signer = es.EsSigner.from_service_account_file(SERVICE_ACCOUNT_JSON_FILE) + + assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID] + assert isinstance(signer._key, ec.EllipticCurvePrivateKey) + + pickled_signer = pickle.dumps(signer) + signer = pickle.loads(pickled_signer) + + assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID] + assert isinstance(signer._key, ec.EllipticCurvePrivateKey) diff --git a/tests/data/es384_privatekey.pem b/tests/data/es384_privatekey.pem new file mode 100644 index 000000000..12ff96291 --- /dev/null +++ b/tests/data/es384_privatekey.pem @@ -0,0 +1,6 @@ +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDBz1wKJNXd2Rzy52A7F3f9LmLp6KaMUTbL1IT3JaDx1kOp4CUFpI9Zs +rdEx7b7kKQGgBwYFK4EEACKhZANiAATRLiEHuOwLr8bjJnJdYG2mrlWtMEPBHOrm +n7RukR80nV5uAcqt+M319T2togP0tQIe621FUpJq7+Hq0vJJbtI1MPuFSDtpZG04 +5se7BVAw63IPV1EdO6vGXxd5Fay88uU= +-----END EC PRIVATE KEY----- diff --git a/tests/data/es384_public_cert.pem b/tests/data/es384_public_cert.pem new file mode 100644 index 000000000..e8d5d4c68 --- /dev/null +++ b/tests/data/es384_public_cert.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICYzCCAeqgAwIBAgIUeYyowQBkomEoMj72pNh754QlGvAwCgYIKoZIzj0EAwIw +aTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQHDA1Nb3VudGFpbiBW +aWV3MQ8wDQYDVQQKDAZHb29nbGUxDzANBgNVBAsMBkdvb2dsZTETMBEGA1UEAwwK +Z29vZ2xlLmNvbTAeFw0yNTExMTEwMDQzMTlaFw0yNTEyMTEwMDQzMTlaMGkxCzAJ +BgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNTW91bnRhaW4gVmlldzEP +MA0GA1UECgwGR29vZ2xlMQ8wDQYDVQQLDAZHb29nbGUxEzARBgNVBAMMCmdvb2ds +ZS5jb20wdjAQBgcqhkjOPQIBBgUrgQQAIgNiAATRLiEHuOwLr8bjJnJdYG2mrlWt +MEPBHOrmn7RukR80nV5uAcqt+M319T2togP0tQIe621FUpJq7+Hq0vJJbtI1MPuF +SDtpZG045se7BVAw63IPV1EdO6vGXxd5Fay88uWjUzBRMB0GA1UdDgQWBBSRZkxR +63/X4JotxKDRWCI4PwIElDAfBgNVHSMEGDAWgBSRZkxR63/X4JotxKDRWCI4PwIE +lDAPBgNVHRMBAf8EBTADAQH/MAoGCCqGSM49BAMCA2cAMGQCMAU+2yy/luLTa+T6 +Jm86i9GiH/lPYdYwZFvwKJFTdj8FJpv7ySN0J80qzWxtBZTCMQIwZO0ZRdv8s7V3 +022yISIujmsPmgj7lvPuDZZaVn1DVYMG3YmBB+cTp+JTqF3x7lN+ +-----END CERTIFICATE----- diff --git a/tests/data/es384_publickey.pem b/tests/data/es384_publickey.pem new file mode 100644 index 000000000..e78ac0f49 --- /dev/null +++ b/tests/data/es384_publickey.pem @@ -0,0 +1,5 @@ +-----BEGIN PUBLIC KEY----- +MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAE0S4hB7jsC6/G4yZyXWBtpq5VrTBDwRzq +5p+0bpEfNJ1ebgHKrfjN9fU9raID9LUCHuttRVKSau/h6tLySW7SNTD7hUg7aWRt +OObHuwVQMOtyD1dRHTurxl8XeRWsvPLl +-----END PUBLIC KEY----- diff --git a/tests/data/es384_service_account.json b/tests/data/es384_service_account.json new file mode 100644 index 000000000..8302344b1 --- /dev/null +++ b/tests/data/es384_service_account.json @@ -0,0 +1,9 @@ +{ + "type":"gdch_service_account", + "format_version":"1", + "project":"mytest", + "private_key_id":"1234567890", + "private_key":"-----BEGIN EC PRIVATE KEY-----\nMIGkAgEBBDAyqgUeNwuUOMCC9Bzyf4uT2rfZyISJFMq3ByfE+ytUbveUd6RtvoCT\nS9cYbmuj06OgBwYFK4EEACKhZANiAATrUB670cjyRUcarD//92jO52Rqo+jKi0x7\nkscWALlC8bx9zED5zpy948FrQhQgb/TLPhunkyTwWe22CzafS8ik5pCZKkWfiJRV\n9IBMJDTMyocCR013qDXKHZOpJ57wAUw=\n-----END EC PRIVATE KEY-----\n", + "name":"mytest", + "token_uri":"https://service-accounts.org.google.com/authenticate" +} diff --git a/tests/test__service_account_info.py b/tests/test__service_account_info.py index be2657074..7e836861e 100644 --- a/tests/test__service_account_info.py +++ b/tests/test__service_account_info.py @@ -23,13 +23,21 @@ DATA_DIR = os.path.join(os.path.dirname(__file__), "data") SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json") -GDCH_SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "gdch_service_account.json") +GDCH_SERVICE_ACCOUNT_ES256_JSON_FILE = os.path.join( + DATA_DIR, "gdch_service_account.json" +) +GDCH_SERVICE_ACCOUNT_ES384_JSON_FILE = os.path.join( + DATA_DIR, "es384_service_account.json" +) with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh: SERVICE_ACCOUNT_INFO = json.load(fh) -with open(GDCH_SERVICE_ACCOUNT_JSON_FILE, "r") as fh: - GDCH_SERVICE_ACCOUNT_INFO = json.load(fh) +with open(GDCH_SERVICE_ACCOUNT_ES256_JSON_FILE, "r") as fh: + GDCH_SERVICE_ACCOUNT_ES256_INFO = json.load(fh) + +with open(GDCH_SERVICE_ACCOUNT_ES384_JSON_FILE, "r") as fh: + GDCH_SERVICE_ACCOUNT_ES384_INFO = json.load(fh) def test_from_dict(): @@ -40,10 +48,19 @@ def test_from_dict(): def test_from_dict_es256_signer(): signer = _service_account_info.from_dict( - GDCH_SERVICE_ACCOUNT_INFO, use_rsa_signer=False + GDCH_SERVICE_ACCOUNT_ES256_INFO, use_rsa_signer=False + ) + assert isinstance(signer, crypt.EsSigner) + assert signer.key_id == GDCH_SERVICE_ACCOUNT_ES256_INFO["private_key_id"] + + +def test_from_dict_es384_signer(): + signer = _service_account_info.from_dict( + GDCH_SERVICE_ACCOUNT_ES384_INFO, use_rsa_signer=False ) - assert isinstance(signer, crypt.ES256Signer) - assert signer.key_id == GDCH_SERVICE_ACCOUNT_INFO["private_key_id"] + assert isinstance(signer, crypt.EsSigner) + assert signer.key_id == GDCH_SERVICE_ACCOUNT_ES384_INFO["private_key_id"] + assert signer.algorithm == "ES384" def test_from_dict_bad_private_key(): @@ -75,8 +92,18 @@ def test_from_filename(): def test_from_filename_es256_signer(): _, signer = _service_account_info.from_filename( - GDCH_SERVICE_ACCOUNT_JSON_FILE, use_rsa_signer=False + GDCH_SERVICE_ACCOUNT_ES256_JSON_FILE, use_rsa_signer=False + ) + + assert isinstance(signer, crypt.EsSigner) + assert signer.key_id == GDCH_SERVICE_ACCOUNT_ES256_INFO["private_key_id"] + + +def test_from_filename_es384_signer(): + _, signer = _service_account_info.from_filename( + GDCH_SERVICE_ACCOUNT_ES384_JSON_FILE, use_rsa_signer=False ) - assert isinstance(signer, crypt.ES256Signer) - assert signer.key_id == GDCH_SERVICE_ACCOUNT_INFO["private_key_id"] + assert isinstance(signer, crypt.EsSigner) + assert signer.key_id == GDCH_SERVICE_ACCOUNT_ES384_INFO["private_key_id"] + assert signer.algorithm == "ES384" diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 28660ea33..a5a904d7d 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -43,6 +43,12 @@ with open(os.path.join(DATA_DIR, "es256_public_cert.pem"), "rb") as fh: EC_PUBLIC_CERT_BYTES = fh.read() +with open(os.path.join(DATA_DIR, "es384_privatekey.pem"), "rb") as fh: + EC384_PRIVATE_KEY_BYTES = fh.read() + +with open(os.path.join(DATA_DIR, "es384_public_cert.pem"), "rb") as fh: + EC384_PUBLIC_CERT_BYTES = fh.read() + SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json") with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh: @@ -84,6 +90,11 @@ def es256_signer(): return crypt.ES256Signer.from_string(EC_PRIVATE_KEY_BYTES, "1") +@pytest.fixture +def es384_signer(): + return crypt.EsSigner.from_string(EC384_PRIVATE_KEY_BYTES, "1") + + def test_encode_basic_es256(es256_signer): test_payload = {"test": "value"} encoded = jwt.encode(es256_signer, test_payload) @@ -92,9 +103,19 @@ def test_encode_basic_es256(es256_signer): assert header == {"typ": "JWT", "alg": "ES256", "kid": es256_signer.key_id} +def test_encode_basic_es384(es384_signer): + test_payload = {"test": "value"} + encoded = jwt.encode(es384_signer, test_payload) + header, payload, _, _ = jwt._unverified_decode(encoded) + assert payload == test_payload + assert header == {"typ": "JWT", "alg": "ES384", "kid": es384_signer.key_id} + + @pytest.fixture -def token_factory(signer, es256_signer): - def factory(claims=None, key_id=None, use_es256_signer=False): +def token_factory(signer, es256_signer, es384_signer): + def factory( + claims=None, key_id=None, use_es256_signer=False, use_es384_signer=False + ): now = _helpers.datetime_to_secs(_helpers.utcnow()) payload = { "aud": "audience@example.com", @@ -113,6 +134,8 @@ def factory(claims=None, key_id=None, use_es256_signer=False): if use_es256_signer: return jwt.encode(es256_signer, payload, key_id=key_id) + elif use_es384_signer: + return jwt.encode(es384_signer, payload, key_id=key_id) else: return jwt.encode(signer, payload, key_id=key_id) @@ -158,6 +181,15 @@ def test_decode_valid_es256(token_factory): assert payload["metadata"]["meta"] == "data" +def test_decode_valid_es384(token_factory): + payload = jwt.decode( + token_factory(use_es384_signer=True), certs=EC384_PUBLIC_CERT_BYTES + ) + assert payload["aud"] == "audience@example.com" + assert payload["user"] == "billy bob" + assert payload["metadata"]["meta"] == "data" + + def test_decode_valid_with_audience(token_factory): payload = jwt.decode( token_factory(), certs=PUBLIC_CERT_BYTES, audience="audience@example.com"