Skip to content

Commit 50fc4df

Browse files
committed
[mempool] Persist unbroadcast set to mempool.dat
Ensure that the unbroadcast set will still be meaningful if the node is restarted.
1 parent 297a178 commit 50fc4df

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

src/validation.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4970,6 +4970,7 @@ bool LoadMempool(CTxMemPool& pool)
49704970
int64_t expired = 0;
49714971
int64_t failed = 0;
49724972
int64_t already_there = 0;
4973+
int64_t unbroadcast = 0;
49734974
int64_t nNow = GetTime();
49744975

49754976
try {
@@ -5023,12 +5024,21 @@ bool LoadMempool(CTxMemPool& pool)
50235024
for (const auto& i : mapDeltas) {
50245025
pool.PrioritiseTransaction(i.first, i.second);
50255026
}
5027+
5028+
std::set<uint256> unbroadcast_txids;
5029+
file >> unbroadcast_txids;
5030+
unbroadcast = unbroadcast_txids.size();
5031+
5032+
for (const auto& txid : unbroadcast_txids) {
5033+
pool.AddUnbroadcastTx(txid);
5034+
}
5035+
50265036
} catch (const std::exception& e) {
50275037
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());
50285038
return false;
50295039
}
50305040

5031-
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there);
5041+
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast);
50325042
return true;
50335043
}
50345044

@@ -5038,6 +5048,7 @@ bool DumpMempool(const CTxMemPool& pool)
50385048

50395049
std::map<uint256, CAmount> mapDeltas;
50405050
std::vector<TxMempoolInfo> vinfo;
5051+
std::set<uint256> unbroadcast_txids;
50415052

50425053
static Mutex dump_mutex;
50435054
LOCK(dump_mutex);
@@ -5048,6 +5059,7 @@ bool DumpMempool(const CTxMemPool& pool)
50485059
mapDeltas[i.first] = i.second;
50495060
}
50505061
vinfo = pool.infoAll();
5062+
unbroadcast_txids = pool.GetUnbroadcastTxs();
50515063
}
50525064

50535065
int64_t mid = GetTimeMicros();
@@ -5072,6 +5084,10 @@ bool DumpMempool(const CTxMemPool& pool)
50725084
}
50735085

50745086
file << mapDeltas;
5087+
5088+
LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size());
5089+
file << unbroadcast_txids;
5090+
50755091
if (!FileCommit(file.Get()))
50765092
throw std::runtime_error("FileCommit failed");
50775093
file.fclose();

test/functional/mempool_persist.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,13 @@
4040
import time
4141

4242
from test_framework.test_framework import BitcoinTestFramework
43+
from test_framework.mininode import P2PTxInvStore
4344
from test_framework.util import (
4445
assert_equal,
4546
assert_greater_than_or_equal,
4647
assert_raises_rpc_error,
48+
connect_nodes,
49+
disconnect_nodes,
4750
wait_until,
4851
)
4952

@@ -80,6 +83,11 @@ def run_test(self):
8083
assert_greater_than_or_equal(tx_creation_time, tx_creation_time_lower)
8184
assert_greater_than_or_equal(tx_creation_time_higher, tx_creation_time)
8285

86+
# disconnect nodes & make a txn that remains in the unbroadcast set.
87+
disconnect_nodes(self.nodes[0], 2)
88+
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
89+
connect_nodes(self.nodes[0], 2)
90+
8391
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
8492
self.stop_nodes()
8593
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
@@ -89,7 +97,7 @@ def run_test(self):
8997
self.start_node(2)
9098
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"], timeout=1)
9199
wait_until(lambda: self.nodes[2].getmempoolinfo()["loaded"], timeout=1)
92-
assert_equal(len(self.nodes[0].getrawmempool()), 5)
100+
assert_equal(len(self.nodes[0].getrawmempool()), 6)
93101
assert_equal(len(self.nodes[2].getrawmempool()), 5)
94102
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
95103
assert_equal(len(self.nodes[1].getrawmempool()), 0)
@@ -105,17 +113,18 @@ def run_test(self):
105113
self.nodes[2].syncwithvalidationinterfacequeue() # Flush mempool to wallet
106114
assert_equal(node2_balance, self.nodes[2].getbalance())
107115

116+
# start node0 with wallet disabled so wallet transactions don't get resubmitted
108117
self.log.debug("Stop-start node0 with -persistmempool=0. Verify that it doesn't load its mempool.dat file.")
109118
self.stop_nodes()
110-
self.start_node(0, extra_args=["-persistmempool=0"])
119+
self.start_node(0, extra_args=["-persistmempool=0", "-disablewallet"])
111120
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
112121
assert_equal(len(self.nodes[0].getrawmempool()), 0)
113122

114123
self.log.debug("Stop-start node0. Verify that it has the transactions in its mempool.")
115124
self.stop_nodes()
116125
self.start_node(0)
117126
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
118-
assert_equal(len(self.nodes[0].getrawmempool()), 5)
127+
assert_equal(len(self.nodes[0].getrawmempool()), 6)
119128

120129
mempooldat0 = os.path.join(self.nodes[0].datadir, self.chain, 'mempool.dat')
121130
mempooldat1 = os.path.join(self.nodes[1].datadir, self.chain, 'mempool.dat')
@@ -124,12 +133,12 @@ def run_test(self):
124133
self.nodes[0].savemempool()
125134
assert os.path.isfile(mempooldat0)
126135

127-
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 5 transactions")
136+
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 6 transactions")
128137
os.rename(mempooldat0, mempooldat1)
129138
self.stop_nodes()
130139
self.start_node(1, extra_args=[])
131140
wait_until(lambda: self.nodes[1].getmempoolinfo()["loaded"])
132-
assert_equal(len(self.nodes[1].getrawmempool()), 5)
141+
assert_equal(len(self.nodes[1].getrawmempool()), 6)
133142

134143
self.log.debug("Prevent bitcoind from writing mempool.dat to disk. Verify that `savemempool` fails")
135144
# to test the exception we are creating a tmp folder called mempool.dat.new
@@ -139,6 +148,27 @@ def run_test(self):
139148
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
140149
os.rmdir(mempooldotnew1)
141150

151+
self.test_persist_unbroadcast()
152+
153+
def test_persist_unbroadcast(self):
154+
node0 = self.nodes[0]
155+
self.start_node(0)
156+
157+
# clear out mempool
158+
node0.generate(1)
159+
160+
# disconnect nodes to make a txn that remains in the unbroadcast set.
161+
disconnect_nodes(node0, 1)
162+
node0.sendtoaddress(self.nodes[1].getnewaddress(), Decimal("12"))
163+
164+
# shutdown, then startup with wallet disabled
165+
self.stop_nodes()
166+
self.start_node(0, extra_args=["-disablewallet"])
167+
168+
# check that txn gets broadcast due to unbroadcast logic
169+
conn = node0.add_p2p_connection(P2PTxInvStore())
170+
node0.mockscheduler(16*60) # 15 min + 1 for buffer
171+
wait_until(lambda: len(conn.get_invs()) == 1)
142172

143173
if __name__ == '__main__':
144174
MempoolPersistTest().main()

test/functional/mempool_unbroadcast.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ def test_broadcast(self):
5858
assert rpc_tx_hsh not in mempool
5959
assert wallet_tx_hsh not in mempool
6060

61+
# ensure that unbroadcast txs are persisted to mempool.dat
62+
self.restart_node(0)
63+
6164
self.log.info("Reconnect nodes & check if they are sent to node 1")
6265
connect_nodes(node, 1)
6366

0 commit comments

Comments
 (0)