Skip to content

Commit 799cb27

Browse files
committed
Add utilities for SSSS
1 parent ecd10f5 commit 799cb27

File tree

11 files changed

+508
-2
lines changed

11 files changed

+508
-2
lines changed

mautrix/crypto/ssss/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .key import Key, KeyMetadata, PassphraseMetadata
2+
from .types import (
3+
Algorithm,
4+
EncryptedAccountDataEventContent,
5+
EncryptedKeyData,
6+
PassphraseAlgorithm,
7+
)

mautrix/crypto/ssss/key.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Copyright (c) 2025 Tulir Asokan
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
from typing import Optional
7+
import base64
8+
import hashlib
9+
import hmac
10+
11+
from attr import dataclass
12+
import unpaddedbase64
13+
14+
from mautrix.types import EventType, SerializableAttrs
15+
16+
from .types import Algorithm, EncryptedKeyData, PassphraseAlgorithm
17+
from .util import (
18+
calculate_hash,
19+
cryptorand,
20+
decode_base58_recovery_key,
21+
derive_keys,
22+
encode_base58_recovery_key,
23+
prepare_aes,
24+
)
25+
26+
try:
27+
from Crypto.Cipher import AES
28+
from Crypto.Util import Counter
29+
except ImportError:
30+
from Cryptodome.Cipher import AES
31+
from Cryptodome.Util import Counter
32+
33+
34+
@dataclass
35+
class PassphraseMetadata(SerializableAttrs):
36+
algorithm: PassphraseAlgorithm
37+
iterations: int
38+
salt: str
39+
bits: int = 256
40+
41+
def get_key(self, passphrase: str) -> bytes:
42+
if self.algorithm != PassphraseAlgorithm.PBKDF2:
43+
raise ValueError(f"Unsupported passphrase algorithm {self.algorithm}")
44+
return hashlib.pbkdf2_hmac(
45+
"sha512",
46+
passphrase.encode("utf-8"),
47+
self.salt.encode("utf-8"),
48+
self.iterations,
49+
self.bits // 8,
50+
)
51+
52+
53+
@dataclass
54+
class KeyMetadata(SerializableAttrs):
55+
algorithm: Algorithm
56+
57+
iv: str | None = None
58+
mac: str | None = None
59+
60+
name: str | None = None
61+
passphrase: Optional[PassphraseMetadata] = None
62+
63+
def verify_passphrase(self, key_id: str, phrase: str) -> "Key":
64+
if not self.passphrase:
65+
raise ValueError("Passphrase not set on this key")
66+
return self.verify_raw_key(key_id, self.passphrase.get_key(phrase))
67+
68+
def verify_recovery_key(self, key_id: str, recovery_key: str) -> "Key":
69+
return self.verify_raw_key(key_id, decode_base58_recovery_key(recovery_key))
70+
71+
def verify_raw_key(self, key_id: str, key: bytes) -> "Key":
72+
if self.mac.rstrip("=") != calculate_hash(key, self.iv):
73+
raise ValueError("Key MAC does not match")
74+
return Key(id=key_id, key=key, metadata=self)
75+
76+
77+
@dataclass
78+
class Key:
79+
id: str
80+
key: bytes
81+
metadata: KeyMetadata
82+
83+
@classmethod
84+
def generate(cls, passphrase: str | None = None) -> "Key":
85+
passphrase_meta = (
86+
PassphraseMetadata(
87+
algorithm=PassphraseAlgorithm.PBKDF2,
88+
iterations=500_000,
89+
salt=base64.b64encode(cryptorand.read(24)).decode("utf-8"),
90+
bits=256,
91+
)
92+
if passphrase
93+
else None
94+
)
95+
key = passphrase_meta.get_key(passphrase) if passphrase else cryptorand.read(32)
96+
iv = unpaddedbase64.encode_base64(cryptorand.read(16))
97+
metadata = KeyMetadata(
98+
algorithm=Algorithm.AES_HMAC_SHA2,
99+
passphrase=passphrase_meta,
100+
mac=calculate_hash(key, iv),
101+
iv=iv,
102+
)
103+
key_id = unpaddedbase64.encode_base64(cryptorand.read(24))
104+
return cls(key=key, id=key_id, metadata=metadata)
105+
106+
@property
107+
def recovery_key(self) -> str:
108+
return encode_base58_recovery_key(self.key)
109+
110+
def encrypt(self, event_type: str | EventType, data: str | bytes) -> EncryptedKeyData:
111+
if isinstance(data, str):
112+
data = data.encode("utf-8")
113+
data = base64.b64encode(data).rstrip(b"=")
114+
115+
aes_key, hmac_key = derive_keys(self.key, event_type)
116+
iv = bytearray(cryptorand.read(16))
117+
iv[8] &= 0x7F
118+
ciphertext = prepare_aes(aes_key, iv).encrypt(data)
119+
digest = hmac.digest(hmac_key, ciphertext, hashlib.sha256)
120+
return EncryptedKeyData(
121+
ciphertext=unpaddedbase64.encode_base64(ciphertext),
122+
iv=unpaddedbase64.encode_base64(iv),
123+
mac=unpaddedbase64.encode_base64(digest),
124+
)
125+
126+
def decrypt(self, event_type: str | EventType, data: EncryptedKeyData) -> bytes:
127+
aes_key, hmac_key = derive_keys(self.key, event_type)
128+
ciphertext = unpaddedbase64.decode_base64(data.ciphertext)
129+
mac = unpaddedbase64.decode_base64(data.mac)
130+
131+
expected_mac = hmac.digest(hmac_key, ciphertext, hashlib.sha256)
132+
if not hmac.compare_digest(mac, expected_mac):
133+
raise ValueError("Invalid MAC")
134+
135+
plaintext = prepare_aes(aes_key, data.iv).decrypt(ciphertext)
136+
return unpaddedbase64.decode_base64(plaintext.decode("utf-8"))

mautrix/crypto/ssss/key_test.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
# Copyright (c) 2025 Tulir Asokan
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
import pytest
7+
8+
from ...types.event.type import EventType
9+
from .key import Key, KeyMetadata
10+
from .types import EncryptedAccountDataEventContent
11+
12+
KEY1_CROSS_SIGNING_MASTER_KEY = """{
13+
"encrypted": {
14+
"gEJqbfSEMnP5JXXcukpXEX1l0aI3MDs0": {
15+
"iv": "BpKP9nQJTE9jrsAssoxPqQ==",
16+
"ciphertext": "fNRiiiidezjerTgV+G6pUtmeF3izzj5re/mVvY0hO2kM6kYGrxLuIu2ej80=",
17+
"mac": "/gWGDGMyOLmbJp+aoSLh5JxCs0AdS6nAhjzpe+9G2Q0="
18+
}
19+
}
20+
}"""
21+
22+
KEY1_CROSS_SIGNING_MASTER_KEY_DECRYPTED = bytes(
23+
[
24+
0x68,
25+
0xF9,
26+
0x7F,
27+
0xD1,
28+
0x92,
29+
0x2E,
30+
0xEC,
31+
0xF6,
32+
0xB8,
33+
0x2B,
34+
0xB8,
35+
0x90,
36+
0xD2,
37+
0x4D,
38+
0x06,
39+
0x52,
40+
0x98,
41+
0x4E,
42+
0x7A,
43+
0x1D,
44+
0x70,
45+
0x3B,
46+
0x9E,
47+
0x86,
48+
0x7B,
49+
0x7E,
50+
0xBA,
51+
0xF7,
52+
0xFE,
53+
0xB9,
54+
0x5B,
55+
0x6F,
56+
]
57+
)
58+
59+
KEY1_META = """{
60+
"algorithm": "m.secret_storage.v1.aes-hmac-sha2",
61+
"passphrase": {
62+
"algorithm": "m.pbkdf2",
63+
"iterations": 500000,
64+
"salt": "y863BOoqOadgDp8S3FtHXikDJEalsQ7d"
65+
},
66+
"iv": "xxkTK0L4UzxgAFkQ6XPwsw",
67+
"mac": "MEhooO0ZhFJNxUhvRMSxBnJfL20wkLgle3ocY0ee/eA"
68+
}"""
69+
KEY1_ID = "gEJqbfSEMnP5JXXcukpXEX1l0aI3MDs0"
70+
KEY1_RECOVERY_KEY = "EsTE s92N EtaX s2h6 VQYF 9Kao tHYL mkyL GKMh isZb KJ4E tvoC"
71+
KEY1_PASSPHRASE = "correct horse battery staple"
72+
73+
KEY2_META = """{
74+
"algorithm": "m.secret_storage.v1.aes-hmac-sha2",
75+
"iv": "O0BOvTqiIAYjC+RMcyHfWw==",
76+
"mac": "7k6OruQlWg0UmQjxGZ0ad4Q6DdwkgnoI7G6X3IjBYtI="
77+
}"""
78+
KEY2_ID = "NVe5vK6lZS9gEMQLJw0yqkzmE5Mr7dLv"
79+
KEY2_RECOVERY_KEY = "EsUC xSxt XJgQ dz19 8WBZ rHdE GZo7 ybsn EFmG Y5HY MDAG GNWe"
80+
81+
KEY2_META_BROKEN_IV = """{
82+
"algorithm": "m.secret_storage.v1.aes-hmac-sha2",
83+
"iv": "O0BOvTqiIAYjC+RMcyHfWwMeowMeowMeow",
84+
"mac": "7k6OruQlWg0UmQjxGZ0ad4Q6DdwkgnoI7G6X3IjBYtI="
85+
}"""
86+
87+
KEY2_META_BROKEN_MAC = """{
88+
"algorithm": "m.secret_storage.v1.aes-hmac-sha2",
89+
"iv": "O0BOvTqiIAYjC+RMcyHfWw==",
90+
"mac": "7k6OruQlWg0UmQjxGZ0ad4Q6DdwkgnoI7G6X3IjBYtIMeowMeowMeow"
91+
}"""
92+
93+
94+
def get_key_meta(meta: str) -> KeyMetadata:
95+
return KeyMetadata.parse_json(meta)
96+
97+
98+
def get_key1() -> Key:
99+
return get_key_meta(KEY1_META).verify_recovery_key(KEY1_ID, KEY1_RECOVERY_KEY)
100+
101+
102+
def get_key2() -> Key:
103+
return get_key_meta(KEY2_META).verify_recovery_key(KEY2_ID, KEY2_RECOVERY_KEY)
104+
105+
106+
def get_encrypted_master_key() -> EncryptedAccountDataEventContent:
107+
return EncryptedAccountDataEventContent.parse_json(KEY1_CROSS_SIGNING_MASTER_KEY)
108+
109+
110+
def test_decrypt_success() -> None:
111+
key = get_key1()
112+
emk = get_encrypted_master_key()
113+
assert (
114+
emk.decrypt(EventType.CROSS_SIGNING_MASTER, key) == KEY1_CROSS_SIGNING_MASTER_KEY_DECRYPTED
115+
)
116+
117+
118+
def test_decrypt_fail_wrong_key() -> None:
119+
key = get_key2()
120+
emk = get_encrypted_master_key()
121+
with pytest.raises(ValueError):
122+
emk.decrypt(EventType.CROSS_SIGNING_MASTER, key)
123+
124+
125+
def test_decrypt_fail_fake_key() -> None:
126+
key = get_key2()
127+
key.id = KEY1_ID
128+
emk = get_encrypted_master_key()
129+
with pytest.raises(ValueError):
130+
emk.decrypt(EventType.CROSS_SIGNING_MASTER, key)
131+
132+
133+
def test_decrypt_fail_wrong_type() -> None:
134+
key = get_key1()
135+
emk = get_encrypted_master_key()
136+
with pytest.raises(ValueError):
137+
emk.decrypt(EventType.CROSS_SIGNING_SELF_SIGNING, key)
138+
139+
140+
def test_encrypt_roundtrip() -> None:
141+
key = get_key1()
142+
data = bytes([0xDE, 0xAD, 0xBE, 0xEF])
143+
ciphertext = key.encrypt("net.maunium.data", data)
144+
plaintext = key.decrypt("net.maunium.data", ciphertext)
145+
assert plaintext == data
146+
147+
148+
def test_verify_recovery_key_correct() -> None:
149+
meta = get_key_meta(KEY1_META)
150+
key = meta.verify_recovery_key(KEY1_ID, KEY1_RECOVERY_KEY)
151+
assert key.recovery_key == KEY1_RECOVERY_KEY
152+
153+
154+
def test_verify_recovery_key_correct2() -> None:
155+
meta = get_key_meta(KEY2_META)
156+
key = meta.verify_recovery_key(KEY2_ID, KEY2_RECOVERY_KEY)
157+
assert key.recovery_key == KEY2_RECOVERY_KEY
158+
159+
160+
def test_verify_recovery_key_invalid() -> None:
161+
meta = get_key_meta(KEY1_META)
162+
with pytest.raises(ValueError):
163+
meta.verify_recovery_key(KEY1_ID, "foo")
164+
165+
166+
def test_verify_recovery_key_incorrect() -> None:
167+
meta = get_key_meta(KEY1_META)
168+
with pytest.raises(ValueError):
169+
meta.verify_recovery_key(KEY2_ID, KEY2_RECOVERY_KEY)
170+
171+
172+
def test_verify_recovery_key_broken_iv() -> None:
173+
meta = get_key_meta(KEY2_META_BROKEN_IV)
174+
with pytest.raises(ValueError):
175+
meta.verify_recovery_key(KEY2_ID, KEY2_RECOVERY_KEY)
176+
177+
178+
def test_verify_recovery_key_broken_mac() -> None:
179+
meta = get_key_meta(KEY2_META_BROKEN_MAC)
180+
with pytest.raises(ValueError):
181+
meta.verify_recovery_key(KEY2_ID, KEY2_RECOVERY_KEY)
182+
183+
184+
def test_verify_passphrase_correct() -> None:
185+
meta = get_key_meta(KEY1_META)
186+
key = meta.verify_passphrase(KEY1_ID, KEY1_PASSPHRASE)
187+
assert key.recovery_key == KEY1_RECOVERY_KEY
188+
189+
190+
def test_verify_passphrase_incorrect() -> None:
191+
meta = get_key_meta(KEY1_META)
192+
with pytest.raises(ValueError):
193+
meta.verify_passphrase(KEY1_ID, "incorrect horse battery staple")
194+
195+
196+
def test_verify_passphrase_notset() -> None:
197+
meta = get_key_meta(KEY2_META)
198+
with pytest.raises(ValueError):
199+
meta.verify_passphrase(KEY2_ID, "hmm")

mautrix/crypto/ssss/types.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright (c) 2025 Tulir Asokan
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
from typing import TYPE_CHECKING
7+
8+
from attr import dataclass
9+
10+
from mautrix.types import EventType, SerializableAttrs, SerializableEnum
11+
from mautrix.types.event.account_data import account_data_event_content_map
12+
13+
if TYPE_CHECKING:
14+
from .key import Key
15+
16+
17+
class Algorithm(SerializableEnum):
18+
AES_HMAC_SHA2 = "m.secret_storage.v1.aes-hmac-sha2"
19+
CURVE25519_AES_SHA2 = "m.secret_storage.v1.curve25519-aes-sha2"
20+
21+
22+
class PassphraseAlgorithm(SerializableEnum):
23+
PBKDF2 = "m.pbkdf2"
24+
25+
26+
@dataclass
27+
class EncryptedKeyData(SerializableAttrs):
28+
ciphertext: str
29+
iv: str
30+
mac: str
31+
32+
33+
@dataclass
34+
class EncryptedAccountDataEventContent(SerializableAttrs):
35+
encrypted: dict[str, EncryptedKeyData]
36+
37+
def decrypt(self, event_type: str | EventType, key: "Key") -> bytes:
38+
try:
39+
encrypted_data = self.encrypted[key.id]
40+
except KeyError as e:
41+
raise ValueError(f"Event not encrypted for provided key") from e
42+
return key.decrypt(event_type, encrypted_data)
43+
44+
45+
for encrypted_account_data_type in (
46+
EventType.CROSS_SIGNING_MASTER,
47+
EventType.CROSS_SIGNING_USER_SIGNING,
48+
EventType.CROSS_SIGNING_SELF_SIGNING,
49+
EventType.MEGOLM_BACKUP_V1,
50+
):
51+
account_data_event_content_map[encrypted_account_data_type] = EncryptedAccountDataEventContent

0 commit comments

Comments
 (0)