Skip to content

Commit 0ef0d33

Browse files
committed
Merge #18038: P2P: Mempool tracks locally submitted transactions to improve wallet privacy
50fc4df [mempool] Persist unbroadcast set to mempool.dat (Amiti Uttarwar) 297a178 [test] Integration tests for unbroadcast functionality (Amiti Uttarwar) 6851502 [refactor/test] Extract P2PTxInvStore into test framework (Amiti Uttarwar) dc1da48 [wallet] Update the rebroadcast frequency to be ~1/day. (Amiti Uttarwar) e25e42f [p2p] Reattempt initial send of unbroadcast transactions (Amiti Uttarwar) 7e93eec [util] Add method that returns random time in milliseconds (Amiti Uttarwar) 89eeb4a [mempool] Track "unbroadcast" transactions (Amiti Uttarwar) Pull request description: This PR introduces mempool tracking of unbroadcast transactions and periodic reattempts at initial broadcast. This is a part of the rebroadcast project, and a standalone privacy win. The current rebroadcast logic is terrible for privacy because 1. only the source wallet rebroadcasts transactions and 2. it does so quite frequently. In the current system, if a user submits a transaction that does not immediately get broadcast to the network (eg. they are offline), this "rebroadcast" behavior is the safety net that can actually serve as the initial broadcast. So, keeping the attempts frequent is important for initial delivery within a reasonable timespan. This PR aims to improve # 2 by reducing the wallet rebroadcast frequency to ~1/day from ~1/15 min. It achieves this by separating the notion of initial broadcast from rebroadcasts. With these changes, the mempool tracks locally submitted transactions & periodically reattempts initial broadcast. Transactions submitted via the wallet or RPC are added to an "unbroadcast" set & are removed when a peer sends a `getdata` request, or the transaction is removed from the mempool. Every 10-15 minutes, the node reattempts an initial broadcast. This enables reducing the wallet rebroadcast frequency while ensuring the transactions will be propagated to the network. For privacy improvements around # 1, please see #16698. Thank you to gmaxwell for the idea of how to break out this subset of functionality (bitcoin/bitcoin#16698 (comment)) ACKs for top commit: fjahr: Code review ACK 50fc4df MarcoFalke: ACK 50fc4df, I think this is ready for merge now 👻 amitiuttarwar: The current tip `50fc4df` currently has 6 ACKs on it, so I've opened #18807 to address the last bits. jnewbery: utACK 50fc4df. ariard: Code Review ACK 50fc4df (minor points no need to invalid other ACKs) robot-visions: ACK 50fc4df sipa: utACK 50fc4df naumenkogs: utACK 50fc4df Tree-SHA512: 2dd935d645d5e209f8abf87bfaa3ef0e4492705ce7e89ea64279cb27ffd37f4727fa94ad62d41be331177332f8edbebf3c7f4972f8cda10dd951b80a28ab3c0f
2 parents ba348db + 50fc4df commit 0ef0d33

14 files changed

+248
-31
lines changed

src/net_processing.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,19 @@ void PeerLogicValidation::InitializeNode(CNode *pnode) {
810810
PushNodeVersion(pnode, connman, GetTime());
811811
}
812812

813+
void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const
814+
{
815+
std::set<uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs();
816+
817+
for (const uint256& txid : unbroadcast_txids) {
818+
RelayTransaction(txid, *connman);
819+
}
820+
821+
// schedule next run for 10-15 minutes in the future
822+
const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
823+
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
824+
}
825+
813826
void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
814827
fUpdateConnectionTime = false;
815828
LOCK(cs_main);
@@ -1159,6 +1172,10 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS
11591172
// timer.
11601173
static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer");
11611174
scheduler.scheduleEvery([this, consensusParams] { this->CheckForStaleTipAndEvictPeers(consensusParams); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL});
1175+
1176+
// schedule next run for 10-15 minutes in the future
1177+
const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
1178+
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
11621179
}
11631180

11641181
/**
@@ -1587,7 +1604,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c
15871604
}
15881605
}
15891606

1590-
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
1607+
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
15911608
{
15921609
AssertLockNotHeld(cs_main);
15931610

@@ -1636,7 +1653,13 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
16361653
push = true;
16371654
}
16381655
}
1639-
if (!push) {
1656+
1657+
if (push) {
1658+
// We interpret fulfilling a GETDATA for a transaction as a
1659+
// successful initial broadcast and remove it from our
1660+
// unbroadcast set.
1661+
mempool.RemoveUnbroadcastTx(inv.hash);
1662+
} else {
16401663
vNotFound.push_back(inv);
16411664
}
16421665
}

src/net_processing.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ class PeerLogicValidation final : public CValidationInterface, public NetEventsI
7676
void CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams);
7777
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
7878
void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
79+
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
80+
void ReattemptInitialBroadcast(CScheduler& scheduler) const;
7981

8082
private:
8183
int64_t m_stale_tip_check_time; //!< Next time to check for stale tip

src/node/transaction.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
7878
}
7979

8080
if (relay) {
81+
// the mempool tracks locally submitted transactions to make a
82+
// best-effort of initial broadcast
83+
node.mempool->AddUnbroadcastTx(hashTx);
84+
8185
RelayTransaction(hashTx, *node.connman);
8286
}
8387

src/random.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,11 @@ std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max)
592592
return std::chrono::microseconds{GetRand(duration_max.count())};
593593
}
594594

595+
std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept
596+
{
597+
return std::chrono::milliseconds{GetRand(duration_max.count())};
598+
}
599+
595600
int GetRandInt(int nMax) noexcept
596601
{
597602
return GetRand(nMax);

src/random.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
void GetRandBytes(unsigned char* buf, int num) noexcept;
7070
uint64_t GetRand(uint64_t nMax) noexcept;
7171
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept;
72+
std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept;
7273
int GetRandInt(int nMax) noexcept;
7374
uint256 GetRandHash() noexcept;
7475

src/txmempool.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,8 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
417417
for (const CTxIn& txin : it->GetTx().vin)
418418
mapNextTx.erase(txin.prevout);
419419

420+
RemoveUnbroadcastTx(hash, true /* add logging because unchecked */ );
421+
420422
if (vTxHashes.size() > 1) {
421423
vTxHashes[it->vTxHashesIdx] = std::move(vTxHashes.back());
422424
vTxHashes[it->vTxHashesIdx].second->vTxHashesIdx = it->vTxHashesIdx;
@@ -919,6 +921,15 @@ size_t CTxMemPool::DynamicMemoryUsage() const {
919921
return memusage::MallocUsage(sizeof(CTxMemPoolEntry) + 12 * sizeof(void*)) * mapTx.size() + memusage::DynamicUsage(mapNextTx) + memusage::DynamicUsage(mapDeltas) + memusage::DynamicUsage(mapLinks) + memusage::DynamicUsage(vTxHashes) + cachedInnerUsage;
920922
}
921923

924+
void CTxMemPool::RemoveUnbroadcastTx(const uint256& txid, const bool unchecked) {
925+
LOCK(cs);
926+
927+
if (m_unbroadcast_txids.erase(txid))
928+
{
929+
LogPrint(BCLog::MEMPOOL, "Removed %i from set of unbroadcast txns%s\n", txid.GetHex(), (unchecked ? " before confirmation that txn was sent out" : ""));
930+
}
931+
}
932+
922933
void CTxMemPool::RemoveStaged(setEntries &stage, bool updateDescendants, MemPoolRemovalReason reason) {
923934
AssertLockHeld(cs);
924935
UpdateForRemoveFromMempool(stage, updateDescendants);

src/txmempool.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,9 @@ class CTxMemPool
549549

550550
std::vector<indexed_transaction_set::const_iterator> GetSortedDepthAndScore() const EXCLUSIVE_LOCKS_REQUIRED(cs);
551551

552+
/** track locally submitted transactions to periodically retry initial broadcast */
553+
std::set<uint256> m_unbroadcast_txids GUARDED_BY(cs);
554+
552555
public:
553556
indirectmap<COutPoint, const CTransaction*> mapNextTx GUARDED_BY(cs);
554557
std::map<uint256, CAmount> mapDeltas;
@@ -698,6 +701,21 @@ class CTxMemPool
698701

699702
size_t DynamicMemoryUsage() const;
700703

704+
/** Adds a transaction to the unbroadcast set */
705+
void AddUnbroadcastTx(const uint256& txid) {
706+
LOCK(cs);
707+
m_unbroadcast_txids.insert(txid);
708+
}
709+
710+
/** Removes a transaction from the unbroadcast set */
711+
void RemoveUnbroadcastTx(const uint256& txid, const bool unchecked = false);
712+
713+
/** Returns transactions in unbroadcast set */
714+
const std::set<uint256> GetUnbroadcastTxs() const {
715+
LOCK(cs);
716+
return m_unbroadcast_txids;
717+
}
718+
701719
private:
702720
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
703721
* the descendants for a single transaction that has been added to the

src/validation.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4998,6 +4998,7 @@ bool LoadMempool(CTxMemPool& pool)
49984998
int64_t expired = 0;
49994999
int64_t failed = 0;
50005000
int64_t already_there = 0;
5001+
int64_t unbroadcast = 0;
50015002
int64_t nNow = GetTime();
50025003

50035004
try {
@@ -5051,12 +5052,21 @@ bool LoadMempool(CTxMemPool& pool)
50515052
for (const auto& i : mapDeltas) {
50525053
pool.PrioritiseTransaction(i.first, i.second);
50535054
}
5055+
5056+
std::set<uint256> unbroadcast_txids;
5057+
file >> unbroadcast_txids;
5058+
unbroadcast = unbroadcast_txids.size();
5059+
5060+
for (const auto& txid : unbroadcast_txids) {
5061+
pool.AddUnbroadcastTx(txid);
5062+
}
5063+
50545064
} catch (const std::exception& e) {
50555065
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());
50565066
return false;
50575067
}
50585068

5059-
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there);
5069+
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast);
50605070
return true;
50615071
}
50625072

@@ -5066,6 +5076,7 @@ bool DumpMempool(const CTxMemPool& pool)
50665076

50675077
std::map<uint256, CAmount> mapDeltas;
50685078
std::vector<TxMempoolInfo> vinfo;
5079+
std::set<uint256> unbroadcast_txids;
50695080

50705081
static Mutex dump_mutex;
50715082
LOCK(dump_mutex);
@@ -5076,6 +5087,7 @@ bool DumpMempool(const CTxMemPool& pool)
50765087
mapDeltas[i.first] = i.second;
50775088
}
50785089
vinfo = pool.infoAll();
5090+
unbroadcast_txids = pool.GetUnbroadcastTxs();
50795091
}
50805092

50815093
int64_t mid = GetTimeMicros();
@@ -5100,6 +5112,10 @@ bool DumpMempool(const CTxMemPool& pool)
51005112
}
51015113

51025114
file << mapDeltas;
5115+
5116+
LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size());
5117+
file << unbroadcast_txids;
5118+
51035119
if (!FileCommit(file.Get()))
51045120
throw std::runtime_error("FileCommit failed");
51055121
file.fclose();

src/wallet/wallet.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1993,7 +1993,8 @@ void CWallet::ResendWalletTransactions()
19931993
// that these are our transactions.
19941994
if (GetTime() < nNextResend || !fBroadcastTransactions) return;
19951995
bool fFirst = (nNextResend == 0);
1996-
nNextResend = GetTime() + GetRand(30 * 60);
1996+
// resend 12-36 hours from now, ~1 day on average.
1997+
nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60);
19971998
if (fFirst) return;
19981999

19992000
// Only do it if there's been a new block since last time

test/functional/mempool_persist.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,13 @@
4040
import time
4141

4242
from test_framework.test_framework import BitcoinTestFramework
43+
from test_framework.mininode import P2PTxInvStore
4344
from test_framework.util import (
4445
assert_equal,
4546
assert_greater_than_or_equal,
4647
assert_raises_rpc_error,
48+
connect_nodes,
49+
disconnect_nodes,
4750
wait_until,
4851
)
4952

@@ -80,6 +83,11 @@ def run_test(self):
8083
assert_greater_than_or_equal(tx_creation_time, tx_creation_time_lower)
8184
assert_greater_than_or_equal(tx_creation_time_higher, tx_creation_time)
8285

86+
# disconnect nodes & make a txn that remains in the unbroadcast set.
87+
disconnect_nodes(self.nodes[0], 2)
88+
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
89+
connect_nodes(self.nodes[0], 2)
90+
8391
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
8492
self.stop_nodes()
8593
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
@@ -89,7 +97,7 @@ def run_test(self):
8997
self.start_node(2)
9098
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"], timeout=1)
9199
wait_until(lambda: self.nodes[2].getmempoolinfo()["loaded"], timeout=1)
92-
assert_equal(len(self.nodes[0].getrawmempool()), 5)
100+
assert_equal(len(self.nodes[0].getrawmempool()), 6)
93101
assert_equal(len(self.nodes[2].getrawmempool()), 5)
94102
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
95103
assert_equal(len(self.nodes[1].getrawmempool()), 0)
@@ -105,17 +113,18 @@ def run_test(self):
105113
self.nodes[2].syncwithvalidationinterfacequeue() # Flush mempool to wallet
106114
assert_equal(node2_balance, self.nodes[2].getbalance())
107115

116+
# start node0 with wallet disabled so wallet transactions don't get resubmitted
108117
self.log.debug("Stop-start node0 with -persistmempool=0. Verify that it doesn't load its mempool.dat file.")
109118
self.stop_nodes()
110-
self.start_node(0, extra_args=["-persistmempool=0"])
119+
self.start_node(0, extra_args=["-persistmempool=0", "-disablewallet"])
111120
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
112121
assert_equal(len(self.nodes[0].getrawmempool()), 0)
113122

114123
self.log.debug("Stop-start node0. Verify that it has the transactions in its mempool.")
115124
self.stop_nodes()
116125
self.start_node(0)
117126
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
118-
assert_equal(len(self.nodes[0].getrawmempool()), 5)
127+
assert_equal(len(self.nodes[0].getrawmempool()), 6)
119128

120129
mempooldat0 = os.path.join(self.nodes[0].datadir, self.chain, 'mempool.dat')
121130
mempooldat1 = os.path.join(self.nodes[1].datadir, self.chain, 'mempool.dat')
@@ -124,12 +133,12 @@ def run_test(self):
124133
self.nodes[0].savemempool()
125134
assert os.path.isfile(mempooldat0)
126135

127-
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 5 transactions")
136+
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 6 transactions")
128137
os.rename(mempooldat0, mempooldat1)
129138
self.stop_nodes()
130139
self.start_node(1, extra_args=[])
131140
wait_until(lambda: self.nodes[1].getmempoolinfo()["loaded"])
132-
assert_equal(len(self.nodes[1].getrawmempool()), 5)
141+
assert_equal(len(self.nodes[1].getrawmempool()), 6)
133142

134143
self.log.debug("Prevent bitcoind from writing mempool.dat to disk. Verify that `savemempool` fails")
135144
# to test the exception we are creating a tmp folder called mempool.dat.new
@@ -139,6 +148,27 @@ def run_test(self):
139148
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
140149
os.rmdir(mempooldotnew1)
141150

151+
self.test_persist_unbroadcast()
152+
153+
def test_persist_unbroadcast(self):
154+
node0 = self.nodes[0]
155+
self.start_node(0)
156+
157+
# clear out mempool
158+
node0.generate(1)
159+
160+
# disconnect nodes to make a txn that remains in the unbroadcast set.
161+
disconnect_nodes(node0, 1)
162+
node0.sendtoaddress(self.nodes[1].getnewaddress(), Decimal("12"))
163+
164+
# shutdown, then startup with wallet disabled
165+
self.stop_nodes()
166+
self.start_node(0, extra_args=["-disablewallet"])
167+
168+
# check that txn gets broadcast due to unbroadcast logic
169+
conn = node0.add_p2p_connection(P2PTxInvStore())
170+
node0.mockscheduler(16*60) # 15 min + 1 for buffer
171+
wait_until(lambda: len(conn.get_invs()) == 1)
142172

143173
if __name__ == '__main__':
144174
MempoolPersistTest().main()

0 commit comments

Comments
 (0)