|
| 1 | +# Copyright (c) 2025 Tulir Asokan |
| 2 | +# |
| 3 | +# This Source Code Form is subject to the terms of the Mozilla Public |
| 4 | +# License, v. 2.0. If a copy of the MPL was not distributed with this |
| 5 | +# file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 6 | +from mautrix import client as cli |
| 7 | +from mautrix.errors import MNotFound |
| 8 | +from mautrix.types import EventType, SecretStorageDefaultKeyEventContent |
| 9 | + |
| 10 | +from .key import Key, KeyMetadata |
| 11 | +from .types import EncryptedAccountDataEventContent |
| 12 | + |
| 13 | + |
| 14 | +class Machine: |
| 15 | + client: cli.Client |
| 16 | + |
| 17 | + def __init__(self, client: cli.Client) -> None: |
| 18 | + self.client = client |
| 19 | + |
| 20 | + async def get_default_key_id(self) -> str | None: |
| 21 | + try: |
| 22 | + data = await self.client.get_account_data(EventType.SECRET_STORAGE_DEFAULT_KEY) |
| 23 | + return SecretStorageDefaultKeyEventContent.deserialize(data).key |
| 24 | + except (MNotFound, ValueError): |
| 25 | + return None |
| 26 | + |
| 27 | + async def set_default_key_id(self, key_id: str) -> None: |
| 28 | + await self.client.set_account_data( |
| 29 | + EventType.SECRET_STORAGE_DEFAULT_KEY, |
| 30 | + SecretStorageDefaultKeyEventContent(key=key_id), |
| 31 | + ) |
| 32 | + |
| 33 | + async def get_key_data(self, key_id: str) -> KeyMetadata: |
| 34 | + data = await self.client.get_account_data(f"m.secret_storage.key.{key_id}") |
| 35 | + return KeyMetadata.deserialize(data) |
| 36 | + |
| 37 | + async def set_key_data(self, key_id: str, data: KeyMetadata) -> None: |
| 38 | + await self.client.set_account_data(f"m.secret_storage.key.{key_id}", data) |
| 39 | + |
| 40 | + async def get_default_key_data(self) -> tuple[str, KeyMetadata]: |
| 41 | + key_id = await self.get_default_key_id() |
| 42 | + if not key_id: |
| 43 | + raise ValueError("No default key ID set") |
| 44 | + return key_id, await self.get_key_data(key_id) |
| 45 | + |
| 46 | + async def get_decrypted_account_data(self, event_type: EventType | str, key: Key) -> bytes: |
| 47 | + data = await self.client.get_account_data(event_type) |
| 48 | + parsed = EncryptedAccountDataEventContent.deserialize(data) |
| 49 | + return parsed.decrypt(event_type, key) |
| 50 | + |
| 51 | + async def set_encrypted_account_data( |
| 52 | + self, event_type: EventType | str, data: bytes, *keys: Key |
| 53 | + ) -> None: |
| 54 | + encrypted_data = {} |
| 55 | + for key in keys: |
| 56 | + encrypted_data[key.id] = key.encrypt(event_type, data) |
| 57 | + await self.client.set_account_data( |
| 58 | + event_type, |
| 59 | + EncryptedAccountDataEventContent(encrypted=encrypted_data), |
| 60 | + ) |
| 61 | + |
| 62 | + async def generate_and_upload_key(self, passphrase: str | None = None) -> Key: |
| 63 | + key = Key.generate(passphrase) |
| 64 | + await self.set_key_data(key.id, key.metadata) |
| 65 | + return key |
0 commit comments