4
4
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
5
"""Test the wallet keypool and interaction with wallet encryption/locking."""
6
6
7
+ import re
7
8
import time
8
9
from decimal import Decimal
9
10
11
+ from test_framework .descriptors import descsum_create
10
12
from test_framework .test_framework import BitcoinTestFramework
11
13
from test_framework .util import assert_equal , assert_raises_rpc_error
12
14
from test_framework .wallet_util import WalletUnlock
13
15
16
+ TEST_KEYPOOL_SIZE = 10
17
+ TEST_NEW_KEYPOOL_SIZE = TEST_KEYPOOL_SIZE + 2
18
+
14
19
class KeyPoolTest (BitcoinTestFramework ):
15
20
def add_options (self , parser ):
16
21
self .add_wallet_options (parser )
17
22
18
23
def set_test_params (self ):
19
24
self .num_nodes = 1
25
+ self .extra_args = [[f"-keypool={ TEST_KEYPOOL_SIZE } " ]]
20
26
21
27
def skip_test_if_missing_module (self ):
22
28
self .skip_if_no_wallet ()
23
29
24
30
def run_test (self ):
25
31
nodes = self .nodes
32
+
33
+ # Derive addresses from the wallet without removing them from keypool
34
+ addrs = []
35
+ if not self .options .descriptors :
36
+ path = str (self .nodes [0 ].datadir_path / 'wallet.dump' )
37
+ nodes [0 ].dumpwallet (path )
38
+ file = open (path , "r" , encoding = "utf8" )
39
+ m = re .search (r"masterkey: (\w+)" , file .read ())
40
+ file .close ()
41
+ xpriv = m .group (1 )
42
+ desc = descsum_create (f"wpkh({ xpriv } /0h/0h/*h)" )
43
+ addrs = nodes [0 ].deriveaddresses (descriptor = desc , range = [0 , 9 ])
44
+ else :
45
+ list_descriptors = nodes [0 ].listdescriptors ()
46
+ for desc in list_descriptors ["descriptors" ]:
47
+ if desc ['active' ] and not desc ["internal" ] and desc ["desc" ][:4 ] == "wpkh" :
48
+ addrs = nodes [0 ].deriveaddresses (descriptor = desc ["desc" ], range = [0 , 9 ])
49
+
50
+ addr0 = addrs [0 ]
51
+ addr9 = addrs [9 ] # arbitrary future address index
52
+
53
+ # Address is mine and active before it is removed from keypool by getnewaddress
54
+ addr0_before_getting_data = nodes [0 ].getaddressinfo (addr0 )
55
+ assert addr0_before_getting_data ['ismine' ]
56
+ assert addr0_before_getting_data ['isactive' ]
57
+
26
58
addr_before_encrypting = nodes [0 ].getnewaddress ()
27
59
addr_before_encrypting_data = nodes [0 ].getaddressinfo (addr_before_encrypting )
60
+ assert addr0 == addr_before_encrypting
61
+ # Address is still mine and active even after being removed from keypool
62
+ assert addr_before_encrypting_data ['ismine' ]
63
+ assert addr_before_encrypting_data ['isactive' ]
64
+
28
65
wallet_info_old = nodes [0 ].getwalletinfo ()
29
66
if not self .options .descriptors :
30
67
assert addr_before_encrypting_data ['hdseedid' ] == wallet_info_old ['hdseedid' ]
31
68
69
+ # Address is mine and active before wallet is encrypted (resetting keypool)
70
+ addr9_before_encrypting_data = nodes [0 ].getaddressinfo (addr9 )
71
+ assert addr9_before_encrypting_data ['ismine' ]
72
+ assert addr9_before_encrypting_data ['isactive' ]
73
+
74
+ # Imported things are never considered active, no need to rescan
75
+ # Imported public keys / addresses can't be mine because they are not spendable
76
+ if self .options .descriptors :
77
+ nodes [0 ].importdescriptors ([{
78
+ "desc" : "addr(bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v)" ,
79
+ "timestamp" : "now"
80
+ }])
81
+ else :
82
+ nodes [0 ].importaddress ("bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v" , "label" , rescan = False )
83
+ import_addr_data = nodes [0 ].getaddressinfo ("bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v" )
84
+ assert import_addr_data ["iswatchonly" ] is not self .options .descriptors
85
+ assert not import_addr_data ["ismine" ]
86
+ assert not import_addr_data ["isactive" ]
87
+
88
+ if self .options .descriptors :
89
+ nodes [0 ].importdescriptors ([{
90
+ "desc" : "pk(02f893ca95b0d55b4ce4e72ae94982eb679158cb2ebc120ff62c17fedfd1f0700e)" ,
91
+ "timestamp" : "now"
92
+ }])
93
+ else :
94
+ nodes [0 ].importpubkey ("02f893ca95b0d55b4ce4e72ae94982eb679158cb2ebc120ff62c17fedfd1f0700e" , "label" , rescan = False )
95
+ import_pub_data = nodes [0 ].getaddressinfo ("bcrt1q4v7a8wn5vqd6fk4026s5gzzxyu7cfzz23n576h" )
96
+ assert import_pub_data ["iswatchonly" ] is not self .options .descriptors
97
+ assert not import_pub_data ["ismine" ]
98
+ assert not import_pub_data ["isactive" ]
99
+
100
+ nodes [0 ].importprivkey ("cPMX7v5CNV1zCphFSq2hnR5rCjzAhA1GsBfD1qrJGdj4QEfu38Qx" , "label" , rescan = False )
101
+ import_priv_data = nodes [0 ].getaddressinfo ("bcrt1qa985v5d53qqtrfujmzq2zrw3r40j6zz4ns02kj" )
102
+ assert not import_priv_data ["iswatchonly" ]
103
+ assert import_priv_data ["ismine" ]
104
+ assert not import_priv_data ["isactive" ]
105
+
32
106
# Encrypt wallet and wait to terminate
33
107
nodes [0 ].encryptwallet ('test' )
108
+ addr9_after_encrypting_data = nodes [0 ].getaddressinfo (addr9 )
109
+ # Key is from unencrypted seed, no longer considered active
110
+ assert not addr9_after_encrypting_data ['isactive' ]
111
+ # ...however it *IS* still mine since we can spend with this key
112
+ assert addr9_after_encrypting_data ['ismine' ]
113
+
34
114
if self .options .descriptors :
35
115
# Import hardened derivation only descriptors
36
116
nodes [0 ].walletpassphrase ('test' , 10 )
@@ -76,7 +156,9 @@ def run_test(self):
76
156
}
77
157
])
78
158
nodes [0 ].walletlock ()
79
- # Keep creating keys
159
+ # Keep creating keys until we run out
160
+ for _ in range (TEST_KEYPOOL_SIZE - 1 ):
161
+ nodes [0 ].getnewaddress ()
80
162
addr = nodes [0 ].getnewaddress ()
81
163
addr_data = nodes [0 ].getaddressinfo (addr )
82
164
wallet_info = nodes [0 ].getwalletinfo ()
@@ -85,24 +167,23 @@ def run_test(self):
85
167
assert addr_data ['hdseedid' ] == wallet_info ['hdseedid' ]
86
168
assert_raises_rpc_error (- 12 , "Error: Keypool ran out, please call keypoolrefill first" , nodes [0 ].getnewaddress )
87
169
88
- # put six (plus 2) new keys in the keypool (100% external-, +100% internal-keys, 1 in min)
170
+ # put two new keys in the keypool
89
171
with WalletUnlock (nodes [0 ], 'test' ):
90
- nodes [0 ].keypoolrefill (6 )
172
+ nodes [0 ].keypoolrefill (TEST_NEW_KEYPOOL_SIZE )
91
173
wi = nodes [0 ].getwalletinfo ()
92
174
if self .options .descriptors :
93
- assert_equal (wi ['keypoolsize_hd_internal' ], 24 )
94
- assert_equal (wi ['keypoolsize' ], 24 )
175
+ # Descriptors wallet: keypool size applies to both internal and external
176
+ # chains and there are four of each (legacy, nested, segwit, and taproot)
177
+ assert_equal (wi ['keypoolsize_hd_internal' ], TEST_NEW_KEYPOOL_SIZE * 4 )
178
+ assert_equal (wi ['keypoolsize' ], TEST_NEW_KEYPOOL_SIZE * 4 )
95
179
else :
96
- assert_equal (wi ['keypoolsize_hd_internal' ], 6 )
97
- assert_equal (wi ['keypoolsize' ], 6 )
180
+ # Legacy wallet: keypool size applies to both internal and external HD chains
181
+ assert_equal (wi ['keypoolsize_hd_internal' ], TEST_NEW_KEYPOOL_SIZE )
182
+ assert_equal (wi ['keypoolsize' ], TEST_NEW_KEYPOOL_SIZE )
98
183
99
184
# drain the internal keys
100
- nodes [0 ].getrawchangeaddress ()
101
- nodes [0 ].getrawchangeaddress ()
102
- nodes [0 ].getrawchangeaddress ()
103
- nodes [0 ].getrawchangeaddress ()
104
- nodes [0 ].getrawchangeaddress ()
105
- nodes [0 ].getrawchangeaddress ()
185
+ for _ in range (TEST_NEW_KEYPOOL_SIZE ):
186
+ nodes [0 ].getrawchangeaddress ()
106
187
# remember keypool sizes
107
188
wi = nodes [0 ].getwalletinfo ()
108
189
kp_size_before = [wi ['keypoolsize_hd_internal' ], wi ['keypoolsize' ]]
@@ -115,13 +196,8 @@ def run_test(self):
115
196
116
197
# drain the external keys
117
198
addr = set ()
118
- addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
119
- addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
120
- addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
121
- addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
122
- addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
123
- addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
124
- assert len (addr ) == 6
199
+ for _ in range (TEST_NEW_KEYPOOL_SIZE ):
200
+ addr .add (nodes [0 ].getnewaddress (address_type = "bech32" ))
125
201
# remember keypool sizes
126
202
wi = nodes [0 ].getwalletinfo ()
127
203
kp_size_before = [wi ['keypoolsize_hd_internal' ], wi ['keypoolsize' ]]
@@ -132,16 +208,18 @@ def run_test(self):
132
208
kp_size_after = [wi ['keypoolsize_hd_internal' ], wi ['keypoolsize' ]]
133
209
assert_equal (kp_size_before , kp_size_after )
134
210
135
- # refill keypool with three new addresses
211
+ # refill keypool
136
212
nodes [0 ].walletpassphrase ('test' , 1 )
137
- nodes [0 ].keypoolrefill (3 )
213
+ # At this point the keypool has >45 keys in it
214
+ # calling keypoolrefill with anything smaller than that is a noop
215
+ nodes [0 ].keypoolrefill (50 )
138
216
139
217
# test walletpassphrase timeout
140
218
time .sleep (1.1 )
141
219
assert_equal (nodes [0 ].getwalletinfo ()["unlocked_until" ], 0 )
142
220
143
221
# drain the keypool
144
- for _ in range (3 ):
222
+ for _ in range (50 ):
145
223
nodes [0 ].getnewaddress ()
146
224
assert_raises_rpc_error (- 12 , "Keypool ran out" , nodes [0 ].getnewaddress )
147
225
0 commit comments