|
| 1 | +"""Key Management Service (CryptoService) interface for DIDComm Messaging.""" |
| 2 | + |
| 3 | + |
| 4 | +from abc import ABC, abstractmethod |
| 5 | +from typing import Generic, Literal, Mapping, NamedTuple, Optional, TypeVar, Union |
| 6 | + |
| 7 | +from didcomm_messaging.jwe import JweEnvelope, from_b64url |
| 8 | + |
| 9 | + |
| 10 | +P = TypeVar("P", bound="PublicKey") |
| 11 | +S = TypeVar("S", bound="SecretKey") |
| 12 | + |
| 13 | + |
| 14 | +class CryptoServiceError(Exception): |
| 15 | + """Represents an error from a CryptoService.""" |
| 16 | + |
| 17 | + |
| 18 | +class PublicKey(ABC): |
| 19 | + """Key representation for CryptoService.""" |
| 20 | + |
| 21 | + @classmethod |
| 22 | + @abstractmethod |
| 23 | + def from_verification_method(cls, vm: dict) -> "PublicKey": |
| 24 | + """Create a Key instance from a DID Document Verification Method.""" |
| 25 | + |
| 26 | + @property |
| 27 | + @abstractmethod |
| 28 | + def kid(self) -> str: |
| 29 | + """Get the key ID.""" |
| 30 | + |
| 31 | + @property |
| 32 | + @abstractmethod |
| 33 | + def multikey(self) -> str: |
| 34 | + """Get the key in multikey format.""" |
| 35 | + |
| 36 | + |
| 37 | +class SecretKey(ABC): |
| 38 | + """Secret Key Type.""" |
| 39 | + @property |
| 40 | + @abstractmethod |
| 41 | + def kid(self) -> str: |
| 42 | + """Get the key ID.""" |
| 43 | + |
| 44 | + |
| 45 | +class PackedMessageMetadata(NamedTuple): |
| 46 | + """Unpack result.""" |
| 47 | + |
| 48 | + wrapper: JweEnvelope |
| 49 | + method: Literal["ECDH-ES", "ECDH-1PU"] |
| 50 | + recip_kid: str |
| 51 | + sender_kid: Optional[str] |
| 52 | + |
| 53 | + |
| 54 | +class CryptoService(ABC, Generic[P, S]): |
| 55 | + """Key Management Service (CryptoService) interface for DIDComm Messaging.""" |
| 56 | + |
| 57 | + @abstractmethod |
| 58 | + async def ecdh_es_encrypt(self, to_keys: Mapping[str, P], message: bytes) -> bytes: |
| 59 | + """Encode a message into DIDComm v2 anonymous encryption.""" |
| 60 | + |
| 61 | + @abstractmethod |
| 62 | + async def ecdh_es_decrypt( |
| 63 | + self, message: bytes, recip_kid, str, recip_key: P |
| 64 | + ) -> bytes: |
| 65 | + """Decode a message from DIDComm v2 anonymous encryption.""" |
| 66 | + |
| 67 | + @abstractmethod |
| 68 | + async def ecdh_1pu_encrypt( |
| 69 | + self, |
| 70 | + to_keys: Mapping[str, P], |
| 71 | + sender_kid: str, |
| 72 | + sender_key: P, |
| 73 | + message: bytes, |
| 74 | + ) -> bytes: |
| 75 | + """Encode a message into DIDComm v2 authenticated encryption.""" |
| 76 | + |
| 77 | + @abstractmethod |
| 78 | + async def ecdh_1pu_decrypt( |
| 79 | + self, |
| 80 | + message: bytes, |
| 81 | + recip_kid: str, |
| 82 | + recip_key: P, |
| 83 | + sender_key: P, |
| 84 | + ) -> bytes: |
| 85 | + """Decode a message from DIDComm v2 authenticated encryption.""" |
| 86 | + |
| 87 | + @classmethod |
| 88 | + @abstractmethod |
| 89 | + def verification_method_to_public_key(cls, vm: dict) -> P: |
| 90 | + """Convert a DIDComm v2 verification method to a public key.""" |
| 91 | + |
| 92 | + |
| 93 | +class SecretsManager(ABC): |
| 94 | + """Secrets Resolver interface. |
| 95 | +
|
| 96 | + Thie secrets resolver may be used to supplement the CryptoService backend to provide |
| 97 | + greater flexibility. |
| 98 | + """ |
| 99 | + |
| 100 | + @abstractmethod |
| 101 | + async def get_secret_by_kid(self, kid: str) -> Optional[SecretKey]: |
| 102 | + """Get a secret key by its ID.""" |
| 103 | + |
| 104 | + |
| 105 | +class KMS(CryptoService, SecretsManager): |
| 106 | + """Key Management Service interface for DIDComm Messaging.""" |
| 107 | + |
| 108 | + async def extract_packed_message_metadata( |
| 109 | + self, enc_message: Union[str, bytes] |
| 110 | + ) -> PackedMessageMetadata: |
| 111 | + """Extract metadata from a packed DIDComm message.""" |
| 112 | + try: |
| 113 | + wrapper = JweEnvelope.from_json(enc_message) |
| 114 | + except ValueError: |
| 115 | + raise CryptoServiceError("Invalid packed message") |
| 116 | + |
| 117 | + alg = wrapper.protected.get("alg") |
| 118 | + if not alg: |
| 119 | + raise CryptoServiceError("Missing alg header") |
| 120 | + |
| 121 | + method = next((m for m in ("ECDH-1PU", "ECDH-ES") if m in alg), None) |
| 122 | + if not method: |
| 123 | + raise CryptoServiceError(f"Unsupported DIDComm encryption algorithm: {alg}") |
| 124 | + |
| 125 | + sender_kid = None |
| 126 | + recip_key = None |
| 127 | + for kid in wrapper.recipient_key_ids: |
| 128 | + recip_key = await self.get_secret_by_kid(kid) |
| 129 | + if recip_key: |
| 130 | + break |
| 131 | + |
| 132 | + if not recip_key: |
| 133 | + raise CryptoServiceError("No recognized recipient key") |
| 134 | + |
| 135 | + recip_kid = recip_key.kid |
| 136 | + |
| 137 | + if method == "ECDH-1PU": |
| 138 | + sender_kid_apu = None |
| 139 | + apu = wrapper.protected.get("apu") |
| 140 | + if apu: |
| 141 | + try: |
| 142 | + sender_kid_apu = from_b64url(apu).decode("utf-8") |
| 143 | + except (UnicodeDecodeError, ValueError): |
| 144 | + raise CryptoServiceError("Invalid apu value") |
| 145 | + sender_kid = wrapper.protected.get("skid") or sender_kid_apu |
| 146 | + if sender_kid_apu and sender_kid != sender_kid_apu: |
| 147 | + raise CryptoServiceError("Mismatch between skid and apu") |
| 148 | + if not sender_kid: |
| 149 | + raise CryptoServiceError("Sender key ID not provided") |
| 150 | + # FIXME - validate apv if present? |
| 151 | + |
| 152 | + return PackedMessageMetadata(wrapper, method, recip_kid, sender_kid) |
0 commit comments