Skip to content

Commit 381bb35

Browse files
committed
key helpers: verification key to object: In progress
Tests passing as of https://asciinema.org/a/627194 Asciinema: https://asciinema.org/a/627150 Asciinema: https://asciinema.org/a/627165 Asciinema: https://asciinema.org/a/627183 Asciinema: https://asciinema.org/a/627193 Asciinema: https://asciinema.org/a/627194 Signed-off-by: John Andersen <[email protected]>
1 parent b697db6 commit 381bb35

12 files changed

+305
-117
lines changed

docs/registration_policies.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ from jsonschema import validate, ValidationError
9393

9494
from scitt_emulator.scitt import ClaimInvalidError, CWTClaims
9595
from scitt_emulator.verify_statement import verify_statement
96+
from scitt_emulator.key_helpers import verification_key_to_object
9697

9798

9899
def main():
@@ -107,23 +108,30 @@ def main():
107108
f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}"
108109
)
109110

110-
cwt_cose_key, _pycose_cose_key = verify_statement(msg)
111+
verification_key = verify_statement(msg)
111112
unittest.TestCase().assertTrue(
112-
cwt_cose_key,
113+
verification_key,
113114
"Failed to verify signature on statement",
114115
)
115116

116-
cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_key)
117+
cwt_protected = cwt.decode(msg.phdr[CWTClaims], verification_key.cwt)
117118
issuer = cwt_protected[1]
118119
subject = cwt_protected[2]
119120

121+
issuer_key_as_object = verification_key_to_object(verification_key)
122+
unittest.TestCase().assertTrue(
123+
issuer_key_as_object,
124+
"Failed to convert issuer key to JSON schema verifiable object",
125+
)
126+
120127
SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text())
121128

122129
try:
123130
validate(
124131
instance={
125132
"$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json",
126133
"issuer": issuer,
134+
"issuer_key": issuer_key_as_object,
127135
"subject": subject,
128136
"claim": json.loads(msg.payload.decode()),
129137
},
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from dataclasses import dataclass, field
2+
from typing import List, Any, Union
3+
4+
import cwt
5+
import pycose.keys.ec2
6+
7+
8+
@dataclass
9+
class VerificationKey:
10+
transforms: List[Any]
11+
original: Any
12+
original_content_type: str
13+
original_bytes: bytes
14+
original_bytes_encoding: str
15+
usable: bool
16+
cwt: Union[cwt.COSEKey, None]
17+
cose: Union[pycose.keys.ec2.EC2Key, None]

scitt_emulator/key_helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import itertools
2+
import importlib.metadata
3+
from typing import Optional, Callable, List, Tuple
4+
5+
from scitt_emulator.key_helper_dataclasses import VerificationKey
6+
7+
8+
ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT = "scitt_emulator.key_helpers.verification_key_to_object"
9+
10+
11+
def verification_key_to_object(
12+
verification_key: VerificationKey,
13+
*,
14+
key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None,
15+
) -> bool:
16+
"""
17+
Resolve keys for statement issuer and verify signature on COSESign1
18+
statement and embedded CWT
19+
"""
20+
if key_transforms is None:
21+
key_transforms = []
22+
# There is some difference in the return value of entry_points across
23+
# Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict.
24+
entrypoints = importlib.metadata.entry_points()
25+
if isinstance(entrypoints, dict):
26+
for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT, []):
27+
key_transforms.append(entrypoint.load())
28+
elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)):
29+
for entrypoint in entrypoints:
30+
if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT:
31+
key_transforms.append(entrypoint.load())
32+
else:
33+
raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}")
34+
35+
for key_transform in key_transforms:
36+
verification_key_as_object = key_transform(verification_key)
37+
# Skip keys that we couldn't derive COSE keys for
38+
if verification_key_as_object:
39+
return verification_key_as_object
40+
41+
return None

scitt_emulator/key_loader_format_did_key.py

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,45 +4,48 @@
44
import cwt.algs.ec2
55
import pycose
66
import pycose.keys.ec2
7+
import cryptography.hazmat.primitives.asymmetric.ec
78
from cryptography.hazmat.primitives import serialization
89

910
# TODO Remove this once we have a example flow for proper key verification
1011
import jwcrypto.jwk
1112

1213
from scitt_emulator.did_helpers import DID_KEY_METHOD, did_key_to_cryptography_key
14+
from scitt_emulator.key_helper_dataclasses import VerificationKey
15+
16+
17+
# TODO What is the correct content type? Should we differ if it's been expanded?
18+
CONTENT_TYPE = "application/key+did"
1319

1420

1521
def key_loader_format_did_key(
1622
unverified_issuer: str,
17-
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
18-
jwk_keys = []
19-
cwt_cose_keys = []
20-
pycose_cose_keys = []
21-
cryptography_keys = []
22-
23+
) -> List[VerificationKey]:
2324
if not unverified_issuer.startswith(DID_KEY_METHOD):
24-
return pycose_cose_keys
25-
26-
cryptography_keys.append(did_key_to_cryptography_key(unverified_issuer))
27-
28-
for cryptography_key in cryptography_keys:
29-
jwk_keys.append(
30-
jwcrypto.jwk.JWK.from_pem(
31-
cryptography_key.public_bytes(
32-
encoding=serialization.Encoding.PEM,
33-
format=serialization.PublicFormat.SubjectPublicKeyInfo,
34-
)
35-
)
25+
return []
26+
key = did_key_to_cryptography_key(unverified_issuer)
27+
return [
28+
VerificationKey(
29+
transforms=[key],
30+
original=key,
31+
original_content_type=CONTENT_TYPE,
32+
original_bytes=unverified_issuer.encode("utf-8"),
33+
original_bytes_encoding="utf-8",
34+
usable=False,
35+
cwt=None,
36+
cose=None,
3637
)
37-
38-
for jwk_key in jwk_keys:
39-
cwt_cose_key = cwt.COSEKey.from_pem(
40-
jwk_key.export_to_pem(),
41-
kid=jwk_key.thumbprint(),
38+
]
39+
40+
41+
def transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk(
42+
key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey,
43+
) -> jwcrypto.jwk.JWK:
44+
if not isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
45+
raise TypeError(key)
46+
return jwcrypto.jwk.JWK.from_pem(
47+
key.public_bytes(
48+
encoding=serialization.Encoding.PEM,
49+
format=serialization.PublicFormat.SubjectPublicKeyInfo,
4250
)
43-
cwt_cose_keys.append(cwt_cose_key)
44-
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
45-
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
46-
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))
47-
48-
return pycose_cose_keys
51+
)

scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,22 @@
1313
import jwcrypto.jwk
1414

1515
from scitt_emulator.did_helpers import did_web_to_url
16+
from scitt_emulator.key_helper_dataclasses import VerificationKey
17+
18+
19+
CONTENT_TYPE = "application/jwk+json"
1620

1721

1822
def key_loader_format_url_referencing_oidc_issuer(
1923
unverified_issuer: str,
2024
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
21-
jwk_keys = []
22-
cwt_cose_keys = []
23-
pycose_cose_keys = []
25+
keys = []
2426

2527
if unverified_issuer.startswith("did:web:"):
2628
unverified_issuer = did_web_to_url(unverified_issuer)
2729

2830
if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
29-
return pycose_cose_keys
31+
return keys
3032

3133
# TODO Logging for URLErrors
3234
# Check if OIDC issuer
@@ -44,18 +46,40 @@ def key_loader_format_url_referencing_oidc_issuer(
4446
jwks = json.loads(response.read())
4547
for jwk_key_as_dict in jwks["keys"]:
4648
jwk_key_as_string = json.dumps(jwk_key_as_dict)
47-
jwk_keys.append(
48-
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
49+
jwk_key = jwcrypto.jwk.JWK.from_json(jwk_key_as_string)
50+
keys.append(
51+
VerificationKey(
52+
transforms=[jwk_key],
53+
original=jwk_key,
54+
original_content_type=CONTENT_TYPE,
55+
original_bytes=jwk_key_as_string.encode("utf-8"),
56+
original_bytes_encoding="utf-8",
57+
usable=False,
58+
cwt=None,
59+
cose=None,
60+
)
4961
)
5062

51-
for jwk_key in jwk_keys:
52-
cwt_cose_key = cwt.COSEKey.from_pem(
53-
jwk_key.export_to_pem(),
54-
kid=jwk_key.thumbprint(),
55-
)
56-
cwt_cose_keys.append(cwt_cose_key)
57-
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
58-
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
59-
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))
60-
61-
return pycose_cose_keys
63+
return keys
64+
65+
66+
def transform_key_instance_jwcrypto_jwk_to_cwt_cose(
67+
key: jwcrypto.jwk.JWK,
68+
) -> cwt.COSEKey:
69+
if not isinstance(key, jwcrypto.jwk.JWK):
70+
raise TypeError(key)
71+
return cwt.COSEKey.from_pem(
72+
key.export_to_pem(),
73+
kid=key.thumbprint(),
74+
)
75+
76+
77+
def to_object_oidc_issuer(verification_key: VerificationKey) -> dict:
78+
if verification_key.original_content_type != CONTENT_TYPE:
79+
return
80+
81+
return {
82+
**verification_key.original.export_public(as_dict=True),
83+
"use": "sig",
84+
"kid": verification_key.original.thumbprint(),
85+
}

scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,13 @@
1919
def key_loader_format_url_referencing_ssh_authorized_keys(
2020
unverified_issuer: str,
2121
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
22-
jwk_keys = []
23-
cwt_cose_keys = []
24-
pycose_cose_keys = []
25-
26-
cryptography_ssh_keys = []
22+
keys = []
2723

2824
if unverified_issuer.startswith("did:web:"):
2925
unverified_issuer = did_web_to_url(unverified_issuer)
3026

3127
if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
32-
return pycose_cose_keys
28+
return keys
3329

3430
# Try loading ssh keys. Example: https://github.com/username.keys
3531
with contextlib.suppress(urllib.request.URLError):
@@ -38,28 +34,18 @@ def key_loader_format_url_referencing_ssh_authorized_keys(
3834
with contextlib.suppress(
3935
(ValueError, cryptography.exceptions.UnsupportedAlgorithm)
4036
):
41-
cryptography_ssh_keys.append(
42-
serialization.load_ssh_public_key(line)
37+
key = serialization.load_ssh_public_key(line)
38+
keys.append(
39+
VerificationKey(
40+
transforms=[key],
41+
original=key,
42+
original_content_type=CONTENT_TYPE,
43+
original_bytes=line.encode("utf-8"),
44+
original_bytes_encoding="utf-8",
45+
usable=False,
46+
cwt=None,
47+
cose=None,
48+
)
4349
)
4450

45-
for cryptography_ssh_key in cryptography_ssh_keys:
46-
jwk_keys.append(
47-
jwcrypto.jwk.JWK.from_pem(
48-
cryptography_ssh_key.public_bytes(
49-
encoding=serialization.Encoding.PEM,
50-
format=serialization.PublicFormat.SubjectPublicKeyInfo,
51-
)
52-
)
53-
)
54-
55-
for jwk_key in jwk_keys:
56-
cwt_cose_key = cwt.COSEKey.from_pem(
57-
jwk_key.export_to_pem(),
58-
kid=jwk_key.thumbprint(),
59-
)
60-
cwt_cose_keys.append(cwt_cose_key)
61-
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
62-
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
63-
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))
64-
65-
return pycose_cose_keys
51+
return keys

0 commit comments

Comments
 (0)