|
| 1 | +import codecs |
| 2 | +import itertools |
| 3 | +import logging |
| 4 | +import secrets |
| 5 | + |
| 6 | +from collections import namedtuple |
| 7 | +from typing import Dict, List, Sequence, Tuple, Union, Callable |
| 8 | + |
| 9 | +from shamir_mnemonic import generate_mnemonics |
| 10 | + |
| 11 | +from .types import Account |
| 12 | +from .defaults import BITS_DEFAULT, BITS, MNEM_ROWS_COLS, cryptocurrency_supported |
| 13 | +from .util import ordinal |
| 14 | + |
| 15 | + |
| 16 | +RANDOM_BYTES = secrets.token_bytes |
| 17 | + |
| 18 | +log = logging.getLogger( __package__ ) |
| 19 | + |
| 20 | + |
| 21 | +def path_parser( |
| 22 | + paths: str, |
| 23 | + allow_unbounded: bool = True, |
| 24 | +) -> Tuple[str, Dict[str, Callable[[], int]]]: |
| 25 | + """Create a format and a dictionary of iterators to feed into it.""" |
| 26 | + path_segs = paths.split( '/' ) |
| 27 | + unbounded = False |
| 28 | + ranges = {} |
| 29 | + |
| 30 | + for i,s in list( enumerate( path_segs )): |
| 31 | + if '-' not in s: |
| 32 | + continue |
| 33 | + c = chr(ord('a')+i) |
| 34 | + tic = s.endswith( "'" ) |
| 35 | + if tic: |
| 36 | + s = s[:-1] |
| 37 | + b,e = s.split( '-' ) |
| 38 | + b = int( b or 0 ) |
| 39 | + if e: |
| 40 | + e = int( e ) |
| 41 | + ranges[c] = lambda b=b,e=e: range( b, e+1 ) |
| 42 | + else: |
| 43 | + assert allow_unbounded and not ( unbounded or ranges ), \ |
| 44 | + f"{'Only first' if allow_unbounded else 'No'} range allowed to be unbounded;" \ |
| 45 | + f" this is the {ordinal(len(ranges)+1)} range in {paths}" |
| 46 | + unbounded = True |
| 47 | + ranges[c] = lambda b=b: itertools.count( b ) |
| 48 | + path_segs[i] = f"{{{c}}}" + ( "'" if tic else "" ) |
| 49 | + |
| 50 | + path_fmt = '/'.join( path_segs ) |
| 51 | + return path_fmt, ranges |
| 52 | + |
| 53 | + |
| 54 | +def path_sequence( |
| 55 | + path_fmt: str, |
| 56 | + ranges: Dict[str, Callable[[], int]], |
| 57 | +): |
| 58 | + """Yield a sequence of paths, modulating the format specifiers of the |
| 59 | + path_fmt according to their value sources in ranges. |
| 60 | +
|
| 61 | + For example, a |
| 62 | +
|
| 63 | + path_fmt = "m/44'/60'/0'/0/{f}", with a |
| 64 | + ranges = dict( f=lambda b=0, e=2: range( b, e+1 ) ) |
| 65 | +
|
| 66 | + would yield the paths: |
| 67 | +
|
| 68 | + "m/44'/60'/0'/0/0" |
| 69 | + "m/44'/60'/0'/0/1" |
| 70 | + "m/44'/60'/0'/0/2" |
| 71 | + """ |
| 72 | + # Stert all the iterators |
| 73 | + viters = { |
| 74 | + k: iter( l() ) |
| 75 | + for k,l in ranges.items() |
| 76 | + } |
| 77 | + values = { # Initial round of values; must provide at least one |
| 78 | + k: next( viters[k] ) |
| 79 | + for k in viters |
| 80 | + } |
| 81 | + assert all( v is not None for v in values.values() ), \ |
| 82 | + "Iterators for path segment values must yield at least an initial value" |
| 83 | + |
| 84 | + while not any( v is None for v in values.values() ): |
| 85 | + yield path_fmt.format( **values ) |
| 86 | + if not ranges: |
| 87 | + break # No variable records at all; just one |
| 88 | + # Get the next value. Working from the lowest iterator up, cycle value(s) |
| 89 | + for i,k in enumerate( sorted( viters.keys(), reverse=True )): |
| 90 | + values[k] = next( viters[k], None ) |
| 91 | + if values[k] is not None: |
| 92 | + break |
| 93 | + # OK, this iterable has ended. Restart it, and cycle to the next one up, iff |
| 94 | + # there are remaining ranges |
| 95 | + if i+1 < len( ranges ): |
| 96 | + viters[k] = iter( ranges[k]() ) |
| 97 | + values[k] = next( viters[k], None ) |
| 98 | + |
| 99 | + |
| 100 | +def random_secret( |
| 101 | + seed_length = BITS_DEFAULT // 8 |
| 102 | +) -> bytes: |
| 103 | + """Generates a new random secret. |
| 104 | +
|
| 105 | + NOTE: There is a slightly less than 1 / 2^128 chance that any given random secret will lead to |
| 106 | + an invalid BTC wallet private key! This is because the 256-bit seed for bitcoin must be less than |
| 107 | + the secp256k1 field size: |
| 108 | +
|
| 109 | + 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 |
| 110 | +
|
| 111 | + We cannot generate secrets that are guaranteed to be valid for every derived HD Wallet BTC |
| 112 | + address. The secret is derived through a complex hashing procedure for each wallet at each |
| 113 | + path. The probability of this occurring is so vanishingly small that we will instead simply opt |
| 114 | + to *not* generate an public wallet address, when asked. |
| 115 | +
|
| 116 | + Just to give you an understandable reference, the first 128 bits of an illegal secret unusable |
| 117 | + for generating a Bitcoin wallet must have 128 '1' bits at its start. If wallets were generated |
| 118 | + at a billion per second, by every person alive on earth, it should take about 1.5 trillion years |
| 119 | + to arrive at the first invalid seed. |
| 120 | +
|
| 121 | + 2**128 / 1e9 / 7e9 / (60*60*24*365) == 1,541,469,010,115.145 |
| 122 | +
|
| 123 | + """ |
| 124 | + return RANDOM_BYTES( seed_length ) |
| 125 | + |
| 126 | + |
| 127 | +Details = namedtuple( 'Details', ('name', 'group_threshold', 'groups', 'accounts') ) |
| 128 | + |
| 129 | + |
| 130 | +def enumerate_mnemonic( mnemonic ): |
| 131 | + if isinstance( mnemonic, str ): |
| 132 | + mnemonic = mnemonic.split( ' ' ) |
| 133 | + return dict( |
| 134 | + (i, f"{i+1:>2d} {w}") |
| 135 | + for i,w in enumerate( mnemonic ) |
| 136 | + ) |
| 137 | + |
| 138 | + |
| 139 | +def organize_mnemonic( mnemonic, rows=None, cols=None, label="" ): |
| 140 | + """Given a SLIP-39 "word word ... word" or ["word", "word", ..., "word"] mnemonic, emit rows |
| 141 | + organized in the desired rows and cols (with defaults, if not provided). We return the fully |
| 142 | + formatted line, plus the list of individual words in that line. |
| 143 | +
|
| 144 | + """ |
| 145 | + num_words = enumerate_mnemonic( mnemonic ) |
| 146 | + if not rows or not cols: |
| 147 | + rows,cols = MNEM_ROWS_COLS.get( len(num_words), (7, 3)) |
| 148 | + for r in range( rows ): |
| 149 | + line = label if r == 0 else ' ' * len( label ) |
| 150 | + words = [] |
| 151 | + for c in range( cols ): |
| 152 | + word = num_words.get( c * rows + r ) |
| 153 | + if word: |
| 154 | + words.append( word ) |
| 155 | + line += f"{word:<13}" |
| 156 | + yield line,words |
| 157 | + |
| 158 | + |
| 159 | +def create( |
| 160 | + name: str, |
| 161 | + group_threshold: int, |
| 162 | + groups: Dict[str,Tuple[int, int]], |
| 163 | + master_secret: bytes = None, # Default: 128-bit seeds |
| 164 | + passphrase: bytes = b"", |
| 165 | + iteration_exponent: int = 1, |
| 166 | + cryptopaths: Sequence[Tuple[str,str]] = None, # default: ETH, BTC at default paths |
| 167 | + strength: int = 128, |
| 168 | +) -> Tuple[str,int,Dict[str,Tuple[int,List[str]]], Sequence[Sequence[Account]]]: |
| 169 | + """Creates a SLIP-39 encoding and 1 or more Ethereum accounts. Returns the details, in a form |
| 170 | + compatible with the output API. |
| 171 | +
|
| 172 | + """ |
| 173 | + if master_secret is None: |
| 174 | + assert strength in BITS, f"Invalid {strength}-bit secret length specified" |
| 175 | + master_secret = random_secret( strength // 8 ) |
| 176 | + g_names,g_dims = list( zip( *groups.items() )) |
| 177 | + mnems = mnemonics( |
| 178 | + group_threshold = group_threshold, |
| 179 | + groups = g_dims, |
| 180 | + master_secret = master_secret, |
| 181 | + passphrase = passphrase, |
| 182 | + iteration_exponent= iteration_exponent |
| 183 | + ) |
| 184 | + # Derive the desired account(s) at the specified derivation paths, or the default |
| 185 | + accts = list( accountgroups( |
| 186 | + master_secret = master_secret, |
| 187 | + cryptopaths = cryptopaths or [('ETH',None), ('BTC',None)], |
| 188 | + allow_unbounded = False, |
| 189 | + )) |
| 190 | + |
| 191 | + groups = { |
| 192 | + g_name: (g_of, g_mnems) |
| 193 | + for (g_name,(g_of, _),g_mnems) in zip( g_names, g_dims, mnems ) |
| 194 | + } |
| 195 | + if log.isEnabledFor( logging.INFO ): |
| 196 | + group_reqs = list( |
| 197 | + f"{g_nam}({g_of}/{len(g_mns)})" if g_of != len(g_mns) else f"{g_nam}({g_of})" |
| 198 | + for g_nam,(g_of,g_mns) in groups.items() ) |
| 199 | + requires = f"Recover w/ {group_threshold} of {len(groups)} groups {', '.join(group_reqs)}" |
| 200 | + for g_n,(g_name,(g_of,g_mnems)) in enumerate( groups.items() ): |
| 201 | + log.info( f"{g_name}({g_of}/{len(g_mnems)}): {requires}" ) |
| 202 | + for mn_n,mnem in enumerate( g_mnems ): |
| 203 | + for line,_ in organize_mnemonic( mnem, label=f"{ordinal(mn_n+1)} " ): |
| 204 | + log.info( f"{line}" ) |
| 205 | + |
| 206 | + return Details(name, group_threshold, groups, accts) |
| 207 | + |
| 208 | + |
| 209 | +def mnemonics( |
| 210 | + group_threshold: int, |
| 211 | + groups: Sequence[Tuple[int, int]], |
| 212 | + master_secret: Union[str,bytes] = None, |
| 213 | + passphrase: bytes = b"", |
| 214 | + iteration_exponent: int = 1, |
| 215 | +) -> List[List[str]]: |
| 216 | + """Generate SLIP39 mnemonics for the supplied group_threshold of the given groups. Will generate a |
| 217 | + random master_secret, if necessary. |
| 218 | +
|
| 219 | + """ |
| 220 | + if master_secret is None: |
| 221 | + master_secret = random_secret() |
| 222 | + if len( master_secret ) not in (16, 32, 64): |
| 223 | + raise ValueError( |
| 224 | + f"Only 128-, 256- and 512bit seeds supported; {len(master_secret)*8}-bit master_secret supplied" ) |
| 225 | + return generate_mnemonics( |
| 226 | + group_threshold = group_threshold, |
| 227 | + groups = groups, |
| 228 | + master_secret = master_secret, |
| 229 | + passphrase = passphrase, |
| 230 | + iteration_exponent = iteration_exponent ) |
| 231 | + |
| 232 | + |
| 233 | +def account( |
| 234 | + master_secret: Union[str,bytes], |
| 235 | + crypto: str = None, # default 'ETH' |
| 236 | + path: str = None, # default to the crypto's DEFAULT_PATH |
| 237 | +): |
| 238 | + """Generate an HD wallet Account from the supplied master_secret seed, at the given HD derivation |
| 239 | + path, for the specified cryptocurrency. |
| 240 | +
|
| 241 | + """ |
| 242 | + if type( master_secret ) is bytes: |
| 243 | + master_secret = codecs.encode( master_secret, 'hex_codec' ).decode( 'ascii' ) |
| 244 | + acct = Account( |
| 245 | + symbol = cryptocurrency_supported( crypto or 'ETH' ) |
| 246 | + ).from_seed( |
| 247 | + seed = master_secret |
| 248 | + ) |
| 249 | + return acct.from_path( |
| 250 | + path = path or acct._cryptocurrency.DEFAULT_PATH |
| 251 | + ) |
| 252 | + |
| 253 | + |
| 254 | +def accounts( |
| 255 | + master_secret: Union[str,bytes], |
| 256 | + crypto: str = None, # default 'ETH' |
| 257 | + paths: str = None, # default to the crypto's DEFAULT_PATH; allow ranges |
| 258 | + allow_unbounded = True, |
| 259 | +): |
| 260 | + for path in [None] if paths is None else path_sequence( *path_parser( |
| 261 | + paths = paths, |
| 262 | + allow_unbounded = allow_unbounded, |
| 263 | + )): |
| 264 | + yield account( master_secret, crypto, path ) |
| 265 | + |
| 266 | + |
| 267 | +def accountgroups( |
| 268 | + master_secret: bytes, |
| 269 | + cryptopaths: Sequence[Tuple[str,str]], |
| 270 | + allow_unbounded: bool = True, |
| 271 | +) -> Sequence[Sequence[Account]]: |
| 272 | + """Generate the desired cryptocurrency account(s) at each crypto's given path(s). This is useful |
| 273 | + for generating sequences of groups of wallets for multiple cryptocurrencies, eg. for receiving |
| 274 | + multiple cryptocurrencies for each client. Since each cryptocurrency uses a different BIP-44 path, |
| 275 | + we have to generate different sequences. |
| 276 | +
|
| 277 | + Supports ranges in each path segment, eg.: |
| 278 | +
|
| 279 | + ('ETH', "m/44'/60'/0'/0/-") -- generates all accounts for ETH |
| 280 | + ('BTC', "m/44'/0'/0'/0/-") -- generates all accounts for BTC |
| 281 | +
|
| 282 | + [ |
| 283 | + [ "m/44'/60'/0'/0/0", "0x824b174803e688dE39aF5B3D7Cd39bE6515A19a1"], |
| 284 | + [ "m/44'/0'/0'/0/0", "1MAjc529bjmkC1iCXTw2XMHL2zof5StqdQ"] |
| 285 | + ], |
| 286 | + [ |
| 287 | + [ "m/44'/60'/0'/0/1", "0x8D342083549C635C0494d3c77567860ee7456963"], |
| 288 | + [ "m/44'/0'/0'/0/1", "1BGwDuVPJeXDG9upaHvVPds5MXwkTjZoav"] |
| 289 | + ], |
| 290 | + ... |
| 291 | +
|
| 292 | + """ |
| 293 | + yield from zip( *[ |
| 294 | + accounts( |
| 295 | + master_secret = master_secret, |
| 296 | + paths = paths, |
| 297 | + crypto = crypto, |
| 298 | + allow_unbounded = allow_unbounded, |
| 299 | + ) |
| 300 | + for crypto,paths in cryptopaths |
| 301 | + ]) |
| 302 | + |
| 303 | + |
| 304 | +def address( |
| 305 | + master_secret: bytes, |
| 306 | + crypto: str = None, |
| 307 | + path: str = None, |
| 308 | +): |
| 309 | + """Return the specified cryptocurrency HD account address at path.""" |
| 310 | + return account( |
| 311 | + master_secret, |
| 312 | + path = path, |
| 313 | + crypto = cryptocurrency_supported( crypto or 'ETH' ), |
| 314 | + ).address |
| 315 | + |
| 316 | + |
| 317 | +def addresses( |
| 318 | + master_secret: bytes, |
| 319 | + crypto: str = None, # default 'ETH' |
| 320 | + paths: str = None, # default: The crypto's DEFAULT_PATH; supports ranges |
| 321 | + allow_unbounded: bool = True, |
| 322 | +): |
| 323 | + """Generate a sequence of cryptocurrency account (path, address, ...) for all designated |
| 324 | + cryptocurrencies. Usually a single (<path>, <address>) tuple is desired (different |
| 325 | + cryptocurrencies typically have their own unique path derivations. |
| 326 | +
|
| 327 | + """ |
| 328 | + for acct in accounts( master_secret, crypto, paths, allow_unbounded=allow_unbounded ): |
| 329 | + yield (acct.crypto, acct.path, acct.address) |
| 330 | + |
| 331 | + |
| 332 | +def addressgroups( |
| 333 | + master_secret: bytes, |
| 334 | + cryptopaths: Sequence[Tuple[str,str]], |
| 335 | + allow_unbounded: bool = True, |
| 336 | +) -> Sequence[str]: |
| 337 | + """Yields account (<crypto>, <path>, <address>) records for the desired cryptocurrencies at paths. |
| 338 | +
|
| 339 | + """ |
| 340 | + yield from zip( *[ |
| 341 | + addresses( |
| 342 | + master_secret = master_secret, |
| 343 | + paths = paths, |
| 344 | + crypto = crypto, |
| 345 | + allow_unbounded = allow_unbounded, |
| 346 | + ) |
| 347 | + for crypto,paths in cryptopaths |
| 348 | + ]) |
0 commit comments