11"""LegacyCryptoService implementation for pynacl."""
22
33from dataclasses import dataclass
4- from typing import Dict , Optional , Sequence
4+ from typing import Dict , Optional , OrderedDict , Sequence , Tuple
55
66import base58
7+ from pydid import VerificationMethod
8+
9+ from didcomm_messaging .crypto .base import PublicKey , SecretKey , SecretsManager
10+ from didcomm_messaging .crypto .jwe import JweBuilder , JweEnvelope , JweRecipient
11+ from didcomm_messaging .multiformats import multibase , multicodec
12+
13+ from .base import LegacyCryptoService , LegacyUnpackResult , RecipData
714
815try :
916 import nacl .bindings
1320 raise ImportError (
1421 "Legacy implementation requires 'legacy' extra to be installed"
1522 ) from err
16- from pydid import VerificationMethod
17-
18- from didcomm_messaging .crypto .base import (
19- PublicKey ,
20- SecretKey ,
21- SecretsManager ,
22- )
23- from didcomm_messaging .crypto .jwe import JweEnvelope
24- from didcomm_messaging .multiformats import multibase , multicodec
25-
26- from . import crypto
27- from .base import LegacyCryptoService , LegacyUnpackResult , RecipData
2823
2924
3025@dataclass
@@ -92,13 +87,66 @@ async def pack_message(
9287 message : bytes ,
9388 ) -> JweEnvelope :
9489 """Encode a message using the DIDComm v1 'pack' algorithm."""
95- packed = crypto .pack_message (
96- message = message .decode (),
97- to_verkeys = [vk .value for vk in to_verkeys ],
98- from_verkey = from_key .verkey if from_key else None ,
99- from_sigkey = from_key .sigkey if from_key else None ,
90+ builder = JweBuilder (
91+ with_protected_recipients = True , with_flatten_recipients = False
10092 )
101- return JweEnvelope .deserialize (packed )
93+ cek = nacl .bindings .crypto_secretstream_xchacha20poly1305_keygen ()
94+ sender_vk = from_key .verkey_b58 .encode () if from_key else None
95+ sender_xk = (
96+ nacl .bindings .crypto_sign_ed25519_sk_to_curve25519 (from_key .sigkey )
97+ if from_key
98+ else None
99+ )
100+ for target_vk in to_verkeys :
101+ target_xk = nacl .bindings .crypto_sign_ed25519_pk_to_curve25519 (
102+ target_vk .value
103+ )
104+ if sender_vk and sender_xk :
105+ enc_sender = nacl .bindings .crypto_box_seal (sender_vk , target_xk )
106+ nonce = nacl .utils .random (nacl .bindings .crypto_box_NONCEBYTES )
107+ enc_cek = nacl .bindings .crypto_box (cek , nonce , target_xk , sender_xk )
108+ builder .add_recipient (
109+ JweRecipient (
110+ encrypted_key = enc_cek ,
111+ header = OrderedDict (
112+ [
113+ ("kid" , target_vk .kid ),
114+ ("sender" , self .b64url .encode (enc_sender )),
115+ ("iv" , self .b64url .encode (nonce )),
116+ ]
117+ ),
118+ )
119+ )
120+ else :
121+ enc_sender = None
122+ nonce = None
123+ enc_cek = nacl .bindings .crypto_box_seal (cek , target_xk )
124+ builder .add_recipient (
125+ JweRecipient (encrypted_key = enc_cek , header = {"kid" : target_vk .kid })
126+ )
127+
128+ builder .set_protected (
129+ OrderedDict (
130+ [
131+ ("enc" , "xchacha20poly1305_ietf" ),
132+ ("typ" , "JWM/1.0" ),
133+ ("alg" , "Authcrypt" if from_key else "Anoncrypt" ),
134+ ]
135+ ),
136+ )
137+
138+ nonce = nacl .utils .random (
139+ nacl .bindings .crypto_aead_chacha20poly1305_ietf_NPUBBYTES
140+ )
141+ output = nacl .bindings .crypto_aead_chacha20poly1305_ietf_encrypt (
142+ message , builder .protected_bytes , nonce , cek
143+ )
144+ mlen = len (message )
145+ ciphertext = output [:mlen ]
146+ tag = output [mlen :]
147+ builder .set_payload (ciphertext , nonce , tag )
148+
149+ return builder .build ()
102150
103151 def _extract_payload_key (self , recip_key : KeyPair , recip_data : RecipData ):
104152 """Extract the payload key."""
@@ -110,7 +158,7 @@ def _extract_payload_key(self, recip_key: KeyPair, recip_data: RecipData):
110158 recip_data .enc_sender , pk , sk
111159 ).decode ()
112160 sender_pk = nacl .bindings .crypto_sign_ed25519_pk_to_curve25519 (
113- crypto . b58_to_bytes (sender_vk )
161+ base58 . b58decode (sender_vk )
114162 )
115163 cek = nacl .bindings .crypto_box_open (
116164 recip_data .enc_cek , recip_data .nonce , sender_pk , sk
@@ -127,10 +175,10 @@ async def unpack_message(
127175 cek , sender_vk = self ._extract_payload_key (recip_key , recip_data )
128176
129177 payload_bin = wrapper .ciphertext + wrapper .tag
130- message = crypto . decrypt_plaintext (
178+ message = nacl . bindings . crypto_aead_chacha20poly1305_ietf_decrypt (
131179 payload_bin , wrapper .protected_b64 , wrapper .iv , cek
132180 )
133- return LegacyUnpackResult (message . encode () , recip_key .kid , sender_vk )
181+ return LegacyUnpackResult (message , recip_key .kid , sender_vk )
134182
135183
136184class InMemSecretsManager (SecretsManager [KeyPair ]):
@@ -144,8 +192,21 @@ async def get_secret_by_kid(self, kid: str) -> Optional[KeyPair]:
144192 """Retrieve secret by kid."""
145193 return self .secrets .get (kid )
146194
195+ def _create_keypair (self , seed : Optional [bytes ] = None ) -> Tuple [bytes , bytes ]:
196+ """Create a keypair."""
197+ if seed :
198+ if not isinstance (seed , bytes ):
199+ raise ValueError ("Seed value is not a string or bytes" )
200+ if len (seed ) != 32 :
201+ raise ValueError ("Seed value must be 32 bytes in length" )
202+ else :
203+ seed = nacl .utils .random (nacl .bindings .crypto_secretbox_KEYBYTES )
204+
205+ pk , sk = nacl .bindings .crypto_sign_seed_keypair (seed )
206+ return pk , sk
207+
147208 def create (self , seed : Optional [bytes ] = None ) -> KeyPair :
148209 """Create and store a new keypair."""
149- keys = KeyPair (* crypto . create_keypair (seed ))
210+ keys = KeyPair (* self . _create_keypair (seed ))
150211 self .secrets [keys .kid ] = keys
151212 return keys
0 commit comments