diff --git a/electrum/crypto.py b/electrum/crypto.py index f67b09970e9..6ee1751a8f4 100644 --- a/electrum/crypto.py +++ b/electrum/crypto.py @@ -134,9 +134,10 @@ def strip_PKCS7_padding(data: bytes) -> bytes: return data[0:-padlen] -def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes: +def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes, append_pkcs7=True) -> bytes: assert_bytes(key, iv, data) - data = append_PKCS7_padding(data) + if append_pkcs7: + data = append_PKCS7_padding(data) if HAS_CRYPTODOME: e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data) elif HAS_CRYPTOGRAPHY: @@ -152,7 +153,7 @@ def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes: return e -def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes: +def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes, strip_pkcs7=True) -> bytes: assert_bytes(key, iv, data) if HAS_CRYPTODOME: cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv) @@ -168,9 +169,11 @@ def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes: else: raise Exception("no AES backend found") try: - return strip_PKCS7_padding(data) + if strip_pkcs7: + data = strip_PKCS7_padding(data) except InvalidPadding: raise InvalidPassword() + return data def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes: diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 41448840f0c..b6619de166d 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -1955,14 +1955,13 @@ def update_buttons_on_seed(self): self.password_button.setVisible(self.wallet.may_have_password()) def change_password_dialog(self): - from electrum.storage import StorageEncryptionVersion - if StorageEncryptionVersion.XPUB_PASSWORD in self.wallet.get_available_storage_encryption_versions(): + if self.wallet.is_hw_encryption_available(): from .password_dialog import ChangePasswordDialogForHW d = ChangePasswordDialogForHW(self, self.wallet) ok, old_password, new_password, encrypt_with_xpub = d.run() if not ok: return - has_xpub_encryption = self.wallet.storage.get_encryption_version() == StorageEncryptionVersion.XPUB_PASSWORD + has_xpub_encryption = self.wallet.storage.is_encrypted_with_hw_device() def on_password(hw_dev_pw): self._update_wallet_password( old_password = hw_dev_pw if has_xpub_encryption else old_password, diff --git a/electrum/storage.py b/electrum/storage.py index 7c79f2de5ac..e1a9be80ff7 100644 --- a/electrum/storage.py +++ b/electrum/storage.py @@ -23,22 +23,35 @@ # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import os -import threading import stat import hashlib import base64 import zlib +import hmac from enum import IntEnum from typing import Optional +from secrets import token_bytes import electrum_ecc as ecc from . import crypto -from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path, - test_read_write_permissions, os_chmod) - -from .wallet_db import WalletDB +from .util import InvalidPassword, standardize_path, test_read_write_permissions, os_chmod from .logging import Logger +from .crypto import aes_encrypt_with_iv, aes_decrypt_with_iv, strip_PKCS7_padding + +# todo: remove those deps? +from .bitcoin import var_int +from .transaction import BCDataStream + + +STORAGE_VERSION = 0 +STORAGE_MAGIC_BYTES = b'Electrum' + +STORAGE_FLAG_ZIP_FIRST_BLOB = 0x01 +STORAGE_FLAGS = STORAGE_FLAG_ZIP_FIRST_BLOB + +KDF_FLAGS = 0 # update when we change the kdf +KDF_POWER = 10 # rounds = pow(2, kdf_power) def get_derivation_used_for_hw_device_encryption(): @@ -47,10 +60,9 @@ def get_derivation_used_for_hw_device_encryption(): "/1112098098'") # ascii 'BIE2' as decimal -class StorageEncryptionVersion(IntEnum): - PLAINTEXT = 0 - USER_PASSWORD = 1 - XPUB_PASSWORD = 2 +class PasswordType(IntEnum): + USER = 1 + XPUB = 2 class StorageReadWriteError(Exception): pass @@ -75,26 +87,31 @@ def __init__( self._file_exists = bool(self.path and os.path.exists(self.path)) self.logger.info(f"wallet path {self.path}") self._allow_partial_writes = allow_partial_writes - self.pubkey = None + self.master_key = None self.decrypted = '' + self._is_old_base64 = False + self.encrypted_keys = [] try: test_read_write_permissions(self.path) except IOError as e: raise StorageReadWriteError(e) from e if self.file_exists(): + self.read_header() with open(self.path, "rb") as f: - self.raw = f.read().decode("utf-8") self.pos = f.seek(0, os.SEEK_END) self.init_pos = self.pos - self._encryption_version = self._init_encryption_version() else: - self.raw = '' - self._encryption_version = StorageEncryptionVersion.PLAINTEXT + self.raw = b'' self.pos = 0 self.init_pos = 0 + self.encrypted_keys = [] + + @property + def mac_offset(self): + return len(self.header) def read(self): - return self.decrypted if self.is_encrypted() else self.raw + return self.decrypted if self.is_encrypted() else self.raw.decode('utf-8') def write(self, data: str) -> None: try: @@ -108,7 +125,7 @@ def write(self, data: str) -> None: os_chmod(temp_path, mode) # set restrictive perms *before* we write data except PermissionError as e: # tolerate NFS or similar weirdness? self.logger.warning(f"cannot chmod temp wallet file: {e!r}") - f.write(s.encode("utf-8")) + f.write(s) self.pos = f.seek(0, os.SEEK_END) f.flush() os.fsync(f.fileno()) @@ -120,14 +137,17 @@ def write(self, data: str) -> None: self.logger.info(f"saved {self.path}") def append(self, data: str) -> None: - """ append data to file. for the moment, only non-encrypted file""" + """ append data to encrypted file""" assert self._allow_partial_writes - assert not self.is_encrypted() + s, mac = self.encrypt_for_append(data) with open(self.path, "rb+") as f: pos = f.seek(0, os.SEEK_END) if pos != self.pos: raise StorageOnDiskUnexpectedlyChanged(f"expected size {self.pos}, found {pos}") - f.write(data.encode("utf-8")) + f.write(s) + if mac is not None: + f.seek(self.mac_offset, 0) + f.write(mac) self.pos = f.seek(0, os.SEEK_END) f.flush() os.fsync(f.fileno()) @@ -139,7 +159,6 @@ def should_do_full_write_next(self) -> bool: """If false, next action can be a partial-write ('append').""" return ( not self.file_exists() - or self.is_encrypted() or self._needs_consolidation() or not self._allow_partial_writes ) @@ -154,109 +173,266 @@ def is_past_initial_decryption(self) -> bool: if encryption is disabled completely (self.is_encrypted() == False), or if encryption is enabled but the contents have already been decrypted. """ - return not self.is_encrypted() or bool(self.pubkey) + return not self.is_encrypted() or bool(self.master_key) def is_encrypted(self) -> bool: """Return if storage encryption is currently enabled.""" - return self.get_encryption_version() != StorageEncryptionVersion.PLAINTEXT + return self._is_old_base64 or len(self.encrypted_keys) > 0 def is_encrypted_with_user_pw(self) -> bool: - return self.get_encryption_version() == StorageEncryptionVersion.USER_PASSWORD + return PasswordType.USER in self.get_encryption_versions() def is_encrypted_with_hw_device(self) -> bool: - return self.get_encryption_version() == StorageEncryptionVersion.XPUB_PASSWORD - - def get_encryption_version(self): - """Return the version of encryption used for this storage. + return PasswordType.XPUB in self.get_encryption_versions() - 0: plaintext / no encryption - - ECIES, private key derived from a password, - 1: password is provided by user - 2: password is derived from an xpub; used with hw wallets + def get_encryption_versions(self) -> list[PasswordType]: """ - return self._encryption_version - - def _init_encryption_version(self): - try: - magic = base64.b64decode(self.raw, validate=True)[0:4] - if magic == b'BIE1': - return StorageEncryptionVersion.USER_PASSWORD - elif magic == b'BIE2': - return StorageEncryptionVersion.XPUB_PASSWORD + Returns a list of encryption versions (password types) used for this storage. + Empty list if unencrypted. + """ + if self._is_old_base64: + return [self._encryption_version] + return [x[0] for x in self.encrypted_keys] + + def read_header(self): + f = open(self.path, "rb") + first_bytes = f.read(8) + if first_bytes.startswith(base64.b64encode(b'BIE')): + self._is_old_base64 = True + data = first_bytes + f.read() + self.raw = base64.b64decode(data, validate=True) + self._magic = self.raw[0:4] + if self._magic not in [b'BIE1', b'BIE2']: + raise Exception('unknown file format') + self._encryption_version = PasswordType.USER if self._magic == b'BIE1' else PasswordType.XPUB + else: + self._is_old_base64 = False + if first_bytes != STORAGE_MAGIC_BYTES: + self.raw = first_bytes + f.read() else: - return StorageEncryptionVersion.PLAINTEXT - except Exception: - return StorageEncryptionVersion.PLAINTEXT + version = ord(f.read(1)) + assert version == STORAGE_VERSION + self._storage_flags = ord(f.read(1)) + num_passwords = ord(f.read(1)) + self.encrypted_keys = [] + for i in range(num_passwords): + password_type = PasswordType(ord(f.read(1))) + kdf_flags = ord(f.read(1)) + kdf_power = ord(f.read(1)) + encrypted_master_key = f.read(32) + self.encrypted_keys.append((password_type, kdf_flags, kdf_power, encrypted_master_key)) + self.master_key_mac = f.read(32) + header_size = f.tell() + f.seek(0) + self.header = f.read(header_size) + f.close() + + def update_header(self, is_zipped=False) -> bytes: + N = len(self.encrypted_keys) + assert N < 256 + self._storage_flags = STORAGE_FLAGS + header = STORAGE_MAGIC_BYTES + bytes([STORAGE_VERSION, self._storage_flags]) + bytes([N]) + for item in self.encrypted_keys: + pw_type, kdf_flags, kdf_power, encrypted_master_key = item + header += bytes([pw_type, kdf_flags, kdf_power]) + encrypted_master_key + mac = hmac.new(self.master_key, None, hashlib.sha256).digest() + assert len(mac) == 32 + header += mac + self.header = header + self.master_key_mac = mac @staticmethod - def get_eckey_from_password(password): + def get_old_eckey_from_password(password): if password is None: password = "" secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024) ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret) return ec_key - def _get_encryption_magic(self): - v = self._encryption_version - if v == StorageEncryptionVersion.USER_PASSWORD: - return b'BIE1' - elif v == StorageEncryptionVersion.XPUB_PASSWORD: - return b'BIE2' - else: - raise WalletFileException('no encryption magic for version: %s' % v) + def get_secret_from_password(self, password, kdf_flags, rounds): + # kdf flags are not used for the moment + return hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=rounds) + + def read_all(self): + with open(self.path, "rb") as f: + self.raw = f.read() + + def decrypt_old(self, password) -> None: + self.read_all() + ec_key = self.get_old_eckey_from_password(password) + s = crypto.ecies_decrypt_message(ec_key, self.raw, magic=self._magic) + s = zlib.decompress(s) + # convert to new scheme + self.init_master_key() + self._add_password_to_header(password, self._encryption_version) + self.update_header() + self.decrypted = s.decode('utf8') + self.write(self.decrypted) def decrypt(self, password) -> None: """Raises an InvalidPassword exception on invalid password""" if self.is_past_initial_decryption(): return - ec_key = self.get_eckey_from_password(password) - if self.raw: - enc_magic = self._get_encryption_magic() - s = zlib.decompress(crypto.ecies_decrypt_message(ec_key, self.raw, magic=enc_magic)) - s = s.decode('utf8') - else: - s = '' - self.pubkey = ec_key.get_public_key_hex() + if self._is_old_base64: + self.decrypt_old(password) + return + self.check_password(password) + self.read_all() + mac = self.raw[self.mac_offset:self.mac_offset + 32] + ciphertext = self.raw[self.mac_offset + 32:] + self.iv = ciphertext[-16:] + key_e, key_m = self.master_key[0:16], self.master_key[16:32] + decrypted = aes_decrypt_with_iv(key_e, key_m, ciphertext, strip_pkcs7=False) + vds = BCDataStream() + vds.write(decrypted) + s = b'' + self.mac = hmac.new(key_m, b'', hashlib.sha256) + # break if the remaining bytes have not been commited + while self.mac.digest() != mac: + n = vds.read_compact_size() + n_size = len(var_int(n)) + vds.read_cursor -= n_size + # this may raise if the file has been corrupted + blob = vds.read_bytes(n*16) + blob = strip_PKCS7_padding(blob) + blob = blob[n_size:] + self.mac.update(blob) + if len(s) == 0: + if self._storage_flags & STORAGE_FLAG_ZIP_FIRST_BLOB: + blob = zlib.decompress(blob) + s += blob + s = s.decode('utf8') self.decrypted = s - def encrypt_before_writing(self, plaintext: str) -> str: - s = plaintext - if self.pubkey: - self.decrypted = plaintext - s = bytes(s, 'utf8') - c = zlib.compress(s, level=zlib.Z_BEST_SPEED) - enc_magic = self._get_encryption_magic() - public_key = ecc.ECPubkey(bfh(self.pubkey)) - s = crypto.ecies_encrypt_message(public_key, c, magic=enc_magic) - s = s.decode('utf8') + def get_prefixed_blob(self, s: bytes) -> bytes: + """return data prefixed by its size (number of 16 bytes blocks required, including bytes used for size and the padding) """ + for x in [1,3,5,9]: + size = len(s) + x + n = size // 16 + 1 # add one for pkcs7 padding + header = var_int(n) + if len(header) == x: + return header + s + else: + raise Exception('blob too large for var_int') + + def init_master_key(self): + self.master_key = token_bytes(32) + + def encrypt_before_writing(self, plaintext: str) -> bytes: + s = bytes(plaintext, 'utf8') + if self.master_key: + if self._storage_flags & STORAGE_FLAG_ZIP_FIRST_BLOB: + s = zlib.compress(s, level=zlib.Z_BEST_SPEED) + blob = self.get_prefixed_blob(s) + key_e, key_m = self.master_key[0:16], self.master_key[16:32] + ciphertext = aes_encrypt_with_iv(key_e, key_m, blob) + # save mac, key_e, key_m, and iv, for subsequent writes + self.iv = ciphertext[-16:] + self.mac = hmac.new(key_m, s, hashlib.sha256) + mac = self.mac.digest() + s = self.header + mac + ciphertext return s + def encrypt_for_append(self, plaintext: str) -> str: + s = bytes(plaintext, 'utf8') + if self.master_key: + self.mac.update(s) + mac = self.mac.digest() + blob = self.get_prefixed_blob(s) + key_e = self.master_key[0:16] + ciphertext = aes_encrypt_with_iv(key_e, self.iv, blob) + self.iv = ciphertext[-16:] + return ciphertext, mac + else: + return s, None + + def _old_check_password(self, password) -> None: + if not self.is_past_initial_decryption(): + self.decrypt(password) # this sets self.master_key + assert self.master_key is not None + if self.pubkey != self.get_old_eckey_from_password(password).get_public_key_hex(): + raise InvalidPassword() + + def _check_update_password(self, password: Optional[str], new_password: Optional[str], new_password_type: Optional[PasswordType]) -> None: + """ + if old_password == new_password, only check password + otherwise, check and update password + """ + assert self.is_encrypted() + # decrypt master_key and compare mac + for i, item in enumerate(self.encrypted_keys): + password_type, kdf_flags, kdf_power, encrypted_master_key = item + decrypted_master_key = self._get_decrypted_master_key(encrypted_master_key, password, kdf_flags, kdf_power) + if hmac.new(decrypted_master_key, None, hashlib.sha256).digest() == self.master_key_mac: + break + else: + raise InvalidPassword() + self.master_key = decrypted_master_key + if new_password: + if new_password != password: + assert new_password_type is not None + kdf_flags, kdf_power, encrypted_master_key = self._get_encrypted_master_key(new_password, new_password_type) + self.encrypted_keys[i] = new_password_type, kdf_flags, kdf_power, encrypted_master_key + else: + assert new_password_type is None + del self.encrypted_keys[i] + + def _get_encrypted_master_key(self, password, password_type): + # password_type not used currently. + # we could use it to make KDF dependent on it + kdf_flags, kdf_power = KDF_FLAGS, KDF_POWER + password_key = self.get_secret_from_password(password, kdf_flags, rounds=pow(2, kdf_power)) + key_e, key_m = password_key[0:16], password_key[16:32] + encrypted_master_key = aes_encrypt_with_iv(key_e, key_m, self.master_key, append_pkcs7=False) + assert len(encrypted_master_key) == 32 + return kdf_flags, kdf_power, encrypted_master_key + + def _get_decrypted_master_key(self, encrypted_master_key, password, kdf_flags, kdf_power): + password_key = self.get_secret_from_password(password, kdf_flags, rounds=pow(2, kdf_power)) + key_e, key_m = password_key[0:16], password_key[16:32] + decrypted_master_key = aes_decrypt_with_iv(key_e, key_m, encrypted_master_key, strip_pkcs7=False) + assert len(encrypted_master_key) == 32 + return decrypted_master_key + + def _add_password_to_header(self, password, password_type): + kdf_flags, kdf_power, encrypted_master_key = self._get_encrypted_master_key(password, password_type) + self.encrypted_keys.append((password_type, kdf_flags, kdf_power, encrypted_master_key)) + def check_password(self, password: Optional[str]) -> None: - """Raises an InvalidPassword exception on invalid password""" + """Raises an InvalidPassword exception on invalid password + """ if not self.is_encrypted(): if password is not None: raise InvalidPassword("password given but wallet has no password") return + if self._is_old_base64: + self._old_check_password(password) + return + self._check_update_password(password, password, None) + + def update_password(self, password, new_password, new_password_type): + self._check_update_password(password, new_password, new_password_type) + self.update_header() + + def remove_password(self, password): + """ remove password from list. disable encryption ig list is empty.""" if not self.is_past_initial_decryption(): - self.decrypt(password) # this sets self.pubkey - assert self.pubkey is not None - if self.pubkey != self.get_eckey_from_password(password).get_public_key_hex(): - raise InvalidPassword() + raise Exception("storage needs to be decrypted before changing password") + self._check_update_password(password, None, None) + if len(self.encrypted_keys) == 0: + self.master_key = None + else: + self.update_header() - def set_password(self, password, enc_version=None): + def add_password(self, password, password_type): """Set a password to be used for encrypting this storage.""" + assert password if not self.is_past_initial_decryption(): raise Exception("storage needs to be decrypted before changing password") - if enc_version is None: - enc_version = self._encryption_version - if password and enc_version != StorageEncryptionVersion.PLAINTEXT: - ec_key = self.get_eckey_from_password(password) - self.pubkey = ec_key.get_public_key_hex() - self._encryption_version = enc_version - else: - self.pubkey = None - self._encryption_version = StorageEncryptionVersion.PLAINTEXT + if len(self.encrypted_keys) == 0: + self.init_master_key() + self._add_password_to_header(password, password_type) + self.update_header() def basename(self) -> str: return os.path.basename(self.path) diff --git a/electrum/util.py b/electrum/util.py index 9cc1f334d59..300ae766e74 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -2230,16 +2230,16 @@ def test_read_write_permissions(path) -> None: # note: There might already be a file at 'path'. # Make sure we do NOT overwrite/corrupt that! temp_path = "%s.tmptest.%s" % (path, os.getpid()) - echo = "fs r/w test" + echo = b"fs r/w test" try: # test READ permissions for actual path if os.path.exists(path): with open(path, "rb") as f: f.read(1) # read 1 byte # test R/W sanity for "similar" path - with open(temp_path, "w", encoding='utf-8') as f: + with open(temp_path, "wb") as f: f.write(echo) - with open(temp_path, "r", encoding='utf-8') as f: + with open(temp_path, "rb") as f: echo2 = f.read() os.remove(temp_path) except Exception as e: diff --git a/electrum/wallet.py b/electrum/wallet.py index 88002e96af7..51bd3c6efae 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -62,7 +62,7 @@ ) from .simple_config import SimpleConfig from .fee_policy import FeePolicy, FixedFeePolicy, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE -from .storage import StorageEncryptionVersion, WalletStorage +from .storage import PasswordType, WalletStorage from .wallet_db import WalletDB from .transaction import ( Transaction, TxInput, TxOutput, PartialTransaction, PartialTxInput, PartialTxOutput, TxOutpoint, Sighash @@ -458,8 +458,10 @@ def __init__(self, db: WalletDB, *, config: SimpleConfig): self.test_addresses_sanity() if self.storage and self.has_storage_encryption(): - if (se := self.storage.get_encryption_version()) not in (ae := self.get_available_storage_encryption_versions()): - raise WalletFileException(f"unexpected storage encryption type. found: {se!r}. allowed: {ae!r}") + ae = self.get_available_storage_encryption_versions() + for se in self.storage.get_encryption_versions(): + if se not in ae: + raise WalletFileException(f"unexpected storage encryption type. found: {se!r}. allowed: {ae!r}") self.register_callbacks() @@ -3138,17 +3140,20 @@ def has_password(self) -> bool: def can_have_keystore_encryption(self): return self.keystore and self.keystore.may_have_password() - def get_available_storage_encryption_versions(self) -> Sequence[StorageEncryptionVersion]: + def get_available_storage_encryption_versions(self) -> Sequence[PasswordType]: """Returns the type of storage encryption offered to the user. A wallet file (storage) is either encrypted with this version or is stored in plaintext. """ - out = [StorageEncryptionVersion.USER_PASSWORD] + out = [PasswordType.USER] if isinstance(self.keystore, Hardware_KeyStore): - out.append(StorageEncryptionVersion.XPUB_PASSWORD) + out.append(PasswordType.XPUB) return out + def is_hw_encryption_available(self): + return PasswordType.XPUB in self.get_available_storage_encryption_versions() + def has_keystore_encryption(self) -> bool: """Returns whether encryption is enabled for the keystore. @@ -3182,12 +3187,17 @@ def update_password(self, old_pw, new_pw, *, encrypt_storage: bool = True, xpub_ raise InvalidPassword() self.check_password(old_pw) if self.storage: - if encrypt_storage: - enc_version = StorageEncryptionVersion.XPUB_PASSWORD if xpub_encrypt else StorageEncryptionVersion.USER_PASSWORD - assert enc_version in self.get_available_storage_encryption_versions() + if encrypt_storage and new_pw: + password_type = PasswordType.XPUB if xpub_encrypt else PasswordType.USER + assert password_type in self.get_available_storage_encryption_versions() + if self.storage.is_encrypted(): + self.storage.update_password(old_pw, new_pw, password_type) + else: + # we never add more than one password + self.storage.add_password(new_pw, password_type) else: - enc_version = StorageEncryptionVersion.PLAINTEXT - self.storage.set_password(new_pw, enc_version) + if self.storage.is_encrypted(): + self.storage.remove_password(old_pw) # make sure next storage.write() saves changes self.db.set_modified(True) @@ -4215,9 +4225,9 @@ def check_password(self, password): if self.has_storage_encryption(): self.storage.check_password(password) - def get_available_storage_encryption_versions(self) -> Sequence[StorageEncryptionVersion]: + def get_available_storage_encryption_versions(self) -> Sequence[PasswordType]: # multisig wallets are not offered hw device encryption - return [StorageEncryptionVersion.USER_PASSWORD] + return [PasswordType.USER] def has_seed(self): return self.keystore.has_seed() diff --git a/electrum/wizard.py b/electrum/wizard.py index f15ea6de5ed..e3ebfdad85b 100644 --- a/electrum/wizard.py +++ b/electrum/wizard.py @@ -12,7 +12,7 @@ from electrum.network import ProxySettings from electrum.plugin import run_hook from electrum.slip39 import EncryptedSeed -from electrum.storage import WalletStorage, StorageEncryptionVersion +from electrum.storage import WalletStorage, PasswordType from electrum.wallet_db import WalletDB from electrum.bip32 import normalize_bip32_derivation, xpub_type from electrum import keystore, mnemonic, bitcoin @@ -766,10 +766,10 @@ def create_storage(self, path: str, data: dict): if data['encrypt']: if data.get('xpub_encrypt'): assert data.get('keystore_type') == 'hardware' and data['wallet_type'] == 'standard' - enc_version = StorageEncryptionVersion.XPUB_PASSWORD + password_type = PasswordType.XPUB else: - enc_version = StorageEncryptionVersion.USER_PASSWORD - storage.set_password(data['password'], enc_version=enc_version) + password_type = PasswordType.USER + storage.add_password(data['password'], password_type) db = WalletDB('', storage=storage, upgrade=True) db.set_keystore_encryption(bool(data['password'])) diff --git a/tests/test_bitcoin.py b/tests/test_bitcoin.py index 5673dc2fb83..accfc4fcb5c 100644 --- a/tests/test_bitcoin.py +++ b/tests/test_bitcoin.py @@ -270,7 +270,7 @@ def test_signmessage_segwit_witness_v0_address_test_we_also_accept_sigs_from_tre @needs_test_with_all_aes_implementations def test_decrypt_message(self): - key = WalletStorage.get_eckey_from_password('pw123') + key = WalletStorage.get_old_eckey_from_password('pw123') self.assertEqual(b'me<(s_s)>age', crypto.ecies_decrypt_message( key, b'QklFMQMDFtgT3zWSQsa+Uie8H/WvfUjlu9UN9OJtTt3KlgKeSTi6SQfuhcg1uIz9hp3WIUOFGTLr4RNQBdjPNqzXwhkcPi2Xsbiw6UCNJncVPJ6QBg==')) self.assertEqual(b'me<(s_s)>age', crypto.ecies_decrypt_message( @@ -280,7 +280,7 @@ def test_decrypt_message(self): @needs_test_with_all_aes_implementations def test_encrypt_message(self): - key = WalletStorage.get_eckey_from_password('secret_password77') + key = WalletStorage.get_old_eckey_from_password('secret_password77') msgs = [ bytes([0] * 555), b'cannot think of anything funny' diff --git a/tests/test_wallet.py b/tests/test_wallet.py index b122506fb96..c46e5e9552a 100644 --- a/tests/test_wallet.py +++ b/tests/test_wallet.py @@ -8,7 +8,7 @@ from io import StringIO import asyncio -from electrum.storage import WalletStorage +from electrum.storage import WalletStorage, PasswordType from electrum.wallet_db import FINAL_SEED_VERSION from electrum.wallet import (Abstract_Wallet, Standard_Wallet, create_new_wallet, Imported_Wallet, Wallet) @@ -89,6 +89,28 @@ def test_write_dictionary_to_file(self): for key, value in some_dict.items(): self.assertEqual(d[key], value) + def test_add_update_remove_password(self): + with open(self.wallet_path, "w") as f: + f.write("blah blah") + storage = WalletStorage(self.wallet_path) + pw1 = "123456" + pw2 = "789012" + pw3 = "tttttt" + storage.add_password(pw1, PasswordType.USER) + storage.add_password(pw2, PasswordType.USER) + self.assertTrue(storage.is_encrypted()) + with self.assertRaises(InvalidPassword): + storage.remove_password(pw3) + storage.remove_password(pw1) + self.assertTrue(storage.is_encrypted()) + with self.assertRaises(InvalidPassword): + storage.remove_password(pw1) + with self.assertRaises(InvalidPassword): + storage.update_password(pw1, pw3, PasswordType.USER) + storage.update_password(pw2, pw3, PasswordType.USER) + storage.remove_password(pw3) + self.assertFalse(storage.is_encrypted()) + async def test_storage_imported_add_privkeys_persistence_test(self): text = ' '.join([ 'p2wpkh:L4jkdiXszG26SUYvwwJhzGwg37H2nLhrbip7u6crmgNeJysv5FHL',