Skip to content

Commit 8ff637a

Browse files
committed
Subscribe wallet to mempool events
1 parent 6735ba3 commit 8ff637a

File tree

22 files changed

+514
-83
lines changed

22 files changed

+514
-83
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mempool/src/rpc.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ use rpc::RpcResult;
3131

3232
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)]
3333
pub struct GetTxResponse {
34-
id: Id<Transaction>,
35-
status: TxStatus,
36-
transaction: HexEncoded<SignedTransaction>,
34+
pub id: Id<Transaction>,
35+
pub status: TxStatus,
36+
pub transaction: HexEncoded<SignedTransaction>,
3737
}
3838

3939
#[rpc::describe]

test/functional/test_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class UnicodeOnWindowsError(ValueError):
154154
'wallet_sweep_address.py',
155155
'wallet_sweep_delegation.py',
156156
'wallet_recover_accounts.py',
157+
'wallet_mempool_events.py',
157158
'wallet_tokens.py',
158159
'wallet_tokens_freeze.py',
159160
'wallet_tokens_transfer_from_multisig_addr.py',

test/functional/wallet_conflict.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,20 +155,23 @@ async def async_test(self):
155155
transactions.remove(transfer_tx)
156156
freeze_tx = transactions[0]
157157

158-
assert_equal(1, len(await wallet.list_pending_transactions()))
158+
assert_equal(2, len(await wallet.list_pending_transactions()))
159159

160160

161161
# try to send tokens again should fail as the tokens are already sent
162162
assert_in("Success", await wallet.select_account(1))
163163
assert_in("Coin selection error: No available UTXOs", await wallet.send_tokens_to_address(token_id, address, tokens_to_mint))
164164
# check that the mempool still has the transfer tx
165165
assert node.mempool_contains_tx(transfer_tx_id)
166-
# abandon it from the wallet side so it is not rebroadcasted
167-
assert_in("The transaction was marked as abandoned successfully", await wallet.abandon_transaction(transfer_tx_id))
166+
assert_in("Cannot change a transaction's state from InMempool", await wallet.abandon_transaction(transfer_tx_id))
168167

169168
# create a block with the freeze token transaction
170169
self.generate_block([freeze_tx])
171170
assert_in("Success", await wallet.sync())
171+
self.log.info(f"transfer_tx_id = {transfer_tx_id}")
172+
173+
# abandon it from the wallet side so it is not rebroadcasted
174+
assert_in("The transaction was marked as abandoned successfully", await wallet.abandon_transaction(transfer_tx_id))
172175

173176
# after the token is frozen the transfer token tx should be evicted by the mempool as conflicting
174177
# wait until mempool evicts the conflicting tx
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) 2026 RBB S.r.l
3+
# Copyright (c) 2017-2021 The Bitcoin Core developers
4+
5+
# SPDX-License-Identifier: MIT
6+
# Licensed under the MIT License;
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
"""Wallet mempool events test
18+
19+
Check that:
20+
* We can create 2 wallets with same mnemonic,
21+
* get an address
22+
* send coins to the wallet's address
23+
* sync the wallet with the node
24+
* check balance in both wallets
25+
* send coins from Acc 0 to Acc 1 without creating a block
26+
* the second wallet should get the new Tx from mempool events
27+
* second wallet can create a new unconfirmed Tx on top of the on in mempool
28+
"""
29+
30+
import asyncio
31+
32+
from test_framework.mintlayer import (ATOMS_PER_COIN, block_input_data_obj,
33+
make_tx, reward_input)
34+
from test_framework.test_framework import BitcoinTestFramework
35+
from test_framework.util import assert_equal, assert_in
36+
from test_framework.wallet_cli_controller import (DEFAULT_ACCOUNT_INDEX,
37+
WalletCliController)
38+
39+
40+
class WalletMempoolEvents(BitcoinTestFramework):
41+
42+
def set_test_params(self):
43+
self.setup_clean_chain = True
44+
self.num_nodes = 1
45+
self.extra_args = [
46+
[
47+
"--blockprod-min-peers-to-produce-blocks=0",
48+
]
49+
]
50+
51+
def setup_network(self):
52+
self.setup_nodes()
53+
self.sync_all(self.nodes[0:1])
54+
55+
def generate_block(self, transactions=[]):
56+
node = self.nodes[0]
57+
58+
block_input_data = {"PoW": {"reward_destination": "AnyoneCanSpend"}}
59+
block_input_data = block_input_data_obj.encode(block_input_data).to_hex()[2:]
60+
61+
# create a new block, taking transactions from mempool
62+
block = node.blockprod_generate_block(
63+
block_input_data, transactions, [], "FillSpaceFromMempool"
64+
)
65+
node.chainstate_submit_block(block)
66+
block_id = node.chainstate_best_block_id()
67+
68+
# Wait for mempool to sync
69+
self.wait_until(
70+
lambda: node.mempool_local_best_block_id() == block_id, timeout=5
71+
)
72+
73+
return block_id
74+
75+
def run_test(self):
76+
asyncio.run(self.async_test())
77+
78+
async def async_test(self):
79+
node = self.nodes[0]
80+
async with WalletCliController(node, self.config, self.log) as wallet, \
81+
WalletCliController(node, self.config, self.log) as wallet2:
82+
# new wallet
83+
await wallet.create_wallet()
84+
# create wallet2 with the same mnemonic
85+
mnemonic = await wallet.show_seed_phrase()
86+
assert mnemonic is not None
87+
assert_in("Wallet recovered successfully", await wallet2.recover_wallet(mnemonic))
88+
89+
# check it is on genesis
90+
best_block_height = await wallet.get_best_block_height()
91+
self.log.info(f"best block height = {best_block_height}")
92+
assert_equal(best_block_height, "0")
93+
best_block_height = await wallet2.get_best_block_height()
94+
assert_equal(best_block_height, "0")
95+
96+
# new address
97+
pub_key_bytes = await wallet.new_public_key()
98+
assert_equal(len(pub_key_bytes), 33)
99+
100+
# Get chain tip
101+
tip_id = node.chainstate_best_block_id()
102+
103+
# Submit a valid transaction
104+
token_fee = 1000
105+
coins_to_send = 1
106+
token_fee_output = {
107+
"Transfer": [
108+
{"Coin": token_fee * ATOMS_PER_COIN},
109+
{
110+
"PublicKey": {
111+
"key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}}
112+
}
113+
},
114+
],
115+
}
116+
tx_fee_output = {
117+
"Transfer": [
118+
{"Coin": coins_to_send * ATOMS_PER_COIN},
119+
{
120+
"PublicKey": {
121+
"key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}}
122+
}
123+
},
124+
],
125+
}
126+
encoded_tx, tx_id = make_tx(
127+
[reward_input(tip_id)], [token_fee_output] + [tx_fee_output] * 2, 0
128+
)
129+
130+
self.log.debug(f"Encoded transaction {tx_id}: {encoded_tx}")
131+
132+
assert_in("No transaction found", await wallet.get_transaction(tx_id))
133+
134+
node.mempool_submit_transaction(encoded_tx, {})
135+
assert node.mempool_contains_tx(tx_id)
136+
137+
self.generate_block()
138+
assert not node.mempool_contains_tx(tx_id)
139+
140+
# sync the wallet
141+
assert_in("Success", await wallet.sync())
142+
assert_in("Success", await wallet2.sync())
143+
144+
acc0_address = await wallet.new_address()
145+
146+
# both wallets have the same balances after syncing the new block
147+
assert_in(
148+
f"Coins amount: {coins_to_send * 2 + token_fee}",
149+
await wallet.get_balance(),
150+
)
151+
assert_in(
152+
f"Coins amount: {coins_to_send * 2 + token_fee}",
153+
await wallet2.get_balance(),
154+
)
155+
156+
# create new account and get an address
157+
assert_in("Success", await wallet.create_new_account())
158+
assert_in("Success", await wallet2.create_new_account())
159+
assert_in("Success", await wallet.select_account(1))
160+
acc1_address = await wallet.new_address()
161+
162+
# go back to Acc 0 and send 1 coin to Acc 1
163+
coins_to_send = 2
164+
assert_in("Success", await wallet.select_account(DEFAULT_ACCOUNT_INDEX))
165+
assert_in(
166+
"The transaction was submitted successfully",
167+
await wallet.send_to_address(acc1_address, coins_to_send),
168+
)
169+
170+
# check mempool has 1 transaction now
171+
transactions = node.mempool_transactions()
172+
assert_equal(len(transactions), 1)
173+
transfer_tx = transactions[0]
174+
175+
# check wallet 1 has it as pending
176+
pending_txs = await wallet.list_pending_transactions()
177+
assert_equal(1, len(pending_txs))
178+
transfer_tx_id = pending_txs[0]
179+
180+
# check wallet 2 also received it from mempool events
181+
pending_txs = await wallet2.list_pending_transactions()
182+
assert_equal(1, len(pending_txs))
183+
assert_equal(transfer_tx_id, pending_txs[0])
184+
185+
assert_in("Success", await wallet.select_account(1))
186+
# wallet 2 should automatically recover Acc 1
187+
assert_in("Success", await wallet2.select_account(1))
188+
189+
# check both balances have `coins_to_send` coins in-mempool state
190+
assert_in(
191+
f"Coins amount: {coins_to_send}",
192+
await wallet.get_balance(utxo_states=['in-mempool']),
193+
)
194+
assert_in(
195+
f"Coins amount: {coins_to_send}",
196+
await wallet2.get_balance(utxo_states=['in-mempool']),
197+
)
198+
199+
# check wallet2 can send 1 coin back to Acc0 from the not yet confirmed tx in mempool
200+
assert_in(
201+
"The transaction was submitted successfully",
202+
await wallet2.send_to_address(acc0_address, 1),
203+
)
204+
205+
self.generate_block()
206+
207+
assert_in("Success", await wallet.sync())
208+
assert_in("Success", await wallet2.sync())
209+
210+
211+
if __name__ == "__main__":
212+
WalletMempoolEvents().main()

test/functional/wallet_tokens_change_supply.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,10 @@ async def async_test(self):
195195
if total_tokens_supply > 0:
196196
assert_in(
197197
f"{token_id} ({valid_ticker}), amount: {total_tokens_supply}",
198-
await wallet.get_balance(utxo_states=['confirmed', 'inactive'])
198+
await wallet.get_balance(utxo_states=['confirmed', 'inactive', 'in-mempool'])
199199
)
200200
else:
201-
assert_not_in(f"{token_id}", await wallet.get_balance(utxo_states=['confirmed', 'inactive']))
201+
assert_not_in(f"{token_id}", await wallet.get_balance(utxo_states=['confirmed', 'inactive', 'in-mempool']))
202202

203203

204204
# lock token supply

utils/networking/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ itertools.workspace = true
1414
serde_with.workspace = true
1515
thiserror.workspace = true
1616
tokio = { workspace = true, features = ["sync"] }
17+
tokio-stream.workspace = true
1718

1819
[dev-dependencies]
1920
serde_test.workspace = true

utils/networking/src/broadcaster.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
//! Broadcaster is a reliable version of [tokio::sync::broadcast].
1717
1818
use tokio::sync::mpsc;
19+
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream};
1920

2021
/// A reliable version of [tokio::sync::broadcast], sender part.
2122
///
@@ -97,6 +98,10 @@ impl<T> Receiver<T> {
9798
pub fn blocking_recv(&mut self) -> Option<T> {
9899
self.0.blocking_recv()
99100
}
101+
102+
pub fn into_stream(self) -> impl Stream<Item = T> {
103+
UnboundedReceiverStream::new(self.0)
104+
}
100105
}
101106

102107
#[cfg(test)]

wallet/src/account/output_cache/mod.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -971,7 +971,23 @@ impl OutputCache {
971971
tx_id: OutPointSourceId,
972972
tx: WalletTx,
973973
) -> WalletResult<()> {
974-
let already_present = self.txs.get(&tx_id).is_some_and(|tx| match tx.state() {
974+
let existing_tx = self.txs.get(&tx_id);
975+
let existing_tx_already_confirmed_or_same = existing_tx.is_some_and(|existing_tx| {
976+
matches!(
977+
(existing_tx.state(), tx.state()),
978+
(TxState::Confirmed(_, _, _), _)
979+
| (TxState::Inactive(_), TxState::Inactive(_))
980+
| (TxState::Abandoned, TxState::Abandoned)
981+
| (TxState::Conflicted(_), TxState::Conflicted(_))
982+
| (TxState::InMempool(_), TxState::InMempool(_))
983+
)
984+
});
985+
986+
if existing_tx_already_confirmed_or_same {
987+
return Ok(());
988+
}
989+
990+
let already_present = existing_tx.is_some_and(|tx| match tx.state() {
975991
TxState::Abandoned | TxState::Conflicted(_) => false,
976992
TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Inactive(_) => true,
977993
});
@@ -1270,19 +1286,19 @@ impl OutputCache {
12701286
fn update_token_issuance_state(
12711287
unconfirmed_descendants: &mut BTreeMap<OutPointSourceId, BTreeSet<OutPointSourceId>>,
12721288
data: &mut TokenIssuanceData,
1273-
delegation_id: &TokenId,
1289+
token_id: &TokenId,
12741290
token_nonce: AccountNonce,
12751291
tx_id: &OutPointSourceId,
12761292
) -> Result<(), WalletError> {
12771293
let next_nonce = data
12781294
.last_nonce
12791295
.map_or(Some(AccountNonce::new(0)), |nonce| nonce.increment())
1280-
.ok_or(WalletError::TokenIssuanceNonceOverflow(*delegation_id))?;
1296+
.ok_or(WalletError::TokenIssuanceNonceOverflow(*token_id))?;
12811297

12821298
ensure!(
12831299
token_nonce == next_nonce,
12841300
OutputCacheInconsistencyError::InconsistentTokenIssuanceDuplicateNonce(
1285-
*delegation_id,
1301+
*token_id,
12861302
token_nonce
12871303
)
12881304
);
@@ -1490,12 +1506,10 @@ impl OutputCache {
14901506
.filter_map(|tx| match tx {
14911507
WalletTx::Block(_) => None,
14921508
WalletTx::Tx(tx) => match tx.state() {
1493-
TxState::Inactive(_) | TxState::Conflicted(_) => {
1509+
TxState::Inactive(_) | TxState::Conflicted(_) | TxState::InMempool(_) => {
14941510
Some(tx.get_transaction_with_id())
14951511
}
1496-
TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Abandoned => {
1497-
None
1498-
}
1512+
TxState::Confirmed(_, _, _) | TxState::Abandoned => None,
14991513
},
15001514
})
15011515
.collect()
@@ -1720,6 +1734,18 @@ impl OutputCache {
17201734
chain_config: &ChainConfig,
17211735
tx_id: Id<Transaction>,
17221736
) -> WalletResult<Vec<(Id<Transaction>, WalletTx)>> {
1737+
if let Some(tx) = self.txs.get(&tx_id.into()) {
1738+
let cannot_abandone = match tx.state() {
1739+
TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Abandoned => true,
1740+
TxState::Inactive(_) | TxState::Conflicted(_) => false,
1741+
};
1742+
if cannot_abandone {
1743+
return Err(WalletError::CannotChangeTransactionState(
1744+
tx.state(),
1745+
TxState::Abandoned,
1746+
));
1747+
}
1748+
}
17231749
let all_abandoned = self.remove_from_unconfirmed_descendants(tx_id);
17241750
let mut txs_to_rollback = vec![];
17251751

0 commit comments

Comments
 (0)