Skip to content
Merged
2 changes: 1 addition & 1 deletion pymdoccbor/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.9.0"
__version__ = "1.0.0"
147 changes: 95 additions & 52 deletions pymdoccbor/mdoc/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from datetime import datetime, timezone
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from pycose.keys import CoseKey, EC2Key
from typing import Union

Expand All @@ -23,14 +25,15 @@ class MdocCborIssuer:
"""
def __init__(
self,
key_label: str = None,
user_pin: str = None,
lib_path: str = None,
slot_id: int = None,
key_label: str | None = None,
user_pin: str | None = None,
lib_path: str | None = None,
slot_id: int | None = None,
hsm: bool = False,
alg: str = None,
kid: str = None,
alg: str | None = None,
kid: str | None = None,
private_key: Union[dict, CoseKey] = {},
cert_info: dict | None = None,
):
"""
Initialize a new MdocCborIssuer
Expand Down Expand Up @@ -67,16 +70,17 @@ def __init__(
self.hsm = hsm
self.alg = alg
self.kid = kid
self.cert_info = cert_info

def new(
self,
data: dict,
doctype: str,
validity: dict = None,
devicekeyinfo: Union[dict, CoseKey, str] = None,
cert_path: str = None,
revocation: dict = None,
status: dict = None
validity: dict | None = None,
devicekeyinfo: dict | CoseKey | str | None = None,
cert_path: str | None = None,
revocation: dict | None = None,
status: dict | None = None
) -> dict:
"""
create a new mdoc with signed mso
Expand All @@ -93,49 +97,86 @@ def new(
"""
if isinstance(devicekeyinfo, dict):
devicekeyinfoCoseKeyObject = CoseKey.from_dict(devicekeyinfo)
devicekeyinfo = {
1: devicekeyinfoCoseKeyObject.kty.identifier,
-1: devicekeyinfoCoseKeyObject.crv.identifier,
-2: devicekeyinfoCoseKeyObject.x,
-3: devicekeyinfoCoseKeyObject.y,
}
if devicekeyinfoCoseKeyObject.kty.identifier == 2: # EC2Key
devicekeyinfo = {
1: devicekeyinfoCoseKeyObject.kty.identifier,
-1: devicekeyinfoCoseKeyObject.crv.identifier,
-2: devicekeyinfoCoseKeyObject.x,
-3: devicekeyinfoCoseKeyObject.y,
}
elif devicekeyinfoCoseKeyObject.kty.identifier == 1: # OKPKey
devicekeyinfo = {
1: devicekeyinfoCoseKeyObject.kty.identifier,
-1: devicekeyinfoCoseKeyObject.crv.identifier,
-2: devicekeyinfoCoseKeyObject.x,
}
elif devicekeyinfoCoseKeyObject.kty.identifier == 3: # RSAKey
devicekeyinfo = {
1: devicekeyinfoCoseKeyObject.kty.identifier,
-1: devicekeyinfoCoseKeyObject.n,
-2: devicekeyinfoCoseKeyObject.e,
}
else:
raise TypeError("Unsupported key type in devicekeyinfo")
if isinstance(devicekeyinfo, str):
device_key_bytes = base64.urlsafe_b64decode(devicekeyinfo.encode("utf-8"))
public_key:EllipticCurvePublicKey = serialization.load_pem_public_key(device_key_bytes)
curve_name = public_key.curve.name
curve_map = {
"secp256r1": 1, # NIST P-256
"secp384r1": 2, # NIST P-384
"secp521r1": 3, # NIST P-521
"brainpoolP256r1": 8, # Brainpool P-256
"brainpoolP384r1": 9, # Brainpool P-384
"brainpoolP512r1": 10, # Brainpool P-512
# Add more curve mappings as needed
}
curve_identifier = curve_map.get(curve_name)

# Extract the x and y coordinates from the public key
x = public_key.public_numbers().x.to_bytes(
(public_key.public_numbers().x.bit_length() + 7)
// 8, # Number of bytes needed
"big", # Byte order
)
public_key = serialization.load_pem_public_key(device_key_bytes)

if isinstance(public_key, EllipticCurvePublicKey):
curve_name = public_key.curve.name
curve_map = {
"secp256r1": 1, # NIST P-256
"secp384r1": 2, # NIST P-384
"secp521r1": 3, # NIST P-521
"brainpoolP256r1": 8, # Brainpool P-256
"brainpoolP384r1": 9, # Brainpool P-384
"brainpoolP512r1": 10, # Brainpool P-512
# Add more curve mappings as needed
}
curve_identifier = curve_map.get(curve_name)

y = public_key.public_numbers().y.to_bytes(
(public_key.public_numbers().y.bit_length() + 7)
// 8, # Number of bytes needed
"big", # Byte order
)
# Extract the x and y coordinates from the public key
x = public_key.public_numbers().x.to_bytes(
(public_key.public_numbers().x.bit_length() + 7)
// 8, # Number of bytes needed
"big", # Byte order
)

devicekeyinfo = {
1: 2,
-1: curve_identifier,
-2: x,
-3: y,
}
y = public_key.public_numbers().y.to_bytes(
(public_key.public_numbers().y.bit_length() + 7)
// 8, # Number of bytes needed
"big", # Byte order
)

else:
devicekeyinfo: CoseKey = devicekeyinfo
devicekeyinfo = {
1: 2,
-1: curve_identifier,
-2: x,
-3: y,
}
elif isinstance(public_key, Ed25519PublicKey):
devicekeyinfo = {
1: 1, # OKPKey
-1: "Ed25519", # Curve identifier for Ed25519
-2: public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
}
elif isinstance(public_key, RSAPublicKey):
devicekeyinfo = {
1: 3, # RSAKey
-1: public_key.public_numbers().n.to_bytes(
(public_key.public_numbers().n.bit_length() + 7) // 8,
"big"
),
-2: public_key.public_numbers().e.to_bytes(
(public_key.public_numbers().e.bit_length() + 7) // 8,
"big"
)
}
else:
raise TypeError("Loaded public key is not an EllipticCurvePublicKey")

if self.hsm:
msoi = MsoIssuer(
Expand All @@ -149,7 +190,8 @@ def new(
alg=self.alg,
kid=self.kid,
validity=validity,
revocation=revocation
revocation=revocation,
cert_info=self.cert_info
)

else:
Expand All @@ -159,10 +201,11 @@ def new(
alg=self.alg,
cert_path=cert_path,
validity=validity,
revocation=revocation
revocation=revocation,
cert_info=self.cert_info
)

mso = msoi.sign(doctype=doctype, device_key=devicekeyinfo,valid_from=datetime.now(timezone.utc))
mso = msoi.sign(doctype=doctype, device_key=devicekeyinfo, valid_from=datetime.now(timezone.utc))

mso_cbor = mso.encode(
tag=False,
Expand Down
87 changes: 51 additions & 36 deletions pymdoccbor/mso/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,24 @@
import uuid
import logging

logger = logging.getLogger("pymdoccbor")

from pycose.headers import Algorithm #, KID
from pycose.keys import CoseKey, EC2Key
from pycose.keys import CoseKey
from pycose.headers import Algorithm
from pycose.messages import Sign1Message

from typing import Union

from pymdoccbor.exceptions import MsoPrivateKeyRequired
from pymdoccbor import settings
from pymdoccbor.x509 import MsoX509Fabric
from pymdoccbor.x509 import selfsigned_x509cert
from pymdoccbor.tools import shuffle_dict
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import Certificate


from cbor_diag import *

logger = logging.getLogger("pymdoccbor")

class MsoIssuer(MsoX509Fabric):
class MsoIssuer:
"""
MsoIssuer helper class to create a new mso
"""
Expand All @@ -34,17 +31,18 @@ def __init__(
self,
data: dict,
validity: dict,
cert_path: str = None,
key_label: str = None,
user_pin: str = None,
lib_path: str = None,
slot_id: int = None,
kid: str = None,
alg: str = None,
hsm: bool = False,
private_key: Union[dict, CoseKey] = None,
digest_alg: str = settings.PYMDOC_HASHALG,
revocation: dict = None
cert_path: str | None = None,
key_label: str | None = None,
user_pin: str | None = None,
lib_path: str | None = None,
slot_id: int | None = None,
kid: str | None = None,
alg: str | None = None,
hsm: bool | None = False,
private_key: dict | CoseKey | None = None,
digest_alg: str | None = settings.PYMDOC_HASHALG,
revocation: dict | None = None,
cert_info: dict | None = None,
) -> None:
"""
Initialize a new MsoIssuer
Expand All @@ -64,17 +62,17 @@ def __init__(
:param revocation: dict: revocation status dict to include in the mso, it may include status_list and identifier_list keys
"""

if not hsm:
if private_key:
if isinstance(private_key, dict):
self.private_key = CoseKey.from_dict(private_key)
if not self.private_key.kid:
self.private_key.kid = str(uuid.uuid4())
elif isinstance(private_key, CoseKey):
self.private_key = private_key
else:
raise ValueError("private_key must be a dict or CoseKey object")
if private_key:
if isinstance(private_key, dict):
self.private_key = CoseKey.from_dict(private_key)
if not self.private_key.kid:
self.private_key.kid = str(uuid.uuid4())
elif isinstance(private_key, CoseKey):
self.private_key = private_key
else:
raise ValueError("private_key must be a dict or CoseKey object")
else:
if not hsm:
raise MsoPrivateKeyRequired("MSO Writer requires a valid private key")

if not validity:
Expand All @@ -85,9 +83,8 @@ def __init__(

self.data: dict = data
self.hash_map: dict = {}
self.cert_path = cert_path
self.disclosure_map: dict = {}
self.digest_alg: str = digest_alg
self.digest_alg = digest_alg
self.key_label = key_label
self.user_pin = user_pin
self.lib_path = lib_path
Expand All @@ -98,9 +95,20 @@ def __init__(
self.validity = validity
self.revocation = revocation

self.cert_path = cert_path
self.cert_info = cert_info

if not self.cert_path and (not self.cert_info or not self.private_key):
raise ValueError(
"cert_path or cert_info with a private key must be provided to properly insert a certificate"
)

alg_map = {"ES256": "sha256", "ES384": "sha384", "ES512": "sha512"}

hashfunc = getattr(hashlib, alg_map.get(self.alg))
if self.alg not in alg_map:
raise ValueError(f"Unsupported algorithm: {self.alg}")

hashfunc = getattr(hashlib, alg_map[self.alg])

digest_cnt = 0
for ns, values in data.items():
Expand Down Expand Up @@ -157,9 +165,9 @@ def format_datetime_repr(self, dt: datetime.datetime) -> str:

def sign(
self,
device_key: Union[dict, None] = None,
valid_from: Union[None, datetime.datetime] = None,
doctype: str = None,
device_key: dict | None = None,
valid_from: datetime.datetime | None = None,
doctype: str | None = None,
) -> Sign1Message:
"""
Sign a mso and returns it
Expand Down Expand Up @@ -230,7 +238,14 @@ def sign(
raise Exception(f"Certificate at {self.cert_path} failed parse")
_cert = cert.public_bytes(getattr(serialization.Encoding, "DER"))
else:
_cert = self.selfsigned_x509cert()
if not self.cert_info:
raise ValueError("cert_info must be provided if cert_path is not set")

logger.warning(
"A self-signed certificate will be created using the provided cert_info but this is not recommended for production use."
)

_cert = selfsigned_x509cert(self.cert_info, self.private_key)

if self.hsm:
# print("payload diganostic notation: \n",cbor2diag(cbor2.dumps(cbor2.CBORTag(24, cbor2.dumps(payload)))))
Expand Down
25 changes: 0 additions & 25 deletions pymdoccbor/settings.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import datetime
import os

from datetime import timezone

COSEKEY_HAZMAT_CRV_MAP = {
"secp256r1": "P_256",
"secp384r1": "P_384",
Expand All @@ -23,30 +20,8 @@

DIGEST_SALT_LENGTH = 32


X509_DER_CERT = os.getenv("X509_DER_CERT", None)

# OR

X509_COUNTRY_NAME = os.getenv('X509_COUNTRY_NAME', "US")
X509_STATE_OR_PROVINCE_NAME = os.getenv('X509_STATE_OR_PROVINCE_NAME', "California")
X509_LOCALITY_NAME = os.getenv('X509_LOCALITY_NAME', "San Francisco")
X509_ORGANIZATION_NAME = os.getenv('X509_ORGANIZATION_NAME', "My Company")
X509_COMMON_NAME = os.getenv('X509_COMMON_NAME', "mysite.com")

X509_NOT_VALID_BEFORE = os.getenv('X509_NOT_VALID_BEFORE', datetime.datetime.now(timezone.utc))
X509_NOT_VALID_AFTER_DAYS = os.getenv('X509_NOT_VALID_AFTER_DAYS', 10)
X509_NOT_VALID_AFTER = os.getenv(
'X509_NOT_VALID_AFTER',
datetime.datetime.now(timezone.utc) + datetime.timedelta(
days=X509_NOT_VALID_AFTER_DAYS
)
)

X509_SAN_URL = os.getenv(
"X509_SAN_URL", "https://credential-issuer.example.org"
)

CBORTAGS_ATTR_MAP = {
"birth_date": 1004,
"expiry_date": 1004,
Expand Down
Loading