Skip to content

Commit 5c4fdd7

Browse files
committed
feat: add legacy interfaces
Signed-off-by: Daniel Bluhm <[email protected]>
1 parent d48d0e7 commit 5c4fdd7

File tree

11 files changed

+620
-31
lines changed

11 files changed

+620
-31
lines changed

didcomm_messaging/legacy/askar.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
"""Legacy DIDComm v1 Crypto implementation."""
2+
3+
from collections import OrderedDict
4+
from typing import Optional, Sequence, Tuple, cast
5+
6+
from base58 import b58decode, b58encode
7+
8+
from didcomm_messaging.crypto.jwe import JweBuilder, JweEnvelope, JweRecipient
9+
from didcomm_messaging.legacy.base import (
10+
LegacyUnpackResult,
11+
LegacyCryptoService,
12+
RecipData,
13+
)
14+
15+
try:
16+
from aries_askar import Key, KeyAlg, crypto_box
17+
from aries_askar.bindings import key_get_secret_bytes
18+
from didcomm_messaging.crypto.backend.askar import AskarKey, AskarSecretKey
19+
except ImportError:
20+
raise ImportError("Legacy Askar backend requires the 'askar' extra to be installed")
21+
22+
23+
class AskarLegacyCryptoService(LegacyCryptoService[AskarKey, AskarSecretKey]):
24+
"""Legacy crypto service implementation for askar."""
25+
26+
def kid_to_public_key(self, kid: str) -> AskarKey:
27+
"""Get a public key from a kid.
28+
29+
In DIDComm v1, kids are the base58 encoded keys.
30+
"""
31+
return AskarKey(Key.from_public_bytes(KeyAlg.ED25519, b58decode(kid)), kid)
32+
33+
async def pack_message(
34+
self,
35+
to_verkeys: Sequence[AskarKey],
36+
from_key: Optional[AskarSecretKey],
37+
message: bytes,
38+
) -> JweEnvelope:
39+
"""Encode a message using the DIDComm v1 'pack' algorithm."""
40+
builder = JweBuilder(
41+
with_protected_recipients=True, with_flatten_recipients=False
42+
)
43+
cek = Key.generate(KeyAlg.C20P)
44+
# avoid converting to bytes object: this way the only copy is zeroed afterward
45+
# tell type checking it's bytes to make it happy
46+
cek_b = cast(bytes, key_get_secret_bytes(cek._handle))
47+
sender_vk = (
48+
b58encode(from_key.key.get_public_bytes()).decode("utf-8")
49+
if from_key
50+
else None
51+
)
52+
sender_xk = from_key.key.convert_key(KeyAlg.X25519) if from_key else None
53+
54+
for target_vk in to_verkeys:
55+
target_xk = target_vk.key.convert_key(KeyAlg.X25519)
56+
if sender_vk and sender_xk:
57+
enc_sender = crypto_box.crypto_box_seal(target_xk, sender_vk)
58+
nonce = crypto_box.random_nonce()
59+
enc_cek = crypto_box.crypto_box(target_xk, sender_xk, cek_b, nonce)
60+
builder.add_recipient(
61+
JweRecipient(
62+
encrypted_key=enc_cek,
63+
header=OrderedDict(
64+
[
65+
("kid", target_vk.kid),
66+
("sender", self.b64url.encode(enc_sender)),
67+
("iv", self.b64url.encode(nonce)),
68+
]
69+
),
70+
)
71+
)
72+
else:
73+
enc_sender = None
74+
nonce = None
75+
enc_cek = crypto_box.crypto_box_seal(target_xk, cek_b)
76+
builder.add_recipient(
77+
JweRecipient(encrypted_key=enc_cek, header={"kid": target_vk.kid})
78+
)
79+
builder.set_protected(
80+
OrderedDict(
81+
[
82+
("enc", "xchacha20poly1305_ietf"),
83+
("typ", "JWM/1.0"),
84+
("alg", "Authcrypt" if from_key else "Anoncrypt"),
85+
]
86+
),
87+
)
88+
enc = cek.aead_encrypt(message, aad=builder.protected_bytes)
89+
ciphertext, tag, nonce = enc.parts
90+
builder.set_payload(ciphertext, nonce, tag)
91+
return builder.build()
92+
93+
async def unpack_message(
94+
self,
95+
wrapper: JweEnvelope,
96+
recip_key: AskarSecretKey,
97+
recip_data: RecipData,
98+
) -> LegacyUnpackResult:
99+
"""Decode a message using the DIDComm v1 'unpack' algorithm."""
100+
payload_key, sender_vk = self._extract_payload_key(recip_key.key, recip_data)
101+
102+
cek = Key.from_secret_bytes(KeyAlg.C20P, payload_key)
103+
message = cek.aead_decrypt(
104+
wrapper.ciphertext,
105+
nonce=wrapper.iv,
106+
tag=wrapper.tag,
107+
aad=wrapper.protected_b64,
108+
)
109+
return LegacyUnpackResult(message, recip_key.kid, sender_vk)
110+
111+
def _extract_payload_key(
112+
self, recip_key: Key, recip_data: RecipData
113+
) -> Tuple[bytes, Optional[str]]:
114+
"""Extract the payload key from pack recipient details.
115+
116+
Returns: A tuple of the CEK and sender verkey
117+
"""
118+
recip_x = recip_key.convert_key(KeyAlg.X25519)
119+
120+
if recip_data.nonce and recip_data.enc_sender:
121+
sender_vk = crypto_box.crypto_box_seal_open(
122+
recip_x, recip_data.enc_sender
123+
).decode("utf-8")
124+
sender_x = Key.from_public_bytes(
125+
KeyAlg.ED25519, b58decode(sender_vk)
126+
).convert_key(KeyAlg.X25519)
127+
cek = crypto_box.crypto_box_open(
128+
recip_x, sender_x, recip_data.enc_cek, recip_data.nonce
129+
)
130+
else:
131+
sender_vk = None
132+
cek = crypto_box.crypto_box_seal_open(recip_x, recip_data.enc_cek)
133+
return cek, sender_vk

didcomm_messaging/legacy/base.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""DIDComm v1 Base Services."""
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Generic, NamedTuple, Optional, Sequence
5+
from didcomm_messaging.crypto.base import P, S
6+
from didcomm_messaging.crypto.jwe import JweEnvelope
7+
from didcomm_messaging.multiformats.multibase import Base64UrlEncoder
8+
9+
10+
class RecipData(NamedTuple):
11+
"""Recipient metadata."""
12+
13+
kid: str
14+
enc_sender: Optional[bytes]
15+
nonce: Optional[bytes]
16+
enc_cek: bytes
17+
18+
19+
class LegacyUnpackResult(NamedTuple):
20+
"""Result of unpacking."""
21+
22+
message: bytes
23+
recip: str
24+
sender: Optional[str]
25+
26+
27+
class LegacyCryptoService(ABC, Generic[P, S]):
28+
"""CryptoService interface for DIDComm v1."""
29+
30+
b64url = Base64UrlEncoder()
31+
32+
@abstractmethod
33+
def kid_to_public_key(self, kid: str) -> P:
34+
"""Get a public key from a kid.
35+
36+
In DIDComm v1, kids are the base58 encoded keys.
37+
"""
38+
39+
@abstractmethod
40+
async def pack_message(
41+
self, to_verkeys: Sequence[P], from_key: Optional[S], message: bytes
42+
) -> JweEnvelope:
43+
"""Encode a message using the DIDComm v1 'pack' algorithm."""
44+
45+
@abstractmethod
46+
async def unpack_message(
47+
self, wrapper: JweEnvelope, recip_key: S, recip_data: RecipData
48+
) -> LegacyUnpackResult:
49+
"""Decode a message using DIDCvomm v1 'unpack' algorithm."""

didcomm_messaging/legacy/crypto.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
"""DIDComm v1 packing and unpacking."""
1+
"""DIDComm v1 packing and unpacking.
2+
3+
This implementation is kept around for backwards compatibility. It is
4+
likely that you should use the LegacyCryptoService interfaces instead.
5+
"""
26

37
from collections import OrderedDict
48
from typing import Iterable, Optional, Sequence, Dict, Union, Tuple, cast

didcomm_messaging/legacy/nacl.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
"""LegacyCryptoService implementation for pynacl."""
2+
3+
from dataclasses import dataclass
4+
from typing import Dict, Optional, Sequence
5+
6+
import base58
7+
8+
try:
9+
import nacl.bindings
10+
import nacl.exceptions
11+
import nacl.utils
12+
except ImportError as err:
13+
raise ImportError(
14+
"Legacy implementation requires 'legacy' extra to be installed"
15+
) from err
16+
from pydid import VerificationMethod
17+
18+
from didcomm_messaging.crypto.base import (
19+
PublicKey,
20+
SecretKey,
21+
SecretsManager,
22+
)
23+
from didcomm_messaging.crypto.jwe import JweEnvelope
24+
from didcomm_messaging.multiformats import multibase, multicodec
25+
26+
from . import crypto
27+
from .base import LegacyCryptoService, LegacyUnpackResult, RecipData
28+
29+
30+
@dataclass
31+
class KeyPair(SecretKey):
32+
"""Keys."""
33+
34+
verkey: bytes
35+
sigkey: bytes
36+
37+
@property
38+
def verkey_b58(self) -> str:
39+
"""Return base58 encoding of verkey."""
40+
return base58.b58encode(self.verkey).decode()
41+
42+
@property
43+
def kid(self) -> str:
44+
"""Get the key ID."""
45+
return self.verkey_b58
46+
47+
48+
@dataclass
49+
class EdPublicKey(PublicKey):
50+
"""Simple public key representation as base58 encoded str."""
51+
52+
value: bytes
53+
54+
@classmethod
55+
def from_verification_method(cls, vm: VerificationMethod) -> "EdPublicKey":
56+
"""Create a Key instance from a DID Document Verification Method."""
57+
key_bytes = cls.key_bytes_from_verification_method(vm)
58+
return EdPublicKey(key_bytes)
59+
60+
@property
61+
def key(self) -> str:
62+
"""Return base58 encoded key."""
63+
return base58.b58encode(self.value).decode()
64+
65+
@property
66+
def kid(self) -> str:
67+
"""Get the key ID."""
68+
return self.key
69+
70+
@property
71+
def multikey(self) -> str:
72+
"""Get the key in multikey format."""
73+
return multibase.encode(
74+
multicodec.wrap("ed25519-pub", base58.b58decode(self.key)), "base58btc"
75+
)
76+
77+
78+
class NaclLegacyCryptoService(LegacyCryptoService[EdPublicKey, KeyPair]):
79+
"""Legacy crypto service using pynacl."""
80+
81+
def kid_to_public_key(self, kid: str):
82+
"""Get a public key from a kid.
83+
84+
In DIDComm v1, kids are the base58 encoded keys.
85+
"""
86+
return EdPublicKey(base58.b58decode(kid))
87+
88+
async def pack_message(
89+
self,
90+
to_verkeys: Sequence[EdPublicKey],
91+
from_key: Optional[KeyPair],
92+
message: bytes,
93+
) -> JweEnvelope:
94+
"""Encode a message using the DIDComm v1 'pack' algorithm."""
95+
packed = crypto.pack_message(
96+
message=message.decode(),
97+
to_verkeys=[vk.value for vk in to_verkeys],
98+
from_verkey=from_key.verkey if from_key else None,
99+
from_sigkey=from_key.sigkey if from_key else None,
100+
)
101+
return JweEnvelope.deserialize(packed)
102+
103+
def _extract_payload_key(self, recip_key: KeyPair, recip_data: RecipData):
104+
"""Extract the payload key."""
105+
pk = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519(recip_key.verkey)
106+
sk = nacl.bindings.crypto_sign_ed25519_sk_to_curve25519(recip_key.sigkey)
107+
108+
if recip_data.nonce and recip_data.enc_sender:
109+
sender_vk = nacl.bindings.crypto_box_seal_open(
110+
recip_data.enc_sender, pk, sk
111+
).decode()
112+
sender_pk = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519(
113+
crypto.b58_to_bytes(sender_vk)
114+
)
115+
cek = nacl.bindings.crypto_box_open(
116+
recip_data.enc_cek, recip_data.nonce, sender_pk, sk
117+
)
118+
else:
119+
sender_vk = None
120+
cek = nacl.bindings.crypto_box_seal_open(recip_data.enc_cek, pk, sk)
121+
return cek, sender_vk
122+
123+
async def unpack_message(
124+
self, wrapper: JweEnvelope, recip_key: KeyPair, recip_data: RecipData
125+
) -> LegacyUnpackResult:
126+
"""Decode a message using DIDCvomm v1 'unpack' algorithm."""
127+
cek, sender_vk = self._extract_payload_key(recip_key, recip_data)
128+
129+
payload_bin = wrapper.ciphertext + wrapper.tag
130+
message = crypto.decrypt_plaintext(
131+
payload_bin, wrapper.protected_b64, wrapper.iv, cek
132+
)
133+
return LegacyUnpackResult(message.encode(), recip_key.kid, sender_vk)
134+
135+
136+
class InMemSecretsManager(SecretsManager[KeyPair]):
137+
"""In-memory secrets manager for ed25519 key pairs."""
138+
139+
def __init__(self):
140+
"""Initialize the manager."""
141+
self.secrets: Dict[str, KeyPair] = {}
142+
143+
async def get_secret_by_kid(self, kid: str) -> Optional[KeyPair]:
144+
"""Retrieve secret by kid."""
145+
return self.secrets.get(kid)
146+
147+
def create(self, seed: Optional[bytes] = None) -> KeyPair:
148+
"""Create and store a new keypair."""
149+
keys = KeyPair(*crypto.create_keypair(seed))
150+
self.secrets[keys.kid] = keys
151+
return keys

0 commit comments

Comments
 (0)