Skip to content

Commit 3b9a0bf

Browse files
committed
Merge #7840: Several performance and privacy improvements to inv/mempool handling
b559914 Move bloom and feerate filtering to just prior to tx sending. (Gregory Maxwell) 4578215 Return mempool queries in dependency order (Pieter Wuille) ed70683 Handle mempool requests in send loop, subject to trickle (Pieter Wuille) dc13dcd Split up and optimize transaction and block inv queues (Pieter Wuille) f2d3ba7 Eliminate TX trickle bypass, sort TX invs for privacy and priority. (Gregory Maxwell)
2 parents d51618e + b559914 commit 3b9a0bf

File tree

6 files changed

+189
-86
lines changed

6 files changed

+189
-86
lines changed

src/main.cpp

Lines changed: 136 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4556,12 +4556,16 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
45564556
vRecv >> LIMITED_STRING(pfrom->strSubVer, MAX_SUBVERSION_LENGTH);
45574557
pfrom->cleanSubVer = SanitizeString(pfrom->strSubVer);
45584558
}
4559-
if (!vRecv.empty())
4559+
if (!vRecv.empty()) {
45604560
vRecv >> pfrom->nStartingHeight;
4561-
if (!vRecv.empty())
4562-
vRecv >> pfrom->fRelayTxes; // set to true after we get the first filter* message
4563-
else
4564-
pfrom->fRelayTxes = true;
4561+
}
4562+
{
4563+
LOCK(pfrom->cs_filter);
4564+
if (!vRecv.empty())
4565+
vRecv >> pfrom->fRelayTxes; // set to true after we get the first filter* message
4566+
else
4567+
pfrom->fRelayTxes = true;
4568+
}
45654569

45664570
// Disconnect if we connected to ourself
45674571
if (nNonce == nLocalHostNonce && nNonce > 1)
@@ -5234,34 +5238,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
52345238
pfrom->fDisconnect = true;
52355239
return true;
52365240
}
5237-
LOCK2(cs_main, pfrom->cs_filter);
52385241

5239-
std::vector<uint256> vtxid;
5240-
mempool.queryHashes(vtxid);
5241-
vector<CInv> vInv;
5242-
BOOST_FOREACH(uint256& hash, vtxid) {
5243-
CInv inv(MSG_TX, hash);
5244-
if (pfrom->pfilter) {
5245-
CTransaction tx;
5246-
bool fInMemPool = mempool.lookup(hash, tx);
5247-
if (!fInMemPool) continue; // another thread removed since queryHashes, maybe...
5248-
if (!pfrom->pfilter->IsRelevantAndUpdate(tx)) continue;
5249-
}
5250-
if (pfrom->minFeeFilter) {
5251-
CFeeRate feeRate;
5252-
mempool.lookupFeeRate(hash, feeRate);
5253-
LOCK(pfrom->cs_feeFilter);
5254-
if (feeRate.GetFeePerK() < pfrom->minFeeFilter)
5255-
continue;
5256-
}
5257-
vInv.push_back(inv);
5258-
if (vInv.size() == MAX_INV_SZ) {
5259-
pfrom->PushMessage(NetMsgType::INV, vInv);
5260-
vInv.clear();
5261-
}
5262-
}
5263-
if (vInv.size() > 0)
5264-
pfrom->PushMessage(NetMsgType::INV, vInv);
5242+
LOCK(pfrom->cs_inventory);
5243+
pfrom->fSendMempool = true;
52655244
}
52665245

52675246

@@ -5349,12 +5328,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
53495328
CBloomFilter filter;
53505329
vRecv >> filter;
53515330

5331+
LOCK(pfrom->cs_filter);
5332+
53525333
if (!filter.IsWithinSizeConstraints())
53535334
// There is no excuse for sending a too-large filter
53545335
Misbehaving(pfrom->GetId(), 100);
53555336
else
53565337
{
5357-
LOCK(pfrom->cs_filter);
53585338
delete pfrom->pfilter;
53595339
pfrom->pfilter = new CBloomFilter(filter);
53605340
pfrom->pfilter->UpdateEmptyFull();
@@ -5559,6 +5539,22 @@ bool ProcessMessages(CNode* pfrom)
55595539
return fOk;
55605540
}
55615541

5542+
class CompareInvMempoolOrder
5543+
{
5544+
CTxMemPool *mp;
5545+
public:
5546+
CompareInvMempoolOrder(CTxMemPool *mempool)
5547+
{
5548+
mp = mempool;
5549+
}
5550+
5551+
bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
5552+
{
5553+
/* As std::make_heap produces a max-heap, we want the entries with the
5554+
* fewest ancestors/highest fee to sort later. */
5555+
return mp->CompareDepthAndScore(*b, *a);
5556+
}
5557+
};
55625558

55635559
bool SendMessages(CNode* pto)
55645560
{
@@ -5798,49 +5794,127 @@ bool SendMessages(CNode* pto)
57985794
// Message: inventory
57995795
//
58005796
vector<CInv> vInv;
5801-
vector<CInv> vInvWait;
58025797
{
5798+
LOCK(pto->cs_inventory);
5799+
vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX));
5800+
5801+
// Add blocks
5802+
BOOST_FOREACH(const uint256& hash, pto->vInventoryBlockToSend) {
5803+
vInv.push_back(CInv(MSG_BLOCK, hash));
5804+
if (vInv.size() == MAX_INV_SZ) {
5805+
pto->PushMessage(NetMsgType::INV, vInv);
5806+
vInv.clear();
5807+
}
5808+
}
5809+
pto->vInventoryBlockToSend.clear();
5810+
5811+
// Check whether periodic sends should happen
58035812
bool fSendTrickle = pto->fWhitelisted;
58045813
if (pto->nNextInvSend < nNow) {
58055814
fSendTrickle = true;
5806-
pto->nNextInvSend = PoissonNextSend(nNow, AVG_INVENTORY_BROADCAST_INTERVAL);
5815+
// Use half the delay for outbound peers, as there is less privacy concern for them.
5816+
pto->nNextInvSend = PoissonNextSend(nNow, INVENTORY_BROADCAST_INTERVAL >> !pto->fInbound);
5817+
}
5818+
5819+
// Time to send but the peer has requested we not relay transactions.
5820+
if (fSendTrickle) {
5821+
LOCK(pto->cs_filter);
5822+
if (!pto->fRelayTxes) pto->setInventoryTxToSend.clear();
58075823
}
5808-
LOCK(pto->cs_inventory);
5809-
vInv.reserve(std::min<size_t>(1000, pto->vInventoryToSend.size()));
5810-
vInvWait.reserve(pto->vInventoryToSend.size());
5811-
BOOST_FOREACH(const CInv& inv, pto->vInventoryToSend)
5812-
{
5813-
if (inv.type == MSG_TX && pto->filterInventoryKnown.contains(inv.hash))
5814-
continue;
58155824

5816-
// trickle out tx inv to protect privacy
5817-
if (inv.type == MSG_TX && !fSendTrickle)
5825+
// Respond to BIP35 mempool requests
5826+
if (fSendTrickle && pto->fSendMempool) {
5827+
std::vector<uint256> vtxid;
5828+
mempool.queryHashes(vtxid);
5829+
pto->fSendMempool = false;
5830+
CAmount filterrate = 0;
58185831
{
5819-
// 1/4 of tx invs blast to all immediately
5820-
static uint256 hashSalt;
5821-
if (hashSalt.IsNull())
5822-
hashSalt = GetRandHash();
5823-
uint256 hashRand = ArithToUint256(UintToArith256(inv.hash) ^ UintToArith256(hashSalt));
5824-
hashRand = Hash(BEGIN(hashRand), END(hashRand));
5825-
bool fTrickleWait = ((UintToArith256(hashRand) & 3) != 0);
5832+
LOCK(pto->cs_feeFilter);
5833+
filterrate = pto->minFeeFilter;
5834+
}
58265835

5827-
if (fTrickleWait)
5828-
{
5829-
vInvWait.push_back(inv);
5830-
continue;
5836+
LOCK(pto->cs_filter);
5837+
5838+
BOOST_FOREACH(const uint256& hash, vtxid) {
5839+
CInv inv(MSG_TX, hash);
5840+
pto->setInventoryTxToSend.erase(hash);
5841+
if (filterrate) {
5842+
CFeeRate feeRate;
5843+
mempool.lookupFeeRate(hash, feeRate);
5844+
if (feeRate.GetFeePerK() < filterrate)
5845+
continue;
5846+
}
5847+
if (pto->pfilter) {
5848+
CTransaction tx;
5849+
bool fInMemPool = mempool.lookup(hash, tx);
5850+
if (!fInMemPool) continue; // another thread removed since queryHashes, maybe...
5851+
if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
5852+
}
5853+
pto->filterInventoryKnown.insert(hash);
5854+
vInv.push_back(inv);
5855+
if (vInv.size() == MAX_INV_SZ) {
5856+
pto->PushMessage(NetMsgType::INV, vInv);
5857+
vInv.clear();
58315858
}
58325859
}
5860+
}
58335861

5834-
pto->filterInventoryKnown.insert(inv.hash);
5835-
5836-
vInv.push_back(inv);
5837-
if (vInv.size() >= 1000)
5862+
// Determine transactions to relay
5863+
if (fSendTrickle) {
5864+
// Produce a vector with all candidates for sending
5865+
vector<std::set<uint256>::iterator> vInvTx;
5866+
vInvTx.reserve(pto->setInventoryTxToSend.size());
5867+
for (std::set<uint256>::iterator it = pto->setInventoryTxToSend.begin(); it != pto->setInventoryTxToSend.end(); it++) {
5868+
vInvTx.push_back(it);
5869+
}
5870+
CAmount filterrate = 0;
58385871
{
5839-
pto->PushMessage(NetMsgType::INV, vInv);
5840-
vInv.clear();
5872+
LOCK(pto->cs_feeFilter);
5873+
filterrate = pto->minFeeFilter;
5874+
}
5875+
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
5876+
// A heap is used so that not all items need sorting if only a few are being sent.
5877+
CompareInvMempoolOrder compareInvMempoolOrder(&mempool);
5878+
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
5879+
// No reason to drain out at many times the network's capacity,
5880+
// especially since we have many peers and some will draw much shorter delays.
5881+
unsigned int nRelayedTransactions = 0;
5882+
LOCK(pto->cs_filter);
5883+
while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) {
5884+
// Fetch the top element from the heap
5885+
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
5886+
std::set<uint256>::iterator it = vInvTx.back();
5887+
vInvTx.pop_back();
5888+
uint256 hash = *it;
5889+
// Remove it from the to-be-sent set
5890+
pto->setInventoryTxToSend.erase(it);
5891+
// Check if not in the filter already
5892+
if (pto->filterInventoryKnown.contains(hash)) {
5893+
continue;
5894+
}
5895+
// Not in the mempool anymore? don't bother sending it.
5896+
CFeeRate feeRate;
5897+
if (!mempool.lookupFeeRate(hash, feeRate)) {
5898+
continue;
5899+
}
5900+
if (filterrate && feeRate.GetFeePerK() < filterrate) {
5901+
continue;
5902+
}
5903+
if (pto->pfilter) {
5904+
CTransaction tx;
5905+
if (!mempool.lookup(hash, tx)) continue;
5906+
if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
5907+
}
5908+
// Send
5909+
vInv.push_back(CInv(MSG_TX, hash));
5910+
nRelayedTransactions++;
5911+
if (vInv.size() == MAX_INV_SZ) {
5912+
pto->PushMessage(NetMsgType::INV, vInv);
5913+
vInv.clear();
5914+
}
5915+
pto->filterInventoryKnown.insert(hash);
58415916
}
58425917
}
5843-
pto->vInventoryToSend = vInvWait;
58445918
}
58455919
if (!vInv.empty())
58465920
pto->PushMessage(NetMsgType::INV, vInv);

src/main.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,12 @@ static const unsigned int MAX_REJECT_MESSAGE_LENGTH = 111;
9999
static const unsigned int AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL = 24 * 24 * 60;
100100
/** Average delay between peer address broadcasts in seconds. */
101101
static const unsigned int AVG_ADDRESS_BROADCAST_INTERVAL = 30;
102-
/** Average delay between trickled inventory broadcasts in seconds.
103-
* Blocks, whitelisted receivers, and a random 25% of transactions bypass this. */
104-
static const unsigned int AVG_INVENTORY_BROADCAST_INTERVAL = 5;
102+
/** Average delay between trickled inventory transmissions in seconds.
103+
* Blocks and whitelisted receivers bypass this, outbound peers get half this delay. */
104+
static const unsigned int INVENTORY_BROADCAST_INTERVAL = 5;
105+
/** Maximum number of inventory items to send per transmission.
106+
* Limits the impact of low-fee transaction floods. */
107+
static const unsigned int INVENTORY_BROADCAST_MAX = 7 * INVENTORY_BROADCAST_INTERVAL;
105108
/** Average delay between feefilter broadcasts in seconds. */
106109
static const unsigned int AVG_FEEFILTER_BROADCAST_INTERVAL = 10 * 60;
107110
/** Maximum feefilter broadcast delay after significant change. */

src/net.cpp

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,20 +2088,7 @@ void RelayTransaction(const CTransaction& tx, CFeeRate feerate)
20882088
LOCK(cs_vNodes);
20892089
BOOST_FOREACH(CNode* pnode, vNodes)
20902090
{
2091-
if(!pnode->fRelayTxes)
2092-
continue;
2093-
{
2094-
LOCK(pnode->cs_feeFilter);
2095-
if (feerate.GetFeePerK() < pnode->minFeeFilter)
2096-
continue;
2097-
}
2098-
LOCK(pnode->cs_filter);
2099-
if (pnode->pfilter)
2100-
{
2101-
if (pnode->pfilter->IsRelevantAndUpdate(tx))
2102-
pnode->PushInventory(inv);
2103-
} else
2104-
pnode->PushInventory(inv);
2091+
pnode->PushInventory(inv);
21052092
}
21062093
}
21072094

@@ -2387,6 +2374,7 @@ CNode::CNode(SOCKET hSocketIn, const CAddress& addrIn, const std::string& addrNa
23872374
hashContinue = uint256();
23882375
nStartingHeight = -1;
23892376
filterInventoryKnown.reset();
2377+
fSendMempool = false;
23902378
fGetAddr = false;
23912379
nNextLocalAddrSend = 0;
23922380
nNextAddrSend = 0;

src/net.h

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ class CNode
357357
// a) it allows us to not relay tx invs before receiving the peer's version message
358358
// b) the peer may tell us in its version message that we should not relay tx invs
359359
// unless it loads a bloom filter.
360-
bool fRelayTxes;
360+
bool fRelayTxes; //protected by cs_filter
361361
bool fSentAddr;
362362
CSemaphoreGrant grantOutbound;
363363
CCriticalSection cs_filter;
@@ -397,14 +397,22 @@ class CNode
397397

398398
// inventory based relay
399399
CRollingBloomFilter filterInventoryKnown;
400-
std::vector<CInv> vInventoryToSend;
400+
// Set of transaction ids we still have to announce.
401+
// They are sorted by the mempool before relay, so the order is not important.
402+
std::set<uint256> setInventoryTxToSend;
403+
// List of block ids we still have announce.
404+
// There is no final sorting before sending, as they are always sent immediately
405+
// and in the order requested.
406+
std::vector<uint256> vInventoryBlockToSend;
401407
CCriticalSection cs_inventory;
402408
std::set<uint256> setAskFor;
403409
std::multimap<int64_t, CInv> mapAskFor;
404410
int64_t nNextInvSend;
405411
// Used for headers announcements - unfiltered blocks to relay
406412
// Also protected by cs_inventory
407413
std::vector<uint256> vBlockHashesToAnnounce;
414+
// Used for BIP35 mempool sending, also protected by cs_inventory
415+
bool fSendMempool;
408416

409417
// Ping time measurement:
410418
// The pong reply we're expecting, or 0 if no pong expected.
@@ -517,11 +525,13 @@ class CNode
517525

518526
void PushInventory(const CInv& inv)
519527
{
520-
{
521-
LOCK(cs_inventory);
522-
if (inv.type == MSG_TX && filterInventoryKnown.contains(inv.hash))
523-
return;
524-
vInventoryToSend.push_back(inv);
528+
LOCK(cs_inventory);
529+
if (inv.type == MSG_TX) {
530+
if (!filterInventoryKnown.contains(inv.hash)) {
531+
setInventoryTxToSend.insert(inv.hash);
532+
}
533+
} else if (inv.type == MSG_BLOCK) {
534+
vInventoryBlockToSend.push_back(inv.hash);
525535
}
526536
}
527537

src/txmempool.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,31 @@ void CTxMemPool::check(const CCoinsViewCache *pcoins) const
752752
assert(innerUsage == cachedInnerUsage);
753753
}
754754

755+
bool CTxMemPool::CompareDepthAndScore(const uint256& hasha, const uint256& hashb)
756+
{
757+
LOCK(cs);
758+
indexed_transaction_set::const_iterator i = mapTx.find(hasha);
759+
if (i == mapTx.end()) return false;
760+
indexed_transaction_set::const_iterator j = mapTx.find(hashb);
761+
if (j == mapTx.end()) return true;
762+
uint64_t counta = i->GetCountWithAncestors();
763+
uint64_t countb = j->GetCountWithAncestors();
764+
if (counta == countb) {
765+
return CompareTxMemPoolEntryByScore()(*i, *j);
766+
}
767+
return counta < countb;
768+
}
769+
770+
namespace {
771+
class DepthAndScoreComparator
772+
{
773+
CTxMemPool *mp;
774+
public:
775+
DepthAndScoreComparator(CTxMemPool *mempool) : mp(mempool) {}
776+
bool operator()(const uint256& a, const uint256& b) { return mp->CompareDepthAndScore(a, b); }
777+
};
778+
}
779+
755780
void CTxMemPool::queryHashes(vector<uint256>& vtxid)
756781
{
757782
vtxid.clear();
@@ -760,6 +785,8 @@ void CTxMemPool::queryHashes(vector<uint256>& vtxid)
760785
vtxid.reserve(mapTx.size());
761786
for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi)
762787
vtxid.push_back(mi->GetTx().GetHash());
788+
789+
std::sort(vtxid.begin(), vtxid.end(), DepthAndScoreComparator(this));
763790
}
764791

765792
bool CTxMemPool::lookup(uint256 hash, CTransaction& result) const

0 commit comments

Comments
 (0)