diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index adc3d2e..36f4042 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: sudo apt install -y --no-install-recommends \ curl git build-essential \ libgtk-4-dev gettext libdbus-1-dev libssl-dev libudev-dev \ - libxml2-utils desktop-file-utils \ + libxml2-utils blueprint-compiler desktop-file-utils \ python3-pip ninja-build libnfc-dev libpcsclite-dev - name: Install Meson run: | diff --git a/demo_client/gui.py b/demo_client/gui.py new file mode 100755 index 0000000..dd84e5f --- /dev/null +++ b/demo_client/gui.py @@ -0,0 +1,429 @@ +#!/usr/bin/env python3 +from contextlib import closing +import json +import math +import os +from pathlib import Path +from pprint import pprint +import secrets +import sqlite3 +import sys +import time +from typing import Optional +import uuid + +from dbus_next.glib import MessageBus, ProxyInterface +from dbus_next import DBusError, Message, MessageType, Variant + +import gi + +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") +from gi.repository import Gio, GObject, Gtk, Adw # noqa: E402 + +import webauthn # noqa: E402 +import util # noqa: E402 + + +def dbus_error_from_message(msg: Message): + assert msg.message_type == MessageType.ERROR + return DBusError(msg.error_name, msg.body[0] if msg.body else None, reply=msg) + + +DBusError._from_message = dbus_error_from_message + +INTERFACE = None +DB = None +KEY = None + +RESOURCE_FILE = Gio.Resource.load( + f"{os.path.dirname(os.path.realpath(__file__))}/resources.gresource" +) +Gio.resources_register(RESOURCE_FILE) + + +@Gtk.Template(resource_path="/xyz/iinuwa/credentialsd/DemoCredentialsUi/window.ui") +class MainWindow(Gtk.ApplicationWindow): + __gtype_name__ = "MyAppWindow" + + username = Gtk.Template.Child() + make_credential_btn = Gtk.Template.Child() + get_assertion_btn = Gtk.Template.Child() + uv_pref_dropdown = Gtk.Template.Child() + discoverable_cred_pref_dropdown = Gtk.Template.Child() + rp_id = "example.com" + origin = "https://example.com" + interface = None + + def on_activate(self, app): + # Create a Builder + builder = Gtk.Builder() + builder.add_from_file("build/window.ui") + # Obtain and show the main window + self.win = builder.get_object("main_window") + self.win.set_application( + self + ) # Application will close once it no longer has active windows attached to it + self.win.present() + + @Gtk.Template.Callback() + def on_register(self, *args): + print("register clicked") + now = math.floor(time.time()) + cur = DB.cursor() + username = self.username.get_text() + cur.execute( + "select user_id, user_handle from users where username = ?", (username,) + ) + if row := cur.fetchone(): + user_id = row[0] + user_handle = row[1] + print(f"user found for {username}: ") + else: + user_handle = secrets.token_bytes(16) + user_id = None + print( + f"user created for {username}: " + ) + options = self._get_registration_options(user_handle, username) + print(f"registration options: {options}") + auth_data = create_passkey(INTERFACE, self.origin, self.origin, options) + if not user_id: + cur.execute( + "insert into users (username, user_handle, created_time) values (?, ?, ?)", + (username, user_handle, now), + ) + user_id = cur.lastrowid + params = { + "user_handle": user_handle, + "cred_id": auth_data.cred_id, + "aaguid": str(uuid.UUID(bytes=bytes(auth_data.aaguid))), + "sign_count": None if auth_data.sign_count == 0 else auth_data.sign_count, + "backup_eligible": 1 if "BE" in auth_data.flags else 0, + "backup_state": 1 if "BS" in auth_data.flags else 0, + "uv_initialized": 1 if "UV" in auth_data.flags else 0, + "cose_pub_key": auth_data.pub_key_bytes, + "created_time": now, + } + + add_passkey_sql = """ + insert into user_passkeys + (user_handle, cred_id, aaguid, sign_count, backup_eligible, backup_state, uv_initialized, cose_pub_key, created_time) + values + (:user_handle, :cred_id, :aaguid, :sign_count, :backup_eligible, :backup_state, :uv_initialized, :cose_pub_key, :created_time) + """ + cur.execute(add_passkey_sql, params) + print("Added passkey") + DB.commit() + cur.close() + + @Gtk.Template.Callback() + def on_authenticate(self, *args): + username = self.username.get_text() + if username: + print(f"Using username-flow: {username}") + sql = """ + select p.user_handle, cred_id, backup_eligible, backup_state, cose_pub_key, sign_count + from user_passkeys p + inner join users u on u.user_handle = p.user_handle + where u.username = ? + """ + with closing(DB.cursor()) as cur: + cur.execute(sql, (username,)) + user_creds = [] + for row in cur.fetchall(): + [ + user_handle, + cred_id, + backup_eligible, + backup_state, + pub_key, + sign_count, + ] = row + user_cred = { + "user_handle": user_handle, + "cred_id": cred_id, + "backup_eligible": backup_eligible, + "backup_state": backup_state, + "pub_key": pub_key, + "sign_count": sign_count, + } + user_creds.append(user_cred) + cred_ids = [c["cred_id"] for c in user_creds] + else: + print("using username-less flow") + cred_ids = [] + + options = self._get_authentication_options(cred_ids) + print(f"authenticate clicked: {options}") + + def retrieve_user_cred( + user_handle: Optional[bytes], cred_id: bytes + ) -> Optional[dict]: + with closing(DB.cursor()) as cur: + if username: + print("using cached user creds") + return next( + ( + u + for u in user_creds + if u["cred_id"] == cred_id + and (user_handle is None or user_handle == u["user_handle"]) + ), + None, + ) + else: + if not user_handle: + print("No user handle given, cannot look up user") + return None + sql = """ + select user_handle, cred_id, backup_eligible, backup_state, cose_pub_key, sign_count + from user_passkeys + where user_handle = ? and cred_id = ? + """ + cur.execute(sql, (user_handle, cred_id)) + if row := cur.fetchone(): + [ + user_handle, + cred_id, + backup_eligible, + backup_state, + pub_key, + sign_count, + ] = row + user_cred = { + "user_handle": user_handle, + "cred_id": cred_id, + "backup_eligible": backup_eligible, + "backup_state": backup_state, + "pub_key": pub_key, + "sign_count": sign_count, + } + return user_cred + else: + return None + + auth_data = get_passkey( + INTERFACE, + self.origin, + self.origin, + self.rp_id, + cred_ids, + retrieve_user_cred, + ) + print("Received passkey:") + pprint(auth_data) + + @GObject.Property(type=Gtk.StringList) + def uv_pref(self): + model = Gtk.StringList() + for o in ["preferred", "required", "discouraged"]: + model.append(o) + return model + + @GObject.Property(type=Gtk.StringList) + def discoverable_cred_pref(self): + model = Gtk.StringList() + for o in ["preferred", "required", "discouraged"]: + model.append(o) + return model + + def _get_registration_options(self, user_handle: bytes, username: str): + username = self.username.get_text() + user_verification = self.uv_pref_dropdown.get_selected_item().get_string() + resident_key = ( + self.discoverable_cred_pref_dropdown.get_selected_item().get_string() + ) + options = { + "challenge": util.b64_encode(secrets.token_bytes(16)), + "rp": { + "name": "Example Org", + "id": self.rp_id, + }, + "user": { + "id": util.b64_encode(user_handle), + "name": username, + "displayName": username, + }, + "pubKeyCredParams": [ + {"type": "public-key", "alg": -7}, + {"type": "public-key", "alg": -257}, + {"type": "public-key", "alg": -8}, + ], + "userVerification": user_verification, + "authenticatorSelection": { + "residentKey": resident_key, + }, + } + + return options + + def _get_authentication_options(self, cred_ids): + options = { + "challenge": util.b64_encode(secrets.token_bytes(16)), + "rpId": self.rp_id, + "allowCredentials": [ + {"type": "public-key", "id": util.b64_encode(cred_id)} + for cred_id in cred_ids + ], + } + return options + + +class MyApp(Adw.Application): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.connect("activate", self.on_activate) + + def on_activate(self, app): + self.win = MainWindow(application=app) + self.win.present() + + +def create_passkey( + interface: ProxyInterface, origin: str, top_origin: str, options: dict +) -> webauthn.AuthenticatorData: + is_same_origin = origin == top_origin + print( + f"Sending {'same' if is_same_origin else 'cross'}-origin request for {origin} using options:" + ) + # pprint(options) + print() + + req_json = json.dumps(options) + req = { + "type": Variant("s", "publicKey"), + "origin": Variant("s", origin), + "is_same_origin": Variant("b", is_same_origin), + "publicKey": Variant("a{sv}", {"request_json": Variant("s", req_json)}), + } + + rsp = interface.call_create_credential_sync(req) + + # print("Received response") + # pprint(rsp) + if rsp["type"].value != "public-key": + raise Exception( + f"Invalid credential type received: expected 'public-key', received {rsp['type'.value]}" + ) + + response_json = json.loads( + rsp["public_key"].value["registration_response_json"].value + ) + return webauthn.verify_create_response(response_json, options, origin) + + +def get_passkey(interface, origin, top_origin, rp_id, cred_ids, cred_lookup_fn): + is_same_origin = origin == top_origin + options = { + "challenge": util.b64_encode(secrets.token_bytes(16)), + "rpId": rp_id, + "allowCredentials": [ + {"type": "public-key", "id": util.b64_encode(c)} for c in cred_ids + ], + } + + print( + f"Sending {'same' if is_same_origin else 'cross'}-origin request for {origin} using options:" + ) + # pprint(options) + print() + + req_json = json.dumps(options) + req = { + "type": Variant("s", "publicKey"), + "origin": Variant("s", origin), + "is_same_origin": Variant("b", is_same_origin), + "publicKey": Variant("a{sv}", {"request_json": Variant("s", req_json)}), + } + + rsp = interface.call_get_credential_sync(req) + # print("Received response") + # pprint(rsp) + if rsp["type"].value != "public-key": + raise Exception( + f"Invalid credential type received: expected 'public-key', received {rsp['type'.value]}" + ) + + response_json = json.loads( + rsp["public_key"].value["authentication_response_json"].value + ) + response_json["rawId"] = util.b64_decode(response_json["rawId"]) + if user_handle := response_json["response"].get("userHandle"): + response_json["response"]["userHandle"] = util.b64_decode(user_handle) + + return webauthn.verify_get_response(response_json, options, origin, cred_lookup_fn) + + +def connect_to_bus(): + global INTERFACE + bus = MessageBus().connect_sync() + + with open( + f"{os.path.dirname(os.path.realpath(__file__))}/xyz.iinuwa.credentialsd.Credentials.xml", + "r", + ) as f: + introspection = f.read() + + proxy_object = bus.get_proxy_object( + "xyz.iinuwa.credentialsd.Credentials", + "/xyz/iinuwa/credentialsd/Credentials", + introspection, + ) + INTERFACE = proxy_object.get_interface("xyz.iinuwa.credentialsd.Credentials1") + + +def setup_db(): + global DB + # This is just for testing/temporary use, so put it in cache + db_path = ( + Path(os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache")) + / "xyz.iinuwa.credentialsd.DemoCredentialsUi" + / "users.db" + ) + db_path.parent.mkdir(exist_ok=True) + + DB = sqlite3.connect(db_path) + DB.execute("pragma foreign_keys = on") + user_table_sql = """ + create table if not exists users ( + user_id integer primary key autoincrement + , username text not null + , user_handle blob unique not null + , created_time integer not null + ) + strict + """ + passkey_table_sql = """ + create table if not exists user_passkeys ( + user_handle blob + , cred_id blob + , aaguid text not null + , sign_count integer null + , backup_eligible integer not null + , backup_state integer not null + , uv_initialized integer not null + , cose_pub_key blob not null + , created_time integer not null + , updated_time integer + , primary key (user_handle, cred_id) + ) + strict + """ + cur = DB.cursor() + cur.execute(user_table_sql) + cur.execute(passkey_table_sql) + cur.close() + + +def main(): + connect_to_bus() + setup_db() + + app = MyApp(application_id="xyz.iinuwa.credentialsd.DemoCredentialsUi") + app.run(sys.argv) + DB.close() + + +if __name__ == "__main__": + main() diff --git a/demo_client/main.py b/demo_client/main.py index 504af35..3a9cebf 100755 --- a/demo_client/main.py +++ b/demo_client/main.py @@ -75,6 +75,8 @@ async def run(cmd): elif cmd == "get": user_data = json.load(open("./user.json", "r")) cred_id = util.b64_decode(user_data["cred_id"]) + user_data["cred_id"] = cred_id + user_data["pub_key"] = util.b64_decode(user_data["pub_key"]) try: auth_data = await get_passkey( interface, origin, top_origin, rp_id, cred_id, user_data @@ -224,8 +226,14 @@ async def get_passkey( response_json = json.loads( rsp["public_key"].value["authentication_response_json"].value ) - print(user) - return webauthn.verify_get_response(response_json, options, origin, user, None) + response_json["rawId"] = util.b64_decode(response_json["rawId"]) + if user_handle_b64 := response_json["response"]["userHandle"]: + response_json["response"]["userHandle"] = util.b64_decode(user_handle_b64) + + def lookup_fn(user_handle, cred_id): + return user + + return webauthn.verify_get_response(response_json, options, origin, lookup_fn) def main(): @@ -313,7 +321,7 @@ def test_get_credential(self): expected_origin = "https://example.com" auth_data = webauthn.verify_get_response( - response, options, "https://example.com", user, None + response, options, expected_origin, user, None ) self.assertTrue(auth_data.has_flag("UV")) self.assertFalse(auth_data.has_flag("BS")) diff --git a/demo_client/meson.build b/demo_client/meson.build new file mode 100644 index 0000000..d61e304 --- /dev/null +++ b/demo_client/meson.build @@ -0,0 +1,53 @@ +dependency('dbus-1', version: '>= 1.6') +dependency('glib-2.0', version: '>= 2.66') +dependency('gio-2.0', version: '>= 2.66') +dependency('gtk4', version: '>= 4.6.2') + +glib_compile_resources = find_program('glib-compile-resources', required: true) + +demo_blueprints = custom_target( + 'demo-blueprints', + input: files('window.blp'), + output: 'ui', + command: [ + find_program('blueprint-compiler', required: false), + 'batch-compile', + '@OUTPUT@', + '@CURRENT_SOURCE_DIR@', + '@INPUT@', + ], +) + +demo_resources = custom_target( + 'demo-resource', + input: files('resources.gresource.xml'), + depend_files: files('style.css'), + depfile: 'gresource.deps', + output: 'resources.gresource', + command: [ + glib_compile_resources, + '--target=@OUTPUT@', + '--dependency-file=@DEPFILE@', + '--sourcedir=@CURRENT_SOURCE_DIR@', + '--sourcedir', demo_blueprints[0], + '@INPUT@', + ], +) + +gui_sources = files( + '../doc/xyz.iinuwa.credentialsd.Credentials.xml', + 'cbor.py', + 'gui.py', + 'main.py', + 'util.py', + 'webauthn.py', +) + +custom_target( + 'demo-gui', + input: [gui_sources], + build_by_default: false, + output: '.', + depends: [demo_resources], + command: ['cp', '@INPUT@', '@OUTPUT@'], +) diff --git a/demo_client/resources.gresource.xml b/demo_client/resources.gresource.xml new file mode 100644 index 0000000..d6f9b72 --- /dev/null +++ b/demo_client/resources.gresource.xml @@ -0,0 +1,8 @@ + + + + window.ui + style.css + + + diff --git a/demo_client/style.css b/demo_client/style.css new file mode 100644 index 0000000..05aa264 --- /dev/null +++ b/demo_client/style.css @@ -0,0 +1,8 @@ +box { + margin: 20px; +} + +button { + margin-left: 10px; + margin-right: 10px; +} diff --git a/demo_client/webauthn.py b/demo_client/webauthn.py index af7f27f..a974d0e 100644 --- a/demo_client/webauthn.py +++ b/demo_client/webauthn.py @@ -27,41 +27,56 @@ COSE_EC2_X = -2 COSE_EC2_Y = -3 + def verify_create_response(response, create_request, expected_origin): - client_data_bytes = util.b64_decode(response['response']['clientDataJSON']) + client_data_bytes = util.b64_decode(response["response"]["clientDataJSON"]) client_data = json.loads(client_data_bytes.decode("utf-8")) - if client_data['type'] != "webauthn.create": + if client_data["type"] != "webauthn.create": raise Exception(f"Invalid operation type received: {client_data['type']}") - challenge_str = client_data['challenge'] - if challenge_str != create_request['challenge']: - raise Exception(f"Challenge does not match original request. Rejecting.") + challenge_str = client_data["challenge"] + if challenge_str != create_request["challenge"]: + raise Exception("Challenge does not match original request. Rejecting.") - origin = client_data['origin'] + origin = client_data["origin"] if origin != expected_origin: - raise Exception(f"Origin does not match original request. Rejecting.") + raise Exception( + f"Origin {origin} does not match original request ({expected_origin}). Rejecting." + ) client_data_hash = hashlib.sha256(client_data_bytes).digest() # Verify that the rpIdHash in authData is the SHA-256 hash of the RP ID expected by the Relying Party. - attestation = cbor.loads(util.b64_decode(response['response']['attestationObject'])) + attestation = cbor.loads(util.b64_decode(response["response"]["attestationObject"])) auth_data_view = attestation["authData"] auth_data = _parse_authenticator_data(auth_data_view) att_stmt = attestation["attStmt"] - expected_rp_id_hash = hashlib.sha256(create_request['rp']['id'].encode('utf-8')).digest() + expected_rp_id_hash = hashlib.sha256( + create_request["rp"]["id"].encode("utf-8") + ).digest() if not hmac.compare_digest(auth_data.rp_id_hash, expected_rp_id_hash): - raise Exception("Relying party in authenticator data does not match request. Rejecting.") + raise Exception( + "Relying party in authenticator data does not match request. Rejecting." + ) # Verify that the User Present bit of the flags in authData is set. - if 'UP' not in auth_data.flags: - raise Exception("User presence was not asserted by the authenticator. Rejecting.") - - if create_request.get('authenticatorSelection', {}).get('userVerification') == 'required' and 'UV' not in auth_data.flags: - raise Exception("User verification is required but was not asserted by the authenticator. Rejecting.") - - if 'AT' not in auth_data.flags: + if "UP" not in auth_data.flags: + raise Exception( + "User presence was not asserted by the authenticator. Rejecting." + ) + + if ( + create_request.get("authenticatorSelection", {}).get("userVerification") + == "required" + and "UV" not in auth_data.flags + ): + raise Exception( + "User verification is required but was not asserted by the authenticator. Rejecting." + ) + + if "AT" not in auth_data.flags: raise Exception("Attested credential data not included in request. Rejecting.") cred_pub_key = auth_data.get_pub_key() @@ -69,16 +84,22 @@ def verify_create_response(response, create_request, expected_origin): kty = cred_pub_key[COSE_KTY] alg = cred_pub_key[COSE_ALG] # Verify that the "alg" parameter in the credential public key in authData matches the alg attribute of one of the items in options.pubKeyCredParams. - if alg not in (p['alg'] for p in create_request['pubKeyCredParams']): - raise Exception("Public key algorithm not in list of accepted key types. Rejecting.") + if alg not in (p["alg"] for p in create_request["pubKeyCredParams"]): + raise Exception( + "Public key algorithm not in list of accepted key types. Rejecting." + ) # verify parameters for supported algorithms if alg == COSE_ALG_ECDSA: if kty != COSE_KTY_EC2: - raise Exception(f"Invalid key type specified: expected {COSE_KTY_EC2} (EC2), received {kty}") + raise Exception( + f"Invalid key type specified: expected {COSE_KTY_EC2} (EC2), received {kty}" + ) elif alg == COSE_ALG_EDDSA: if kty != COSE_KTY_OKP: - raise Exception(f"Invalid key type specified: expected {COSE_KTY_OKP} (OKP), received {kty}") + raise Exception( + f"Invalid key type specified: expected {COSE_KTY_OKP} (OKP), received {kty}" + ) crv = cred_pub_key[COSE_OKP_CRV] if crv != COSE_CRV_ED25519: raise Exception(f"Unsupported EdDSA curve specified: {crv}") @@ -92,8 +113,8 @@ def verify_create_response(response, create_request, expected_origin): # pass # Determine the attestation statement format by performing a USASCII case-sensitive match on fmt against the set of supported WebAuthn Attestation Statement Format Identifier values. An up-to-date list of registered WebAuthn Attestation Statement Format Identifier values is maintained in the IANA "WebAuthn Attestation Statement Format Identifiers" registry [IANA-WebAuthn-Registries] established by [RFC8809]. - supported_att_fmts = ['none', 'packed', 'fido-u2f'] - fmt = attestation['fmt'] + supported_att_fmts = ["none", "packed", "fido-u2f"] + fmt = attestation["fmt"] if fmt not in supported_att_fmts: raise Exception(f"Unsupported attestation format: {fmt}") @@ -102,32 +123,43 @@ def verify_create_response(response, create_request, expected_origin): pass elif fmt == "packed": att_payload = auth_data_view.tobytes() + client_data_hash - sig = att_stmt['sig'] - att_alg = att_stmt['alg'] - if 'x5c' in att_stmt: + sig = att_stmt["sig"] + att_alg = att_stmt["alg"] + if "x5c" in att_stmt: if att_alg == COSE_ALG_ECDSA: - signing_cert = x509.load_der_x509_certificate(att_stmt['x5c'][0].tobytes()) - assert(signing_cert.version == x509.Version.v3) + signing_cert = x509.load_der_x509_certificate( + att_stmt["x5c"][0].tobytes() + ) + assert signing_cert.version == x509.Version.v3 try: - fido_oid = signing_cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.45724.1.1.4")) - assert(fido_oid.critical is False) + fido_oid = signing_cert.extensions.get_extension_for_oid( + x509.ObjectIdentifier("1.3.6.1.4.1.45724.1.1.4") + ) + assert fido_oid.critical is False cert_aaguid_der = fido_oid.value.value # strip first two header bytes for OCTET STRING of length 16 - assert(cert_aaguid_der[:2] == b'\x04\x10') + assert cert_aaguid_der[:2] == b"\x04\x10" cert_aaguid = cert_aaguid_der[2:] - assert(auth_data.aaguid.tobytes() == cert_aaguid) + assert auth_data.aaguid.tobytes() == cert_aaguid except x509.ExtensionNotFound: # no FIDO OID found in cert. pass - assert(signing_cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.BASIC_CONSTRAINTS).value.ca is False) + assert ( + signing_cert.extensions.get_extension_for_oid( + x509.oid.ExtensionOID.BASIC_CONSTRAINTS + ).value.ca + is False + ) signing_key = signing_cert.public_key() signing_key.verify(sig, att_payload, ec.ECDSA(hashes.SHA256())) - if len(att_stmt['x5c']) > 1: + if len(att_stmt["x5c"]) > 1: raise Exception("CA verification is not supported") else: # authenticator is using self attestation if alg != att_alg: - raise Exception("Self-attestation is in use, but credential algorithm and attestation algorithm do not match. Rejecting.") + raise Exception( + "Self-attestation is in use, but credential algorithm and attestation algorithm do not match. Rejecting." + ) if alg == COSE_ALG_ECDSA: raise Exception("ECDSA self-attestation not implemented") pass @@ -136,12 +168,14 @@ def verify_create_response(response, create_request, expected_origin): if crv != COSE_CRV_ED25519: raise Exception(f"Unsupported EdDSA curve specified: {crv}") pub_key_bytes = cred_pub_key[COSE_OKP_PUBLIC_KEY] - signing_key = Ed25519PublicKey.from_public_bytes(pub_key_bytes.tobytes()) + signing_key = Ed25519PublicKey.from_public_bytes( + pub_key_bytes.tobytes() + ) signing_key.verify(sig, att_payload) - elif fmt == 'fido-u2f': + elif fmt == "fido-u2f": # Verify that attStmt is valid CBOR conforming to the syntax defined above and perform CBOR decoding on it to extract the contained fields. - x5c = att_stmt['x5c'] - sig = att_stmt['sig'] + x5c = att_stmt["x5c"] + sig = att_stmt["sig"] # FIDO U2F only supports P-256 keys crv = cred_pub_key[COSE_OKP_CRV] if alg != COSE_ALG_ECDSA or crv != COSE_CRV_P256: @@ -149,14 +183,21 @@ def verify_create_response(response, create_request, expected_origin): # Check that x5c has exactly one element and let attCert be that element. Let certificate public key be the public key conveyed by attCert. if len(x5c) != 1: - raise Exception(f"Expected a single attestation certificate in fido-u2f attestation, received {len(x5c)}") + raise Exception( + f"Expected a single attestation certificate in fido-u2f attestation, received {len(x5c)}" + ) att_cert = x5c[0] signing_cert = x509.load_der_x509_certificate(att_cert.tobytes()) signing_key = signing_cert.public_key() # If certificate public key is not an Elliptic Curve (EC) public key over the P-256 curve, terminate this algorithm and return an appropriate error. - if not isinstance(signing_key, ec.EllipticCurvePublicKey) or not signing_key.curve.name == ec.SECP256R1.name: - raise Exception("Signing key for FIDO U2F attestation is not a valid P-256 public key.") + if ( + not isinstance(signing_key, ec.EllipticCurvePublicKey) + or not signing_key.curve.name == ec.SECP256R1.name + ): + raise Exception( + "Signing key for FIDO U2F attestation is not a valid P-256 public key." + ) # Extract the claimed rpIdHash from authenticatorData, and the claimed credentialId and credentialPublicKey from authenticatorData.attestedCredentialData. expected_rp_id_hash, cred_pub_key @@ -181,7 +222,13 @@ def verify_create_response(response, create_request, expected_origin): public_key_u2f = b"\x04" + x + y # Let verificationData be the concatenation of (0x00 || rpIdHash || clientDataHash || credentialId || publicKeyU2F) (see Section 4.3 of [FIDO-U2F-Message-Formats]). - verification_data = b"\x00" + expected_rp_id_hash + client_data_hash + credential_id + public_key_u2f + verification_data = ( + b"\x00" + + expected_rp_id_hash + + client_data_hash + + credential_id + + public_key_u2f + ) # Verify the sig using verificationData and the certificate public key # per section 4.1.4 of [SEC1] with SHA-256 as the hash function used in step two. @@ -189,7 +236,7 @@ def verify_create_response(response, create_request, expected_origin): signing_key.verify(sig, verification_data, ec.ECDSA(hashes.SHA256())) # Optionally, inspect x5c and consult externally provided knowledge to determine whether attStmt conveys a Basic or AttCA attestation. - # Skip + # Skip # If successful, return implementation-specific values representing attestation type Basic, AttCA or uncertainty, and attestation trust path x5c. else: @@ -204,8 +251,8 @@ def verify_create_response(response, create_request, expected_origin): return auth_data -def verify_get_response(credential, options, expected_origin, identified_user, users): - assert(identified_user or users) + +def verify_get_response(credential, options, expected_origin, cred_lookup_fn): # Let options be a new CredentialRequestOptions structure configured to the # Relying Party’s needs for the ceremony. Let pkOptions be # options.publicKey. @@ -221,7 +268,7 @@ def verify_get_response(credential, options, expected_origin, identified_user, u # Let response be credential.response. If response is not an instance of # AuthenticatorAssertionResponse, abort the ceremony with a user-visible # error. - response = credential['response'] + response = credential["response"] # Let clientExtensionResults be the result of calling # credential.getClientExtensionResults(). @@ -229,41 +276,56 @@ def verify_get_response(credential, options, expected_origin, identified_user, u # If pkOptions.allowCredentials is not empty, verify that credential.id # identifies one of the public key credentials listed in # pkOptions.allowCredentials. - if allow_credentials := options.get('allowCredentials'): - if not any(c['id'] == credential['id'] for c in allow_credentials): + if allow_credentials := options.get("allowCredentials"): + if not any(c["id"] == credential["id"] for c in allow_credentials): raise Exception("Credential not in list of allowed credentials") # Identify the user being authenticated and let credentialRecord be the # credential record for the credential: + user_handle = response.get("userHandle") + credential_record = cred_lookup_fn(user_handle, credential["rawId"]) + if not credential_record: + msg = f"No credential for user found matching credential ID `{credential['id']}` and " + if user_handle: + msg += f"user handle `{user_handle}`" + else: + msg += "no user handle was given" + raise Exception(msg) # If the user was identified before the authentication ceremony was # initiated, e.g., via a username or cookie, - user_handle = response['userHandle'] - if identified_user: - # verify that the identified user account contains a credential record - # whose id equals credential.rawId. Let credentialRecord be that - # credential record. - credential['id'] == identified_user['cred_id'] - # If response.userHandle is present, verify that it equals the user handle of the user account. - if user_handle and user_handle != identified_user['user_handle']: + if allow_credentials: + # verify that the identified user account contains a credential record + # whose id equals credential.rawId. Let credentialRecord be that + # credential record. + if credential["rawId"] != credential_record["cred_id"]: + raise Exception("Credential ID does not match expected ID") + # If response.userHandle is present, verify that it equals the user handle of the user account. + if user_handle and user_handle != credential_record["user_handle"]: raise Exception("Unexpected user handle received from credential") # If the user was not identified before the authentication ceremony was # initiated, elif user_handle: - # verify that response.userHandle is present. Verify that the user - # account identified by response.userHandle contains a credential record - # whose id equals credential.rawId. Let credentialRecord be that - # credential record. - identified_user = next(u for u in users if u['user_handle'] == user_handle) + # verify that response.userHandle is present. Verify that the user + # account identified by response.userHandle contains a credential record + # whose id equals credential.rawId. Let credentialRecord be that + # credential record. + # identified_user = next(u for u in users if u['user_handle'] == user_handle and u['cred_id'] == credential['id']) + if credential["rawId"] != credential_record["cred_id"]: + raise Exception("Credential ID does not match expected ID") + if user_handle != credential_record["user_handle"]: + raise Exception("Unexpected user handle received from credential") else: - raise Exception("User is unidentified and no user handle was returned by credential") + raise Exception( + "User is unidentified and no user handle was returned by credential" + ) # Let cData, authData and sig denote the value of response’s clientDataJSON, # authenticatorData, and signature respectively. - client_data_json = util.b64_decode(response['clientDataJSON']).decode("utf-8") - auth_data_bytes = util.b64_decode(response['authenticatorData']) + client_data_json = util.b64_decode(response["clientDataJSON"]).decode("utf-8") + auth_data_bytes = util.b64_decode(response["authenticatorData"]) auth_data = _parse_authenticator_data(auth_data_bytes) - sig_bytes = util.b64_decode(response['signature']) + sig_bytes = util.b64_decode(response["signature"]) # Let JSONtext be the result of running UTF-8 decode on the value of cData. # Note: Using any implementation of UTF-8 decode is acceptable as long as it @@ -277,51 +339,59 @@ def verify_get_response(credential, options, expected_origin, identified_user, u C = json.loads(client_data_json) # Verify that the value of C.type is the string webauthn.get. - if C['type'] != 'webauthn.get': - raise Exception(f"Invalid operation type asserted by credential: {C['type']}. Rejecting.") + if C["type"] != "webauthn.get": + raise Exception( + f"Invalid operation type asserted by credential: {C['type']}. Rejecting." + ) # Verify that the value of C.challenge equals the base64url encoding of # pkOptions.challenge. - if C['challenge'] != options['challenge']: + if C["challenge"] != options["challenge"]: raise Exception("Invalid challenge received from authenticator. Rejecting.") # Verify that the value of C.origin is an origin # expected by the Relying Party. See § 13.4.9 Validating the origin of a # credential for guidance. - if C['origin'] != expected_origin: - raise Exception(f"Attested origin `{C['origin']}` does not match expected origin `{expected_origin}`") + if C["origin"] != expected_origin: + raise Exception( + f"Attested origin `{C['origin']}` does not match expected origin `{expected_origin}`" + ) # If C.crossOrigin is present and set to true, verify that the Relying Party # expects this credential to be used within an iframe that is not # same-origin with its ancestors. - if C.get('crossOrigin') == True: + if C.get("crossOrigin") == True: # TODO: pass cross-origin policy as parameter pass # If C.topOrigin is present: - if C.get('topOrigin'): - # Verify that the Relying Party expects this credential to be used - # within an iframe that is not same-origin with its ancestors. + if C.get("topOrigin"): + # Verify that the Relying Party expects this credential to be used + # within an iframe that is not same-origin with its ancestors. # TODO: pass top-origin policy as parameter - # Verify that the value of C.topOrigin matches the origin of a page that - # the Relying Party expects to be sub-framed within. See § 13.4.9 - # Validating the origin of a credential for guidance. + # Verify that the value of C.topOrigin matches the origin of a page that + # the Relying Party expects to be sub-framed within. See § 13.4.9 + # Validating the origin of a credential for guidance. # TODO: pass top-origin policy as parameter pass # Verify that the # rpIdHash in authData is the SHA-256 hash of the RP ID expected by the # Relying Party. - expected_rp_id_hash = hashlib.sha256(options['rpId'].encode('utf-8')).digest() + expected_rp_id_hash = hashlib.sha256(options["rpId"].encode("utf-8")).digest() if not hmac.compare_digest(auth_data.rp_id_hash, expected_rp_id_hash): - raise Exception("Relying party in authenticator data does not match request. Rejecting.") + raise Exception( + "Relying party in authenticator data does not match request. Rejecting." + ) # Note: If using the appid extension, this step needs some special logic. # See § 10.1.1 FIDO AppID Extension (appid) for details. # TODO # Verify that the UP bit of the flags in authData is set. - if not auth_data.has_flag('UP'): - raise Exception("User presence was not asserted by the authenticator. Rejecting.") + if not auth_data.has_flag("UP"): + raise Exception( + "User presence was not asserted by the authenticator. Rejecting." + ) # Determine whether user verification is required for this assertion. User # verification SHOULD be required if, and only if, @@ -329,30 +399,38 @@ def verify_get_response(credential, options, expected_origin, identified_user, u # If user verification was determined to be required, verify that the UV bit # of the flags in authData is set. Otherwise, ignore the value of the UV # flag. - if options.get('userVerification') == 'required' and not auth_data.has_flag('UV'): - raise Exception("User verification is required but was not asserted by the authenticator. Rejecting.") + if options.get("userVerification") == "required" and not auth_data.has_flag("UV"): + raise Exception( + "User verification is required but was not asserted by the authenticator. Rejecting." + ) # If the BE bit of the flags in authData is not set, verify that the BS bit # is not set. - if not auth_data.has_flag('BE') and auth_data.has_flag('BS'): - raise Exception("Conflicted backup state: Authenticator reported to be backed up, but not backup-eligible. Rejecting.") + if not auth_data.has_flag("BE") and auth_data.has_flag("BS"): + raise Exception( + "Conflicted backup state: Authenticator reported to be backed up, but not backup-eligible. Rejecting." + ) # If the credential backup state is used as part of Relying Party business # logic or policy, let currentBe and currentBs be the values of the BE and # BS bits, respectively, of the flags in authData. - current_be = auth_data.has_flag('BE') - current_bs = auth_data.has_flag('BS') + current_be = auth_data.has_flag("BE") + current_bs = auth_data.has_flag("BS") # Compare currentBe and # currentBs with credentialRecord.backupEligible and # credentialRecord.backupState: # If credentialRecord.backupEligible is set, verify that currentBe is # set. - if identified_user['backup_eligible'] and not current_be: - raise Exception("Authenticator previously reported that it was backup eligible on creation, but now does not. Rejecting.") + if credential_record["backup_eligible"] and not current_be: + raise Exception( + "Authenticator previously reported that it was backup eligible on creation, but now does not. Rejecting." + ) # If credentialRecord.backupEligible is not set, verify that currentBe # is not set. - elif not identified_user['backup_eligible'] and current_be: - raise Exception("Authenticator attempted to upgrade to be backup eligible. Rejecting.") + elif not credential_record["backup_eligible"] and current_be: + raise Exception( + "Authenticator attempted to upgrade to be backup eligible. Rejecting." + ) # Apply Relying Party policy, if any. # Note: See § 6.1.3 Credential Backup State for examples of how a Relying # Party might process the BS flag values. @@ -365,46 +443,49 @@ def verify_get_response(credential, options, expected_origin, identified_user, u # credentials in order for some RPs to recognize this. # Let hash be the result of computing a hash over the cData using SHA-256. - client_data_hash = hashlib.sha256(client_data_json.encode('utf-8')).digest() + client_data_hash = hashlib.sha256(client_data_json.encode("utf-8")).digest() # Using credentialRecord.publicKey, verify that sig is a valid signature # over the binary concatenation of authData and hash. # Note: This verification step is compatible with signatures generated by # FIDO U2F authenticators. See § 6.1.2 FIDO U2F Signature Format # Compatibility. - pub_key = util.b64_decode(identified_user['pub_key']) - _cose_verify(pub_key, sig_bytes, auth_data_bytes + client_data_hash) + _cose_verify( + credential_record["pub_key"], sig_bytes, auth_data_bytes + client_data_hash + ) # If authData.signCount is nonzero or credentialRecord.signCount is nonzero, # then run the following sub-step: sc = auth_data.sign_count if auth_data.sign_count else 0 - user_sc = identified_user['sign_count'] if identified_user['sign_count'] else 0 + user_sc = credential_record["sign_count"] if credential_record["sign_count"] else 0 if sc > 0 or user_sc > 0: - # If authData.signCount is - # greater than credentialRecord.signCount: The signature counter is - # valid. + # If authData.signCount is + # greater than credentialRecord.signCount: The signature counter is + # valid. if sc > user_sc: pass - # less than or equal to credentialRecord.signCount: This is a - # signal, but not proof, that the authenticator may be cloned. For - # example it might mean that: - # - Two or more copies of the credential private key may exist and - # are being used in parallel. - # - An authenticator is malfunctioning. - # - A race condition exists where the Relying Party is processing - # assertion responses in an order other than the order they were - # generated at the authenticator. + # less than or equal to credentialRecord.signCount: This is a + # signal, but not proof, that the authenticator may be cloned. For + # example it might mean that: + # - Two or more copies of the credential private key may exist and + # are being used in parallel. + # - An authenticator is malfunctioning. + # - A race condition exists where the Relying Party is processing + # assertion responses in an order other than the order they were + # generated at the authenticator. else: - # Relying Parties should evaluate their own operational - # characteristics and incorporate this information into their risk - # scoring. Whether the Relying Party updates - # credentialRecord.signCount below in this case, or not, or fails - # the authentication ceremony or not, is Relying Party-specific. - - # For more information on signature counter considerations, see - # § 6.1.1 Signature Counter Considerations. + # Relying Parties should evaluate their own operational + # characteristics and incorporate this information into their risk + # scoring. Whether the Relying Party updates + # credentialRecord.signCount below in this case, or not, or fails + # the authentication ceremony or not, is Relying Party-specific. + + # For more information on signature counter considerations, see + # § 6.1.1 Signature Counter Considerations. # TODO: add policy - raise Exception("Authenticator signature count too low and the authenticator may have been cloned. Rejecting.") + raise Exception( + "Authenticator signature count too low and the authenticator may have been cloned. Rejecting." + ) # Process the client extension outputs in clientExtensionResults and the # authenticator extension outputs in the extensions in authData as required @@ -452,7 +533,9 @@ def _cose_verify(cose_key: bytes, signature: bytes, data: bytes): if cose_alg == COSE_ALG_ECDSA: if kty != COSE_KTY_EC2: - raise Exception(f"Invalid COSE key type specified for ECDSA: expected {COSE_KTY_EC2} (EC2), received {kty}") + raise Exception( + f"Invalid COSE key type specified for ECDSA: expected {COSE_KTY_EC2} (EC2), received {kty}" + ) x = cred_pub_key[COSE_EC2_X] y = cred_pub_key[COSE_EC2_Y] @@ -465,12 +548,14 @@ def _cose_verify(cose_key: bytes, signature: bytes, data: bytes): raise Exception(f"Unsupported COSE ECDSA curve specified: {crv}") # WebAuthn uses uncompressed points only. - pub_key_bytes = bytes(b'\x04' + x + y) + pub_key_bytes = bytes(b"\x04" + x + y) signing_key = ec.EllipticCurvePublicKey.from_encoded_point(crv, pub_key_bytes) signing_key.verify(signature, data, alg) elif cose_alg == COSE_ALG_EDDSA: if kty != COSE_KTY_OKP: - raise Exception(f"Invalid COSE key type specified for EdDSA: expected {COSE_KTY_OKP} (OKP), received {kty}") + raise Exception( + f"Invalid COSE key type specified for EdDSA: expected {COSE_KTY_OKP} (OKP), received {kty}" + ) pub_key_bytes = cred_pub_key[COSE_OKP_PUBLIC_KEY].tobytes() crv = cred_pub_key[COSE_OKP_CRV] @@ -484,7 +569,6 @@ def _cose_verify(cose_key: bytes, signature: bytes, data: bytes): raise Exception(f"Unsupported COSE key algorithm specified: {cose_alg}") - def _parse_authenticator_data(auth_data): client_rp_id_hash = auth_data[:32] @@ -497,17 +581,17 @@ def _parse_authenticator_data(auth_data): flags.add(bits[i]) flag_byte = flag_byte >> 1 - sign_count = struct.unpack('>I', auth_data[33:37])[0] + sign_count = struct.unpack(">I", auth_data[33:37])[0] - if 'AT' in flags: - aaguid = auth_data[37:37 + 16] - cred_id_length = struct.unpack('>H', auth_data[53:55])[0] - cred_id = auth_data[55:55+cred_id_length] - parser = cbor.Parser(auth_data[55 + cred_id_length:]) + if "AT" in flags: + aaguid = auth_data[37 : 37 + 16] + cred_id_length = struct.unpack(">H", auth_data[53:55])[0] + cred_id = auth_data[55 : 55 + cred_id_length] + parser = cbor.Parser(auth_data[55 + cred_id_length :]) _ = parser.parse() - cose_key_bytes = parser.data[:parser.pos] + cose_key_bytes = parser.data[: parser.pos] cose_key_bytes_len = len(cose_key_bytes) - assert(len(cose_key_bytes) == parser.pos) + assert len(cose_key_bytes) == parser.pos attested_cred_data_len = 55 + cred_id_length + cose_key_bytes_len else: @@ -516,8 +600,8 @@ def _parse_authenticator_data(auth_data): cred_id = None cose_key_bytes = None - if 'ED' in flags: - extensions = cbor.loads(auth_data[37 + attested_cred_data_len:]) + if "ED" in flags: + extensions = cbor.loads(auth_data[37 + attested_cred_data_len :]) else: extensions = None return AuthenticatorData( @@ -527,9 +611,10 @@ def _parse_authenticator_data(auth_data): aaguid=aaguid, cred_id=cred_id, pub_key_bytes=cose_key_bytes, - extensions=extensions + extensions=extensions, ) + @dataclass class AuthenticatorData: rp_id_hash: bytes @@ -546,5 +631,3 @@ def get_pub_key(self): def has_flag(self, flag): return flag in self.flags - - diff --git a/demo_client/window.blp b/demo_client/window.blp new file mode 100644 index 0000000..78755ab --- /dev/null +++ b/demo_client/window.blp @@ -0,0 +1,55 @@ +using Gtk 4.0; +using Adw 1; + +template $MyAppWindow: ApplicationWindow { + default-width: 600; + default-height: 300; + title: _("Hello, Blueprint!"); + + Box { + orientation: vertical; + margin-start: 100; + margin-end: 100; + + Entry username { + placeholder-text: _("Enter your username"); + } + + Box { + orientation: horizontal; + + Button make_credential_btn { + label: "Register"; + clicked => $on_register(); + } + + Button get_assertion_btn { + label: "Authenticate"; + clicked => $on_authenticate(); + } + } + + Adw.ExpanderRow { + title: "Settings"; + margin-top: 50; + + Adw.PreferencesGroup { + Adw.ActionRow { + title: "User Verification"; + + DropDown uv_pref_dropdown { + model: bind template.uv_pref; + } + } + + Adw.ActionRow { + title: "Discoverable Credential"; + + DropDown discoverable_cred_pref_dropdown { + model: bind template.discoverable_cred_pref; + } + } + } + } + } +} diff --git a/meson.build b/meson.build index e0f24a9..c012542 100644 --- a/meson.build +++ b/meson.build @@ -32,4 +32,5 @@ subdir('credentialsd-ui') subdir('dbus') subdir('systemd') subdir('webext') -subdir('doc') \ No newline at end of file +subdir('doc') +subdir('demo_client')