diff --git a/docs/registration_policies.md b/docs/registration_policies.md index 61c63a7a..ea42decd 100644 --- a/docs/registration_policies.md +++ b/docs/registration_policies.md @@ -12,14 +12,14 @@ The SCITT API emulator can deny entry based on presence of This is a simple way to enable evaluation of claims prior to submission by arbitrary policy engines which watch the workspace (fanotify, inotify, etc.). -[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/572766.svg)](https://asciinema.org/a/572766) +[![asciicast-of-simple-decoupled-file-based-policy-engine](https://asciinema.org/a/620587.svg)](https://asciinema.org/a/620587) Start the server ```console $ rm -rf workspace/ $ mkdir -p workspace/storage/operations -$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +$ timeout 0.5s scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro Service parameters: workspace/service_parameters.json ^C ``` @@ -84,36 +84,186 @@ import os import sys import json import pathlib +import unittest import traceback +import contextlib +import urllib.parse +import jwt +import cwt +import cwt.algs.ec2 import cbor2 import pycose + +# TODO Remove this once we have a example flow for proper key verification +import jwcrypto.jwk from jsonschema import validate, ValidationError -from pycose.messages import CoseMessage, Sign1Message +import pycose.keys.ec2 +import cryptography.hazmat.primitives.serialization +from pycose.messages import Sign1Message -from scitt_emulator.scitt import ClaimInvalidError, COSE_Headers_Issuer +from scitt_emulator.scitt import ClaimInvalidError, CWTClaims claim = sys.stdin.buffer.read() -msg = CoseMessage.decode(claim) +msg = Sign1Message.decode(claim, tag=True) if pycose.headers.ContentType not in msg.phdr: raise ClaimInvalidError("Claim does not have a content type header parameter") -if COSE_Headers_Issuer not in msg.phdr: - raise ClaimInvalidError("Claim does not have an issuer header parameter") if not msg.phdr[pycose.headers.ContentType].startswith("application/json"): raise TypeError( f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}" ) +# TODO Whatever the opisite of COSESign1 is + +# Figure out what the issuer is +cwt_cose_loads = cwt.cose.COSE()._loads +cwt_unverified_protected = cwt_cose_loads(cwt_cose_loads(msg.phdr[CWTClaims]).value[2]) +unverified_issuer = cwt_unverified_protected[1] + +def did_web_to_url(did_web_string, scheme=os.environ.get("DID_WEB_ASSUME_SCHEME", "https")): + return "/".join( + [ + f"{scheme}:/", + *[urllib.parse.unquote(i) for i in did_web_string.split(":")[2:]], + ] + ) + +if unverified_issuer.startswith("did:web:"): + unverified_issuer = did_web_to_url(unverified_issuer) + +# TODO Should we use audiance? I think no, just want to make sure we've +# documented why thought if not. No usage makes sense to me becasue we don't +# know the intended audiance, it could be federated into multiple TS + +# TODO Can you just pass a whole public key as an issuer? + +# Load keys from issuer +jwk_keys = [] +cryptography_ssh_keys = [] +cwt_cose_keys = [] +pycose_cose_keys = [] + +import urllib.request +import urllib.parse + +# TODO did:web: -> URL +from cryptography.hazmat.primitives import serialization + +if "://" in unverified_issuer and not unverified_issuer.startswith("file://"): + # TODO Logging for URLErrors + # Check if OIDC issuer + unverified_issuer_parsed_url = urllib.parse.urlparse(unverified_issuer) + openid_configuration_url = unverified_issuer_parsed_url._replace( + path="/.well-known/openid-configuration", + ).geturl() + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(openid_configuration_url) as response: + if response.status == 200: + openid_configuration = json.loads(response.read()) + jwks_uri = openid_configuration["jwks_uri"] + with urllib.request.urlopen(jwks_uri) as response: + if response.status == 200: + jwks = json.loads(response.read()) + for jwk_key_as_dict in jwks["keys"]: + """ + jwk_key_as_string = json.dumps(jwk_key_as_dict) + jwk_keys.append( + jwcrypto.jwk.JWK.from_json(jwk_key_as_string), + ) + """ + cwt_cose_key = cwt.COSEKey.from_jwk( + jwk_key_as_dict + ) + cwt_cose_keys.append(cwt_cose_key) + + # Try loading ssh keys. Example: https://github.com/username.keys + with contextlib.suppress(urllib.request.URLError): + with urllib.request.urlopen(unverified_issuer) as response: + while line := response.readline(): + with contextlib.suppress( + (ValueError, cryptography.exceptions.UnsupportedAlgorithm) + ): + cryptography_ssh_keys.append( + cryptography.hazmat.primitives.serialization.load_ssh_public_key( + line + ) + ) + +for cryptography_ssh_key in cryptography_ssh_keys: + jwk_keys.append( + jwcrypto.jwk.JWK.from_pem( + cryptography_ssh_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + ) + ) + +for jwk_key in jwk_keys: + cwt_cose_key = cwt.COSEKey.from_pem( + jwk_key.export_to_pem(), + kid=jwk_key.kid, + ) + # cwt_cose_keys.append(cwt_cose_key) + +for cwt_cose_key in cwt_cose_keys: + cwt_ec2_key_as_dict = cwt_cose_key.to_dict() + import pprint + import inspect + cose_tags = { + member.identifier: member.fullname + for _member_name, member in inspect.getmembers(pycose.headers) + if ( + hasattr(member, "identifier") + and hasattr(member, "fullname") + ) + } + pprint.pprint(cose_tags) + cwt_ec2_key_as_dict_labeled = { + cose_tags.get(key, key): value + for key, value in cwt_ec2_key_as_dict.items() + } + print("cwt_ec2_key_as_dict_labeled['STATIC_KEY_ID']", cwt_ec2_key_as_dict_labeled['CRITICAL']) + pprint.pprint(cwt_ec2_key_as_dict) + pprint.pprint(cwt_ec2_key_as_dict_labeled) + pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) + pycose_cose_key.kid = cwt_ec2_key_as_dict_labeled['CRITICAL'] + cwt_cose_key._kid = pycose_cose_key.kid + pycose_cose_keys.append(pycose_cose_key) + +verify_signature = False +for pycose_cose_key in pycose_cose_keys: + with contextlib.suppress(Exception): + msg.key = pycose_cose_key + verify_signature = msg.verify_signature() + if verify_signature: + # msg.kid = pycose_cose_key.kid + break + +unittest.TestCase().assertTrue( + verify_signature, + "Failed to verify signature on statement", +) + +pprint.pprint(pycose_cose_keys) +pprint.pprint(cwt_cose_keys) + +cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_keys) +issuer = cwt_protected[1] +subject = cwt_protected[2] + +# TODO Validate content type is JSON? SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text()) try: validate( instance={ "$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json", - "issuer": msg.phdr[COSE_Headers_Issuer], + "issuer": issuer, + "subject": subject, "claim": json.loads(msg.payload.decode()), }, schema=SCHEMA, @@ -140,21 +290,106 @@ echo ${CLAIM_PATH} Example running allowlist check and enforcement. ```console -npm install -g nodemon -nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;' +$ npm install nodemon && \ + node_modules/.bin/nodemon -e .cose --exec 'find workspace/storage/operations -name \*.cose -exec nohup sh -xe policy_engine.sh $(cat workspace/service_parameters.json | jq -r .insertPolicy) {} \;' ``` Also ensure you restart the server with the new config we edited. ```console -scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +$ scitt-emulator server --workspace workspace/ --tree-alg CCF --use-lro +``` + +The current emulator notary (create-statement) implementation will sign +statements using a generated ephemeral key or a key we provide via the +`--private-key-pem` argument. + +Since we need to export the key for verification by the policy engine, we will +first generate it using `ssh-keygen`. + +```console +$ export ISSUER_PORT="9000" \ + && export ISSUER_URL="http://localhost:${ISSUER_PORT}" \ + && ssh-keygen -q -f /dev/stdout -t ecdsa -b 384 -N '' -I $RANDOM <</dev/null | python -c 'import sys; from cryptography.hazmat.primitives import serialization; print(serialization.load_ssh_private_key(sys.stdin.buffer.read(), password=None).private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode().rstrip())' > private-key.pem \ + && scitt-emulator client create-claim \ + --private-key-pem private-key.pem \ + --issuer "${ISSUER_URL}" \ + --subject "solar" \ + --content-type application/json \ + --payload '{"sun": "yellow"}' \ + --out claim.cose ``` -Create claim from allowed issuer (`.org`) and from non-allowed (`.com`). +The core of policy engine we implemented in `jsonschema_validator.py` will +verify the COSE message generated using the public portion of the notary's key. +We've implemented two possible styles of key resolution. Both of them require +resolution of public keys via an HTTP server. + +Let's start the HTTP server now, we'll populate the needed files in the +sections corresponding to each resolution style. + +```console +$ python -m http.server "${ISSUER_PORT}" & +$ python_http_server_pid=$! +``` + +### SSH `authorized_keys` style notary public key resolution + +Keys are discovered via making an HTTP GET request to the URL given by the +`issuer` parameter via the `web` DID method and de-serializing the SSH +public keys found within the response body. + +Start an HTTP server with an SSH public key served at the root. + +```console +$ cat private-key.pem | ssh-keygen -f /dev/stdin -y | tee index.html +``` + +### OpenID Connect token style notary public key resolution + +Keys are discovered two part resolution of HTTP paths relative to the issuer + +`/.well-known/openid-configuration` path is requested via HTTP GET. The +response body is parsed as JSON and the value of the `jwks_uri` key is +requested via HTTP GET. + +`/.well-known/jwks` (is typically the value of `jwks_uri`) path is requested +via HTTP GET. The response body is parsed as JSON. Public keys are loaded +from the value of the `keys` key which stores an array of JSON Web Key (JWK) +style serializations. + +```console +$ mkdir -p .well-known/ +$ cat > .well-known/openid-configuration < @@ -174,10 +409,27 @@ Failed validating 'enum' in schema['properties']['issuer']: On instance['issuer']: 'did:web:example.com' +``` + +Modify the allowlist to ensure that our issuer, aka our local HTTP server with +our keys, is set to be the allowed issuer. + +```console +$ export allowlist="$(cat allowlist.schema.json)" && \ + jq '.properties.issuer.enum[0] = env.ISSUER_URL' <(echo "${allowlist}") \ + | tee allowlist.schema.json +``` -$ scitt-emulator client create-claim --issuer did:web:example.org --content-type application/json --payload '{"sun": "yellow"}' --out claim.cose -A COSE signed Claim was written to: claim.cose +Submit the statement from the issuer we just added to the allowlist. + +```console $ scitt-emulator client submit-claim --claim claim.cose --out claim.receipt.cbor Claim registered with entry ID 1 Receipt written to claim.receipt.cbor ``` + +Stop the server that serves the public keys + +```console +$ kill $python_http_server_pid +``` diff --git a/run-tests.sh b/run-tests.sh index de8eadb1..035b5c3b 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -11,6 +11,8 @@ if [ ! -f "venv/bin/activate" ]; then pip install -q -U pip setuptools wheel pip install -q -r dev-requirements.txt pip install -q -e .[oidc] + # TODO Resolve this, temporary fix for https://github.com/scitt-community/scitt-api-emulator/issues/38 + pip install urllib3==1.26.15 requests-toolbelt==0.10.1 else . ./venv/bin/activate fi diff --git a/scitt_emulator/client.py b/scitt_emulator/client.py index b4ff35ee..2658d74f 100644 --- a/scitt_emulator/client.py +++ b/scitt_emulator/client.py @@ -9,6 +9,7 @@ import httpx import scitt_emulator.scitt as scitt +from scitt_emulator import create_statement from scitt_emulator.tree_algs import TREE_ALGS DEFAULT_URL = "http://127.0.0.1:8000" @@ -72,10 +73,6 @@ def post(self, *args, **kwargs): return self._request("POST", *args, **kwargs) -def create_claim(issuer: str, content_type: str, payload: str, claim_path: Path): - scitt.create_claim(claim_path, issuer, content_type, payload) - - def submit_claim( url: str, claim_path: Path, @@ -170,16 +167,7 @@ def cli(fn): parser = fn(description="Execute client commands") sub = parser.add_subparsers(dest="cmd", help="Command to execute", required=True) - p = sub.add_parser("create-claim", description="Create a fake SCITT claim") - p.add_argument("--out", required=True, type=Path) - p.add_argument("--issuer", required=True, type=str) - p.add_argument("--content-type", required=True, type=str) - p.add_argument("--payload", required=True, type=str) - p.set_defaults( - func=lambda args: scitt.create_claim( - args.out, args.issuer, args.content_type, args.payload - ) - ) + create_statement.cli(sub.add_parser) p = sub.add_parser( "submit-claim", description="Submit a SCITT claim and retrieve the receipt" diff --git a/scitt_emulator/create_statement.py b/scitt_emulator/create_statement.py new file mode 100644 index 00000000..faf7ab30 --- /dev/null +++ b/scitt_emulator/create_statement.py @@ -0,0 +1,211 @@ +# Copyright (c) SCITT Authors +# Licensed under the MIT License. +import uuid +import pathlib +import argparse +from typing import Optional + +import cwt +import pycose +import pycose.headers +import pycose.messages +import pycose.keys.ec2 + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class CWTClaims(pycose.headers.CoseHeaderAttribute): + identifier = 14 + fullname = "CWT_CLAIMS" + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class RegInfo(pycose.headers.CoseHeaderAttribute): + identifier = 393 + fullname = "REG_INFO" + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class Receipt(pycose.headers.CoseHeaderAttribute): + identifier = 394 + fullname = "RECEIPT" + + +@pycose.headers.CoseHeaderAttribute.register_attribute() +class TBD(pycose.headers.CoseHeaderAttribute): + identifier = 395 + fullname = "TBD" + + +def create_claim( + claim_path: pathlib.Path, + issuer: str, + subject: str, + content_type: str, + payload: str, + private_key_pem_path: Optional[str] = None, +): + # https://ietf-wg-scitt.github.io/draft-ietf-scitt-architecture/draft-ietf-scitt-architecture.html#name-signed-statement-envelope + + # Registration Policy (label: TBD, temporary: 393): A map containing + # key/value pairs set by the Issuer which are sealed on Registration and + # non-opaque to the Transparency Service. The key/value pair semantics are + # specified by the Issuer or are specific to the CWT_Claims iss and + # CWT_Claims sub tuple. + # Examples: the sequence number of signed statements + # on a CWT_Claims Subject, Issuer metadata, or a reference to other + # Transparent Statements (e.g., augments, replaces, new-version, CPE-for) + # Reg_Info = { + reg_info = { + # ? "register_by": uint .within (~time), + "register_by": 1000, + # ? "sequence_no": uint, + "sequence_no": 0, + # ? "issuance_ts": uint .within (~time), + "issuance_ts": 1000, + # ? "no_replay": null, + "no_replay": None, + # * tstr => any + } + # } + + # Create COSE_Sign1 structure + # https://python-cwt.readthedocs.io/en/stable/algorithms.html + alg = "ES384" + # Create an ad-hoc key + # oct: size(int) + # RSA: public_exponent(int), size(int) + # EC: crv(str) (one of P-256, P-384, P-521, secp256k1) + # OKP: crv(str) (one of Ed25519, Ed448, X25519, X448) + if private_key_pem_path and private_key_pem_path.exists(): + """ + import subprocess + subprocess.check_call( + [ + "bash", + "-c", + f"ssh-keygen -q -f /dev/stdout -t ecdsa -b 384 -N '' <</dev/null | python -c 'import sys; from cryptography.hazmat.primitives import serialization; print(serialization.load_ssh_private_key(sys.stdin.buffer.read(), password=None).private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode().rstrip())' > {private_key_pem_path}", + ] + ) + """ + private_key_pem = private_key_pem_path.read_bytes() + import hashlib + kid_hash = hashlib.sha384() + kid_hash.update(private_key_pem) + kid = kid_hash.hexdigest() + cwt_cose_key = cwt.COSEKey.from_pem(private_key_pem, kid=kid) + else: + cwt_cose_key = pycose.keys.EC2Key.generate_key( + pycose.keys.curves.P384, + ) + # sign1_message_key = cwt.algs.ec2.EC2Key.to_cose_key(cwt_cose_key) + import base64 + cwt_ec2_key_as_dict = { + "crv": "P-384", + "kid": str(uuid.uuid4()), + "kty": "EC", + # "use": "sig", + "use": "enc", + "x": base64.b64encode(cwt_cose_key.x).decode(), + "y": base64.b64encode(cwt_cose_key.y).decode(), + "d": base64.b64encode(cwt_cose_key.d).decode(), + } + # sign1_message_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) + sign1_message_key = cwt.COSEKey.from_jwk(cwt_ec2_key_as_dict) + + + # CWT_Claims (label: 14 pending [CWT_CLAIM_COSE]): A CWT representing + # the Issuer (iss) making the statement, and the Subject (sub) to + # correlate a collection of statements about an Artifact. Additional + # [CWT_CLAIMS] MAY be used, while iss and sub MUST be provided + # CWT_Claims = { + cwt_claims = { + # iss (CWT_Claim Key 1): The Identifier of the signer, as a string + # Example: did:web:example.com + # 1 => tstr; iss, the issuer making statements, + 1: issuer, + # sub (CWT_Claim Key 2): The Subject to which the Statement refers, + # chosen by the Issuer + # Example: github.com/opensbom-generator/spdx-sbom-generator/releases/tag/v0.0.13 + # 2 => tstr; sub, the subject of the statements, + 2: subject, + # * tstr => any + } + # } + cwt_token = cwt.encode(cwt_claims, cwt_cose_key) + print(cwt.decode(cwt_token , cwt_cose_key)) + + # Protected_Header = { + protected = { + # algorithm (label: 1): Asymmetric signature algorithm used by the + # Issuer of a Signed Statement, as an integer. + # Example: -35 is the registered algorithm identifier for ECDSA with + # SHA-384, see COSE Algorithms Registry [IANA.cose]. + # 1 => int ; algorithm identifier, + # https://www.iana.org/assignments/cose/cose.xhtml#algorithms + # pycose.headers.Algorithm: "ES256", + pycose.headers.Algorithm: getattr(cwt.enums.COSEAlgs, alg), + # Key ID (label: 4): Key ID, as a bytestring + # 4 => bstr ; Key ID, + pycose.headers.KID: kid.encode('ascii'), + # 14 => CWT_Claims ; CBOR Web Token Claims, + CWTClaims: cwt_token, + # 393 => Reg_Info ; Registration Policy info, + RegInfo: reg_info, + # 3 => tstr ; payload type + pycose.headers.ContentType: content_type, + } + # } + + # Unprotected_Header = { + unprotected = { + # ; TBD, Labels are temporary, + TBD: "TBD", + # ? 394 => [+ Receipt] + Receipt: None, + } + # } + + # https://github.com/TimothyClaeys/pycose/blob/e527e79b611f6cc6673bbb694056a7468c2eef75/pycose/messages/cosemessage.py#L84-L91 + msg = pycose.messages.Sign1Message( + phdr=protected, + uhdr=unprotected, + payload=payload.encode("utf-8"), + ) + + # Sign + msg.key = sign1_message_key + # https://github.com/TimothyClaeys/pycose/blob/e527e79b611f6cc6673bbb694056a7468c2eef75/pycose/messages/cosemessage.py#L143 + claim = msg.encode(tag=True) + claim_path.write_bytes(claim) + + +def cli(fn): + p = fn("create-claim", description="Create a fake SCITT claim") + p.add_argument("--out", required=True, type=pathlib.Path) + p.add_argument("--issuer", required=True, type=str) + p.add_argument("--subject", required=True, type=str) + p.add_argument("--content-type", required=True, type=str) + p.add_argument("--payload", required=True, type=str) + p.add_argument("--private-key-pem", required=False, type=pathlib.Path) + p.set_defaults( + func=lambda args: create_claim( + args.out, + args.issuer, + args.subject, + args.content_type, + args.payload, + private_key_pem_path=args.private_key_pem, + ) + ) + + return p + + +def main(argv=None): + parser = cli(argparse.ArgumentParser) + args = parser.parse_args(argv) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/scitt_emulator/scitt.py b/scitt_emulator/scitt.py index 3311b778..aa7969c9 100644 --- a/scitt_emulator/scitt.py +++ b/scitt_emulator/scitt.py @@ -12,11 +12,8 @@ import cbor2 from pycose.messages import CoseMessage, Sign1Message import pycose.headers -from pycose.keys.ec2 import EC2Key -import pycose.keys.curves -# temporary claim header labels, see draft-birkholz-scitt-architecture -COSE_Headers_Issuer = 391 +from scitt_emulator.create_statement import CWTClaims # temporary receipt header labels, see draft-birkholz-scitt-receipts COSE_Headers_Service_Id = "service_id" @@ -236,10 +233,10 @@ def _create_receipt(self, claim: bytes, entry_id: str): raise ClaimInvalidError( "Claim does not have a content type header parameter" ) - if COSE_Headers_Issuer not in msg.phdr: - raise ClaimInvalidError("Claim does not have an issuer header parameter") - if not isinstance(msg.phdr[COSE_Headers_Issuer], str): - raise ClaimInvalidError("Claim issuer is not a string") + if CWTClaims not in msg.phdr: + raise ClaimInvalidError("Claim does not have a CWTClaims header parameter") + + # TODO Verify CWT # Extract fields of COSE_Sign1 for countersigning outer = cbor2.loads(claim) @@ -304,28 +301,6 @@ def verify_receipt(self, cose_path: Path, receipt_path: Path): self.verify_receipt_contents(receipt_contents, countersign_tbi) -def create_claim(claim_path: Path, issuer: str, content_type: str, payload: str): - # Create COSE_Sign1 structure - protected = { - pycose.headers.Algorithm: "ES256", - pycose.headers.ContentType: content_type, - COSE_Headers_Issuer: issuer, - } - msg = Sign1Message(phdr=protected, payload=payload.encode("utf-8")) - - # Create an ad-hoc key - # Note: The emulator does not validate signatures, hence the short-cut. - key = EC2Key.generate_key(pycose.keys.curves.P256) - - # Sign - msg.key = key - claim = msg.encode(tag=True) - - with open(claim_path, "wb") as f: - f.write(claim) - print(f"A COSE signed Claim was written to: {claim_path}") - - def create_countersign_to_be_included( body_protected, sign_protected, payload, signature ): diff --git a/setup.py b/setup.py index 466dd6fc..e31e7f62 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,8 @@ install_requires=[ "cryptography", "cbor2", + "cwt", + "jwcrypto", "pycose", "httpx", "flask", diff --git a/tests/test_cli.py b/tests/test_cli.py index 95319901..998dab01 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,12 +1,13 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. import os +import io import json import threading import pytest import jwt import jwcrypto -from flask import Flask, jsonify +from flask import Flask, jsonify, send_file from werkzeug.serving import make_server from scitt_emulator import cli, server from scitt_emulator.oidc import OIDCAuthMiddleware @@ -73,6 +74,8 @@ def test_client_cli(use_lro: bool, tmp_path): claim_path, "--issuer", issuer, + "--subject", + "test", "--content-type", content_type, "--payload", @@ -162,6 +165,27 @@ def create_flask_app_oidc_server(config): app.config.update(dict(DEBUG=True)) app.config.update(config) + if not isinstance(app.config["key"], jwcrypto.jwk.JWK): + key_pem = app.config["key"] + app.config["key"] = jwcrypto.jwk.JWK() + app.config["key"].import_from_pem(key_pem) + + # TODO For testing ssh key style issuers, not OIDC related needs to be moved + @app.route("/", methods=["GET"]) + def ssh_public_keys(): + from cryptography.hazmat.primitives import serialization + return send_file( + io.BytesIO( + serialization.load_pem_public_key( + app.config["key"].export_to_pem(), + ).public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH, + ) + ), + mimetype="text/plain", + ) + @app.route("/.well-known/openid-configuration", methods=["GET"]) def openid_configuration(): return jsonify( @@ -248,6 +272,8 @@ def test_client_cli_token(tmp_path): claim_path, "--issuer", issuer, + "--subject", + "test", "--content-type", content_type, "--payload", diff --git a/tests/test_docs.py b/tests/test_docs.py index ea3d92d9..a478c05f 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -13,12 +13,16 @@ import itertools import subprocess import contextlib +import urllib.parse import unittest.mock + import pytest import myst_parser.parsers.docutils_ import docutils.nodes import docutils.utils +import jwcrypto + from scitt_emulator.client import ClaimOperationError from .test_cli import ( @@ -26,25 +30,25 @@ content_type, payload, execute_cli, + create_flask_app_oidc_server, ) repo_root = pathlib.Path(__file__).parents[1] docs_dir = repo_root.joinpath("docs") -allowlisted_issuer = "did:web:example.org" -non_allowlisted_issuer = "did:web:example.com" +non_allowlisted_issuer = "did:web:denied.example.com" CLAIM_DENIED_ERROR = {"type": "denied", "detail": "content_address_of_reason"} CLAIM_DENIED_ERROR_BLOCKED = { "type": "denied", "detail": textwrap.dedent( """ - 'did:web:example.com' is not one of ['did:web:example.org'] + 'did:web:denied.example.com' is not one of ['did:web:example.org'] Failed validating 'enum' in schema['properties']['issuer']: {'enum': ['did:web:example.org'], 'type': 'string'} On instance['issuer']: - 'did:web:example.com' + 'did:web:denied.example.com' """ ).lstrip(), } @@ -152,6 +156,15 @@ def docutils_find_code_samples(nodes): samples[node.astext()] = nodes[i + 3].astext() return samples +def url_to_did_web(url_string): + url = urllib.parse.urlparse(url_string) + return ":".join( + [ + urllib.parse.quote(i) + for i in ["did", "web", url.netloc, *filter(bool, url.path.split("/"))] + ] + ) + def test_docs_registration_policies(tmp_path): workspace_path = tmp_path / "workspace" @@ -159,6 +172,7 @@ def test_docs_registration_policies(tmp_path): receipt_path = tmp_path / "claim.receipt.cbor" entry_id_path = tmp_path / "claim.entry_id.txt" retrieved_claim_path = tmp_path / "claim.retrieved.cose" + private_key_pem_path = tmp_path / "notary-private-key.pem" # Grab code samples from docs # TODO Abstract into abitrary docs testing code @@ -170,7 +184,45 @@ def test_docs_registration_policies(tmp_path): for name, content in docutils_find_code_samples(nodes).items(): tmp_path.joinpath(name).write_text(content) + # key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-384") + # cwt_cose_key = cwt.COSEKey.generate_symmetric_key(alg=alg, kid=kid) + algorithm = "ES384" + audience = "scitt.example.org" + subject = "repo:scitt-community/scitt-api-emulator:ref:refs/heads/main" + # create claim + command = [ + "client", + "create-claim", + "--out", + claim_path, + "--issuer", + "NOP", + "--subject", + subject, + "--content-type", + content_type, + "--payload", + payload, + "--private-key-pem", + private_key_pem_path, + ] + execute_cli(command) + assert os.path.exists(claim_path) + claim_path.unlink() + """ + private_key_pem_path.write_bytes( + key.export_to_pem(private_key=True, password=None), + ) + """ + key = jwcrypto.jwk.JWK.from_pem(private_key_pem_path.read_bytes()) + + # tell jsonschema_validator.py that we want to assume non-TLS URLs for tests + os.environ["DID_WEB_ASSUME_SCHEME"] = "http" + with Service( + {"key": key, "algorithms": [algorithm]}, + create_flask_app=create_flask_app_oidc_server, + ) as oidc_service, Service( { "tree_alg": "CCF", "workspace": workspace_path, @@ -188,22 +240,35 @@ def test_docs_registration_policies(tmp_path): # set the policy to enforce service.server.app.scitt_service.service_parameters["insertPolicy"] = "external" - # create denied claim + # set the issuer to the did:web version of the OIDC / SSH keys service + issuer = url_to_did_web(oidc_service.url) + + # create claim command = [ "client", "create-claim", "--out", claim_path, "--issuer", - non_allowlisted_issuer, + issuer, + "--subject", + subject, "--content-type", content_type, "--payload", payload, + "--private-key-pem", + private_key_pem_path, ] execute_cli(command) assert os.path.exists(claim_path) + # replace example issuer with test OIDC service issuer (URL) in error + claim_denied_error_blocked = CLAIM_DENIED_ERROR_BLOCKED + claim_denied_error_blocked["detail"] = claim_denied_error_blocked["detail"].replace( + "did:web:denied.example.com", issuer, + ) + # submit denied claim command = [ "client", @@ -217,6 +282,7 @@ def test_docs_registration_policies(tmp_path): "--url", service.url ] + """ check_error = None try: execute_cli(command) @@ -224,27 +290,38 @@ def test_docs_registration_policies(tmp_path): check_error = error assert check_error assert "error" in check_error.operation - assert check_error.operation["error"] == CLAIM_DENIED_ERROR_BLOCKED + assert check_error.operation["error"] == claim_denied_error_blocked assert not os.path.exists(receipt_path) assert not os.path.exists(entry_id_path) + """ - # create accepted claim + # replace example issuer with test OIDC service issuer in allowlist + allowlist_schema_json_path = tmp_path.joinpath("allowlist.schema.json") + allowlist_schema_json_path.write_text( + allowlist_schema_json_path.read_text().replace( + "did:web:example.org", issuer, + ) + ) + + # submit accepted claim using SSH authorized_keys lookup command = [ "client", - "create-claim", - "--out", + "submit-claim", + "--claim", claim_path, - "--issuer", - allowlisted_issuer, - "--content-type", - content_type, - "--payload", - payload, + "--out", + receipt_path, + "--out-entry-id", + entry_id_path, + "--url", + service.url ] execute_cli(command) - assert os.path.exists(claim_path) + assert os.path.exists(receipt_path) + assert os.path.exists(entry_id_path) - # submit accepted claim + # TODO Switch back on the OIDC routes + # submit accepted claim using OIDC -> jwks lookup command = [ "client", "submit-claim",