Skip to content

Commit a4bc402

Browse files
committed
verify statement: As standalone file
Signed-off-by: John Andersen <[email protected]>
1 parent 9a9eb37 commit a4bc402

8 files changed

+310
-105
lines changed

docs/registration_policies.md

Lines changed: 2 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -104,108 +104,7 @@ import cryptography.hazmat.primitives.serialization
104104
import jwcrypto.jwk
105105

106106
from scitt_emulator.scitt import ClaimInvalidError, CWTClaims
107-
108-
109-
def did_web_to_url(
110-
did_web_string, scheme=os.environ.get("DID_WEB_ASSUME_SCHEME", "https")
111-
):
112-
return "/".join(
113-
[
114-
f"{scheme}:/",
115-
*[urllib.parse.unquote(i) for i in did_web_string.split(":")[2:]],
116-
]
117-
)
118-
119-
120-
def verify_signature(msg: Sign1Message) -> bool:
121-
"""
122-
- TODOs
123-
- Should we use audiance? I think no, just want to make sure we've
124-
documented why thought if not. No usage makes sense to me becasue we
125-
don't know the intended audiance, it could be federated into
126-
multiple TS
127-
- Can you just pass a whole public key as an issuer?
128-
- Resolve DID keys (since that is what the arch says...)
129-
"""
130-
131-
# Figure out what the issuer is
132-
cwt_cose_loads = cwt.cose.COSE()._loads
133-
cwt_unverified_protected = cwt_cose_loads(
134-
cwt_cose_loads(msg.phdr[CWTClaims]).value[2]
135-
)
136-
unverified_issuer = cwt_unverified_protected[1]
137-
138-
if unverified_issuer.startswith("did:web:"):
139-
unverified_issuer = did_web_to_url(unverified_issuer)
140-
141-
# Load keys from issuer
142-
jwk_keys = []
143-
cwt_cose_keys = []
144-
pycose_cose_keys = []
145-
146-
from cryptography.hazmat.primitives import serialization
147-
148-
cryptography_ssh_keys = []
149-
if "://" in unverified_issuer and not unverified_issuer.startswith("file://"):
150-
# TODO Logging for URLErrors
151-
# Check if OIDC issuer
152-
unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer)
153-
openid_configuration_url = unverified_issuer_parsed_url._replace(
154-
path="/.well-known/openid-configuration",
155-
).geturl()
156-
with contextlib.suppress(urllib.request.URLError):
157-
with urllib.request.urlopen(openid_configuration_url) as response:
158-
if response.status == 200:
159-
openid_configuration = json.loads(response.read())
160-
jwks_uri = openid_configuration["jwks_uri"]
161-
with urllib.request.urlopen(jwks_uri) as response:
162-
if response.status == 200:
163-
jwks = json.loads(response.read())
164-
for jwk_key_as_dict in jwks["keys"]:
165-
jwk_key_as_string = json.dumps(jwk_key_as_dict)
166-
jwk_keys.append(
167-
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
168-
)
169-
170-
# Try loading ssh keys. Example: https://github.com/username.keys
171-
with contextlib.suppress(urllib.request.URLError):
172-
with urllib.request.urlopen(unverified_issuer) as response:
173-
while line := response.readline():
174-
with contextlib.suppress(
175-
(ValueError, cryptography.exceptions.UnsupportedAlgorithm)
176-
):
177-
cryptography_ssh_keys.append(
178-
cryptography.hazmat.primitives.serialization.load_ssh_public_key(
179-
line
180-
)
181-
)
182-
183-
for cryptography_ssh_key in cryptography_ssh_keys:
184-
jwk_keys.append(
185-
jwcrypto.jwk.JWK.from_pem(
186-
cryptography_ssh_key.public_bytes(
187-
encoding=serialization.Encoding.PEM,
188-
format=serialization.PublicFormat.SubjectPublicKeyInfo,
189-
)
190-
)
191-
)
192-
193-
for jwk_key in jwk_keys:
194-
cwt_cose_key = cwt.COSEKey.from_pem(
195-
jwk_key.export_to_pem(),
196-
kid=jwk_key.thumbprint(),
197-
)
198-
cwt_cose_keys.append(cwt_cose_key)
199-
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
200-
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
201-
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))
202-
203-
for cwt_cose_key, pycose_cose_key in pycose_cose_keys:
204-
with contextlib.suppress(Exception):
205-
msg.key = pycose_cose_key
206-
verify_signature = msg.verify_signature()
207-
if verify_signature:
208-
return cwt_cose_key, pycose_cose_key
107+
from scitt_emulator.verify_statement import verify_statement
209108

210109

211110
def main():
@@ -220,7 +119,7 @@ def main():
220119
f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}"
221120
)
222121

223-
cwt_cose_key, _pycose_cose_key = verify_signature(msg)
122+
cwt_cose_key, _pycose_cose_key = verify_statement(msg)
224123
unittest.TestCase().assertTrue(
225124
cwt_cose_key,
226125
"Failed to verify signature on statement",

pytest.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[pytest]
2+
addopts = --doctest-modules

scitt_emulator/did_helpers.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import os
2+
import sys
3+
import inspect
4+
import urllib.parse
5+
from typing import Optional, Callable
6+
7+
8+
def did_web_to_url(
9+
did_web_string: str,
10+
*,
11+
scheme: str = os.environ.get("DID_WEB_ASSUME_SCHEME", "https"),
12+
):
13+
return "/".join(
14+
[
15+
f"{scheme}:/",
16+
*[urllib.parse.unquote(i) for i in did_web_string.split(":")[2:]],
17+
]
18+
)
19+
20+
21+
class DIDKeyDecoderNotFoundError(NotImplementedError):
22+
"""
23+
Raised when we don't have a function implemented to decode the given key
24+
"""
25+
26+
27+
DID_KEY_METHOD = "did:key:"
28+
29+
30+
def did_key_to_jwk_dict_is_p_384_startswith_z82(did_key: str) -> dict[str, str]:
31+
did_key = did_key.replace(DID_KEY_METHOD, "", 1)
32+
return
33+
34+
35+
def did_key_to_jwk_dict(
36+
did_key: str,
37+
*,
38+
decoders_by_prefix: Optional[dict[str, Callable[[str], dict[str, str]]]] = None,
39+
) -> dict[str, str]:
40+
"""
41+
References
42+
43+
- https://w3c-ccg.github.io/did-method-key/#p-384
44+
- RFC7515: JSON Web Key (JWK): https://www.rfc-editor.org/rfc/rfc7517
45+
- RFC8037: CFRG Elliptic Curve Diffie-Hellman (ECDH) and Signatures in JSON Object Signing and Encryption (JOSE): https://www.rfc-editor.org/rfc/rfc8037
46+
47+
Examples
48+
49+
>>> did_key_to_jwk_dict("did:key:invalid")
50+
Traceback (most recent call last):
51+
DIDKeyDecoderNotFoundError: ...
52+
>>> did_key_to_jwk_dict("did:key:z82LkvCwHNreneWpsgPEbV3gu1C6NFJEBg4srfJ5gdxEsMGRJUz2sG9FE42shbn2xkZJh54")
53+
"""
54+
if decoders_by_prefix is None:
55+
decoders_by_prefix = {
56+
function_name.split("_")[-1]: function
57+
for function_name, function in inspect.getmembers(sys.modules[__name__])
58+
if (
59+
function_name.startswith("did_key_to_jwk_dict_")
60+
and "_startswith_" in function_name
61+
)
62+
}
63+
64+
for prefix, decoder in decoders_by_prefix.items():
65+
if did_key.startswith(DID_KEY_METHOD + prefix):
66+
return decoder(did_key)
67+
68+
raise DIDKeyDecoderNotFoundError(did_key)
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import os
2+
import sys
3+
import json
4+
import pathlib
5+
import unittest
6+
import traceback
7+
import contextlib
8+
import urllib.parse
9+
import urllib.request
10+
import importlib.metadata
11+
from typing import Optional, Callable
12+
13+
import jwt
14+
import cbor2
15+
import cwt
16+
import cwt.algs.ec2
17+
import pycose
18+
import pycose.keys.ec2
19+
from pycose.messages import Sign1Message
20+
from cryptography.hazmat.primitives import serialization
21+
22+
# TODO Remove this once we have a example flow for proper key verification
23+
import jwcrypto.jwk
24+
25+
from scitt_emulator.scitt import ClaimInvalidError, CWTClaims
26+
from scitt_emulator.did_helpers import did_web_to_url
27+
28+
29+
def key_loader_format_url_referencing_oidc_issuer(
30+
unverified_issuer: str,
31+
) -> list[tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
32+
jwk_keys = []
33+
cwt_cose_keys = []
34+
pycose_cose_keys = []
35+
36+
cryptography_ssh_keys = []
37+
38+
if unverified_issuer.startswith("did:web:"):
39+
unverified_issuer = did_web_to_url(unverified_issuer)
40+
41+
if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
42+
return None, None
43+
44+
# TODO Logging for URLErrors
45+
# Check if OIDC issuer
46+
unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer)
47+
openid_configuration_url = unverified_issuer_parsed_url._replace(
48+
path="/.well-known/openid-configuration",
49+
).geturl()
50+
with contextlib.suppress(urllib.request.URLError):
51+
with urllib.request.urlopen(openid_configuration_url) as response:
52+
if response.status == 200:
53+
openid_configuration = json.loads(response.read())
54+
jwks_uri = openid_configuration["jwks_uri"]
55+
with urllib.request.urlopen(jwks_uri) as response:
56+
if response.status == 200:
57+
jwks = json.loads(response.read())
58+
for jwk_key_as_dict in jwks["keys"]:
59+
jwk_key_as_string = json.dumps(jwk_key_as_dict)
60+
jwk_keys.append(
61+
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
62+
)
63+
64+
for jwk_key in jwk_keys:
65+
cwt_cose_key = cwt.COSEKey.from_pem(
66+
jwk_key.export_to_pem(),
67+
kid=jwk_key.thumbprint(),
68+
)
69+
cwt_cose_keys.append(cwt_cose_key)
70+
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
71+
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
72+
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))
73+
74+
return pycose_cose_keys
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import os
2+
import sys
3+
import json
4+
import pathlib
5+
import unittest
6+
import traceback
7+
import contextlib
8+
import urllib.parse
9+
import urllib.request
10+
import importlib.metadata
11+
from typing import Optional, Callable
12+
13+
import jwt
14+
import cbor2
15+
import cwt
16+
import cwt.algs.ec2
17+
import pycose
18+
import pycose.keys.ec2
19+
from pycose.messages import Sign1Message
20+
from cryptography.hazmat.primitives import serialization
21+
22+
# TODO Remove this once we have a example flow for proper key verification
23+
import jwcrypto.jwk
24+
25+
from scitt_emulator.scitt import ClaimInvalidError, CWTClaims
26+
from scitt_emulator.did_helpers import did_web_to_url
27+
28+
29+
def key_loader_format_url_referencing_ssh_authorized_keys(
30+
unverified_issuer: str,
31+
) -> list[tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
32+
jwk_keys = []
33+
cwt_cose_keys = []
34+
pycose_cose_keys = []
35+
36+
cryptography_ssh_keys = []
37+
38+
if unverified_issuer.startswith("did:web:"):
39+
unverified_issuer = did_web_to_url(unverified_issuer)
40+
41+
if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
42+
return None, None
43+
44+
# TODO Logging for URLErrors
45+
# Check if OIDC issuer
46+
unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer)
47+
openid_configuration_url = unverified_issuer_parsed_url._replace(
48+
path="/.well-known/openid-configuration",
49+
).geturl()
50+
with contextlib.suppress(urllib.request.URLError):
51+
with urllib.request.urlopen(openid_configuration_url) as response:
52+
if response.status == 200:
53+
openid_configuration = json.loads(response.read())
54+
jwks_uri = openid_configuration["jwks_uri"]
55+
with urllib.request.urlopen(jwks_uri) as response:
56+
if response.status == 200:
57+
jwks = json.loads(response.read())
58+
for jwk_key_as_dict in jwks["keys"]:
59+
jwk_key_as_string = json.dumps(jwk_key_as_dict)
60+
jwk_keys.append(
61+
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
62+
)
63+
64+
for jwk_key in jwk_keys:
65+
cwt_cose_key = cwt.COSEKey.from_pem(
66+
jwk_key.export_to_pem(),
67+
kid=jwk_key.thumbprint(),
68+
)
69+
cwt_cose_keys.append(cwt_cose_key)
70+
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
71+
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
72+
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))
73+
74+
return pycose_cose_keys

scitt_emulator/scitt.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import pycose.headers
1515

1616
from scitt_emulator.create_statement import CWTClaims
17+
from scitt_emulator.verify_statement import verify_statement
1718

1819
# temporary receipt header labels, see draft-birkholz-scitt-receipts
1920
COSE_Headers_Service_Id = "service_id"
@@ -236,7 +237,10 @@ def _create_receipt(self, claim: bytes, entry_id: str):
236237
if CWTClaims not in msg.phdr:
237238
raise ClaimInvalidError("Claim does not have a CWTClaims header parameter")
238239

239-
# TODO Verify CWT
240+
msg = Sign1Message.decode(claim, tag=True)
241+
cwt_cose_key, _pycose_cose_key = verify_statement(msg)
242+
if not cwt_cose_key:
243+
raise ClaimInvalidError("Failed to verify signature on statement")
240244

241245
# Extract fields of COSE_Sign1 for countersigning
242246
outer = cbor2.loads(claim)

0 commit comments

Comments
 (0)