|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# Copyright (c) 2025-present The Bitcoin Core developers |
| 3 | +# Distributed under the MIT software license, see the accompanying |
| 4 | +# file COPYING or https://www.opensource.org/licenses/mit-license.php. |
| 5 | + |
| 6 | +import time |
| 7 | + |
| 8 | +from test_framework.blocktools import MAX_FUTURE_BLOCK_TIME |
| 9 | +from test_framework.descriptors import descsum_create |
| 10 | +from test_framework.messages import ( |
| 11 | + COutPoint, |
| 12 | + CTxIn, |
| 13 | + CTxInWitness, |
| 14 | + CTxOut, |
| 15 | +) |
| 16 | +from test_framework.script_util import ( |
| 17 | + ANCHOR_ADDRESS, |
| 18 | + PAY_TO_ANCHOR, |
| 19 | +) |
| 20 | +from test_framework.test_framework import BitcoinTestFramework |
| 21 | +from test_framework.util import ( |
| 22 | + assert_equal, |
| 23 | + assert_raises_rpc_error, |
| 24 | +) |
| 25 | +from test_framework.wallet import MiniWallet |
| 26 | + |
| 27 | +class WalletAnchorTest(BitcoinTestFramework): |
| 28 | + def set_test_params(self): |
| 29 | + self.num_nodes = 1 |
| 30 | + |
| 31 | + def skip_test_if_missing_module(self): |
| 32 | + self.skip_if_no_wallet() |
| 33 | + |
| 34 | + def test_0_value_anchor_listunspent(self): |
| 35 | + self.log.info("Test that 0-value anchor outputs are detected as UTXOs") |
| 36 | + |
| 37 | + # Create an anchor output, and spend it |
| 38 | + sender = MiniWallet(self.nodes[0]) |
| 39 | + anchor_tx = sender.create_self_transfer(fee_rate=0, version=3)["tx"] |
| 40 | + anchor_tx.vout.append(CTxOut(0, PAY_TO_ANCHOR)) |
| 41 | + anchor_spend = sender.create_self_transfer(version=3)["tx"] |
| 42 | + anchor_spend.vin.append(CTxIn(COutPoint(anchor_tx.txid_int, 1), b"")) |
| 43 | + anchor_spend.wit.vtxinwit.append(CTxInWitness()) |
| 44 | + submit_res = self.nodes[0].submitpackage([anchor_tx.serialize().hex(), anchor_spend.serialize().hex()]) |
| 45 | + assert_equal(submit_res["package_msg"], "success") |
| 46 | + anchor_txid = anchor_tx.txid_hex |
| 47 | + anchor_spend_txid = anchor_spend.txid_hex |
| 48 | + |
| 49 | + # Mine each tx in separate blocks |
| 50 | + self.generateblock(self.nodes[0], sender.get_address(), [anchor_tx.serialize().hex()]) |
| 51 | + anchor_tx_height = self.nodes[0].getblockcount() |
| 52 | + self.generateblock(self.nodes[0], sender.get_address(), [anchor_spend.serialize().hex()]) |
| 53 | + |
| 54 | + # Mock time forward and generate some blocks to avoid rescanning of latest blocks |
| 55 | + self.nodes[0].setmocktime(int(time.time()) + MAX_FUTURE_BLOCK_TIME + 1) |
| 56 | + self.generate(self.nodes[0], 10) |
| 57 | + |
| 58 | + self.nodes[0].createwallet(wallet_name="anchor", disable_private_keys=True) |
| 59 | + wallet = self.nodes[0].get_wallet_rpc("anchor") |
| 60 | + import_res = wallet.importdescriptors([{"desc": descsum_create(f"addr({ANCHOR_ADDRESS})"), "timestamp": "now"}]) |
| 61 | + assert_equal(import_res[0]["success"], True) |
| 62 | + |
| 63 | + # The wallet should have no UTXOs, and not know of the anchor tx or its spend |
| 64 | + assert_equal(wallet.listunspent(), []) |
| 65 | + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", wallet.gettransaction, anchor_txid) |
| 66 | + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", wallet.gettransaction, anchor_spend_txid) |
| 67 | + |
| 68 | + # Rescanning the block containing the anchor so that listunspent will list the output |
| 69 | + wallet.rescanblockchain(0, anchor_tx_height) |
| 70 | + utxos = wallet.listunspent() |
| 71 | + assert_equal(len(utxos), 1) |
| 72 | + assert_equal(utxos[0]["txid"], anchor_txid) |
| 73 | + assert_equal(utxos[0]["address"], ANCHOR_ADDRESS) |
| 74 | + assert_equal(utxos[0]["amount"], 0) |
| 75 | + wallet.gettransaction(anchor_txid) |
| 76 | + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", wallet.gettransaction, anchor_spend_txid) |
| 77 | + |
| 78 | + # Rescan the rest of the blockchain to see the anchor was spent |
| 79 | + wallet.rescanblockchain() |
| 80 | + assert_equal(wallet.listunspent(), []) |
| 81 | + wallet.gettransaction(anchor_spend_txid) |
| 82 | + |
| 83 | + def test_cannot_sign_anchors(self): |
| 84 | + self.log.info("Test that the wallet cannot spend anchor outputs") |
| 85 | + for disable_privkeys in [False, True]: |
| 86 | + self.nodes[0].createwallet(wallet_name=f"anchor_spend_{disable_privkeys}", disable_private_keys=disable_privkeys) |
| 87 | + wallet = self.nodes[0].get_wallet_rpc(f"anchor_spend_{disable_privkeys}") |
| 88 | + import_res = wallet.importdescriptors([ |
| 89 | + {"desc": descsum_create(f"addr({ANCHOR_ADDRESS})"), "timestamp": "now"}, |
| 90 | + {"desc": descsum_create(f"raw({PAY_TO_ANCHOR.hex()})"), "timestamp": "now"} |
| 91 | + ]) |
| 92 | + assert_equal(import_res[0]["success"], disable_privkeys) |
| 93 | + assert_equal(import_res[1]["success"], disable_privkeys) |
| 94 | + |
| 95 | + anchor_txid = self.default_wallet.sendtoaddress(ANCHOR_ADDRESS, 1) |
| 96 | + self.generate(self.nodes[0], 1) |
| 97 | + |
| 98 | + wallet = self.nodes[0].get_wallet_rpc("anchor_spend_True") |
| 99 | + utxos = wallet.listunspent() |
| 100 | + assert_equal(len(utxos), 1) |
| 101 | + assert_equal(utxos[0]["txid"], anchor_txid) |
| 102 | + assert_equal(utxos[0]["address"], ANCHOR_ADDRESS) |
| 103 | + assert_equal(utxos[0]["amount"], 1) |
| 104 | + |
| 105 | + assert_raises_rpc_error(-4, "Missing solving data for estimating transaction size", wallet.send, [{self.default_wallet.getnewaddress(): 0.9999}]) |
| 106 | + assert_raises_rpc_error(-4, "Error: Private keys are disabled for this wallet", wallet.sendtoaddress, self.default_wallet.getnewaddress(), 0.9999) |
| 107 | + assert_raises_rpc_error(-4, "Unable to determine the size of the transaction, the wallet contains unsolvable descriptors", wallet.sendall, recipients=[self.default_wallet.getnewaddress()], inputs=utxos) |
| 108 | + assert_raises_rpc_error(-4, "Unable to determine the size of the transaction, the wallet contains unsolvable descriptors", wallet.sendall, recipients=[self.default_wallet.getnewaddress()]) |
| 109 | + |
| 110 | + def run_test(self): |
| 111 | + self.default_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) |
| 112 | + self.test_0_value_anchor_listunspent() |
| 113 | + self.test_cannot_sign_anchors() |
| 114 | + |
| 115 | +if __name__ == '__main__': |
| 116 | + WalletAnchorTest(__file__).main() |
0 commit comments