Skip to content

Commit d25cd3e

Browse files
committed
Add receiver-side protocol implementation for CMPCTBLOCK stuff
1 parent 9c837d5 commit d25cd3e

File tree

1 file changed

+207
-8
lines changed

1 file changed

+207
-8
lines changed

src/main.cpp

Lines changed: 207 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include "addrman.h"
99
#include "arith_uint256.h"
10+
#include "blockencodings.h"
1011
#include "chainparams.h"
1112
#include "checkpoints.h"
1213
#include "checkqueue.h"
@@ -197,8 +198,9 @@ namespace {
197198
/** Blocks that are in flight, and that are in the queue to be downloaded. Protected by cs_main. */
198199
struct QueuedBlock {
199200
uint256 hash;
200-
CBlockIndex* pindex; //!< Optional.
201-
bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request.
201+
CBlockIndex* pindex; //!< Optional.
202+
bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request.
203+
std::unique_ptr<PartiallyDownloadedBlock> partialBlock; //!< Optional, used for CMPCTBLOCK downloads
202204
};
203205
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
204206

@@ -364,6 +366,7 @@ void FinalizeNode(NodeId nodeid) {
364366

365367
// Requires cs_main.
366368
// Returns a bool indicating whether we requested this block.
369+
// Also used if a block was /not/ received and timed out or started with another peer
367370
bool MarkBlockAsReceived(const uint256& hash) {
368371
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
369372
if (itInFlight != mapBlocksInFlight.end()) {
@@ -387,25 +390,37 @@ bool MarkBlockAsReceived(const uint256& hash) {
387390
}
388391

389392
// Requires cs_main.
390-
void MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL) {
393+
// returns false, still setting pit, if the block was already in flight from the same peer
394+
// pit will only be valid as long as the same cs_main lock is being held
395+
bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
391396
CNodeState *state = State(nodeid);
392397
assert(state != NULL);
393398

399+
// Short-circuit most stuff in case its from the same node
400+
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
401+
if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) {
402+
*pit = &itInFlight->second.second;
403+
return false;
404+
}
405+
394406
// Make sure it's not listed somewhere already.
395407
MarkBlockAsReceived(hash);
396408

397-
QueuedBlock newentry = {hash, pindex, pindex != NULL};
398-
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
409+
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(),
410+
{hash, pindex, pindex != NULL, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&mempool) : NULL)});
399411
state->nBlocksInFlight++;
400-
state->nBlocksInFlightValidHeaders += newentry.fValidatedHeaders;
412+
state->nBlocksInFlightValidHeaders += it->fValidatedHeaders;
401413
if (state->nBlocksInFlight == 1) {
402414
// We're starting a block download (batch) from this peer.
403415
state->nDownloadingSince = GetTimeMicros();
404416
}
405417
if (state->nBlocksInFlightValidHeaders == 1 && pindex != NULL) {
406418
nPeersWithValidatedDownloads++;
407419
}
408-
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
420+
itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first;
421+
if (pit)
422+
*pit = &itInFlight->second.second;
423+
return true;
409424
}
410425

411426
/** Check whether the last unknown block a peer advertised is not yet known. */
@@ -4783,6 +4798,16 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
47834798
// nodes)
47844799
pfrom->PushMessage(NetMsgType::SENDHEADERS);
47854800
}
4801+
if (pfrom->nVersion >= SHORT_IDS_BLOCKS_VERSION) {
4802+
// Tell our peer we are willing to provide version-1 cmpctblocks
4803+
// However, we do not request new block announcements using
4804+
// cmpctblock messages.
4805+
// We send this to non-NODE NETWORK peers as well, because
4806+
// they may wish to request compact blocks from us
4807+
bool fAnnounceUsingCMPCTBLOCK = false;
4808+
uint64_t nCMPCTBLOCKVersion = 1;
4809+
pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
4810+
}
47864811
}
47874812

47884813

@@ -4915,7 +4940,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
49154940
CNodeState *nodestate = State(pfrom->GetId());
49164941
if (CanDirectFetch(chainparams.GetConsensus()) &&
49174942
nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
4918-
vToFetch.push_back(inv);
4943+
if (nodestate->fProvidesHeaderAndIDs)
4944+
vToFetch.push_back(CInv(MSG_CMPCT_BLOCK, inv.hash));
4945+
else
4946+
vToFetch.push_back(inv);
49194947
// Mark block as in flight already, even though the actual "getdata" message only goes out
49204948
// later (within the same cs_main lock, though).
49214949
MarkBlockAsInFlight(pfrom->GetId(), inv.hash, chainparams.GetConsensus());
@@ -5232,6 +5260,174 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
52325260
}
52335261

52345262

5263+
else if (strCommand == NetMsgType::CMPCTBLOCK && !fImporting && !fReindex) // Ignore blocks received while importing
5264+
{
5265+
CBlockHeaderAndShortTxIDs cmpctblock;
5266+
vRecv >> cmpctblock;
5267+
5268+
LOCK(cs_main);
5269+
5270+
if (mapBlockIndex.find(cmpctblock.header.hashPrevBlock) == mapBlockIndex.end()) {
5271+
// Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers
5272+
if (!IsInitialBlockDownload())
5273+
pfrom->PushMessage(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
5274+
return true;
5275+
}
5276+
5277+
CBlockIndex *pindex = NULL;
5278+
CValidationState state;
5279+
if (!AcceptBlockHeader(cmpctblock.header, state, chainparams, &pindex)) {
5280+
int nDoS;
5281+
if (state.IsInvalid(nDoS)) {
5282+
if (nDoS > 0)
5283+
Misbehaving(pfrom->GetId(), nDoS);
5284+
LogPrintf("Peer %d sent us invalid header via cmpctblock\n", pfrom->id);
5285+
return true;
5286+
}
5287+
}
5288+
5289+
// If AcceptBlockHeader returned true, it set pindex
5290+
assert(pindex);
5291+
UpdateBlockAvailability(pfrom->GetId(), pindex->GetBlockHash());
5292+
5293+
std::map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash());
5294+
bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end();
5295+
5296+
if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here
5297+
return true;
5298+
5299+
if (pindex->nChainWork <= chainActive.Tip()->nChainWork || // We know something better
5300+
pindex->nTx != 0) { // We had this block at some point, but pruned it
5301+
if (fAlreadyInFlight) {
5302+
// We requested this block for some reason, but our mempool will probably be useless
5303+
// so we just grab the block via normal getdata
5304+
std::vector<CInv> vInv(1);
5305+
vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash());
5306+
pfrom->PushMessage(NetMsgType::GETDATA, vInv);
5307+
return true;
5308+
}
5309+
}
5310+
5311+
// If we're not close to tip yet, give up and let parallel block fetch work its magic
5312+
if (!fAlreadyInFlight && !CanDirectFetch(chainparams.GetConsensus()))
5313+
return true;
5314+
5315+
CNodeState *nodestate = State(pfrom->GetId());
5316+
5317+
// We want to be a bit conservative just to be extra careful about DoS
5318+
// possibilities in compact block processing...
5319+
if (pindex->nHeight <= chainActive.Height() + 2) {
5320+
if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) ||
5321+
(fAlreadyInFlight && blockInFlightIt->second.first == pfrom->GetId())) {
5322+
list<QueuedBlock>::iterator *queuedBlockIt = NULL;
5323+
if (!MarkBlockAsInFlight(pfrom->GetId(), pindex->GetBlockHash(), chainparams.GetConsensus(), pindex, &queuedBlockIt)) {
5324+
if (!(*queuedBlockIt)->partialBlock)
5325+
(*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&mempool));
5326+
else {
5327+
// The block was already in flight using compact blocks from the same peer
5328+
LogPrint("net", "Peer sent us compact block we were already syncing!\n");
5329+
return true;
5330+
}
5331+
}
5332+
5333+
PartiallyDownloadedBlock& partialBlock = *(*queuedBlockIt)->partialBlock;
5334+
ReadStatus status = partialBlock.InitData(cmpctblock);
5335+
if (status == READ_STATUS_INVALID) {
5336+
MarkBlockAsReceived(pindex->GetBlockHash()); // Reset in-flight state in case of whitelist
5337+
Misbehaving(pfrom->GetId(), 100);
5338+
LogPrintf("Peer %d sent us invalid compact block\n", pfrom->id);
5339+
return true;
5340+
} else if (status == READ_STATUS_FAILED) {
5341+
// Duplicate txindexes, the block is now in-flight, so just request it
5342+
std::vector<CInv> vInv(1);
5343+
vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash());
5344+
pfrom->PushMessage(NetMsgType::GETDATA, vInv);
5345+
return true;
5346+
}
5347+
5348+
BlockTransactionsRequest req;
5349+
for (size_t i = 0; i < cmpctblock.BlockTxCount(); i++) {
5350+
if (!partialBlock.IsTxAvailable(i))
5351+
req.indexes.push_back(i);
5352+
}
5353+
if (req.indexes.empty()) {
5354+
// Dirty hack to jump to BLOCKTXN code (TODO: move message handling into their own functions)
5355+
BlockTransactions txn;
5356+
txn.blockhash = cmpctblock.header.GetHash();
5357+
CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION);
5358+
blockTxnMsg << txn;
5359+
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams);
5360+
} else {
5361+
req.blockhash = pindex->GetBlockHash();
5362+
pfrom->PushMessage(NetMsgType::GETBLOCKTXN, req);
5363+
}
5364+
}
5365+
} else {
5366+
if (fAlreadyInFlight) {
5367+
// We requested this block, but its far into the future, so our
5368+
// mempool will probably be useless - request the block normally
5369+
std::vector<CInv> vInv(1);
5370+
vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash());
5371+
pfrom->PushMessage(NetMsgType::GETDATA, vInv);
5372+
return true;
5373+
} else {
5374+
// If this was an announce-cmpctblock, we want the same treatment as a header message
5375+
// Dirty hack to process as if it were just a headers message (TODO: move message handling into their own functions)
5376+
std::vector<CBlock> headers;
5377+
headers.push_back(cmpctblock.header);
5378+
CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION);
5379+
vHeadersMsg << headers;
5380+
return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams);
5381+
}
5382+
}
5383+
5384+
CheckBlockIndex(chainparams.GetConsensus());
5385+
}
5386+
5387+
else if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing
5388+
{
5389+
BlockTransactions resp;
5390+
vRecv >> resp;
5391+
5392+
LOCK(cs_main);
5393+
5394+
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator it = mapBlocksInFlight.find(resp.blockhash);
5395+
if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock ||
5396+
it->second.first != pfrom->GetId()) {
5397+
LogPrint("net", "Peer %d sent us block transactions for block we weren't expecting\n", pfrom->id);
5398+
return true;
5399+
}
5400+
5401+
PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock;
5402+
CBlock block;
5403+
ReadStatus status = partialBlock.FillBlock(block, resp.txn);
5404+
if (status == READ_STATUS_INVALID) {
5405+
MarkBlockAsReceived(resp.blockhash); // Reset in-flight state in case of whitelist
5406+
Misbehaving(pfrom->GetId(), 100);
5407+
LogPrintf("Peer %d sent us invalid compact block/non-matching block transactions\n", pfrom->id);
5408+
return true;
5409+
} else if (status == READ_STATUS_FAILED) {
5410+
// Might have collided, fall back to getdata now :(
5411+
std::vector<CInv> invs;
5412+
invs.push_back(CInv(MSG_BLOCK, resp.blockhash));
5413+
pfrom->PushMessage(NetMsgType::GETDATA, invs);
5414+
} else {
5415+
CValidationState state;
5416+
ProcessNewBlock(state, chainparams, pfrom, &block, false, NULL);
5417+
int nDoS;
5418+
if (state.IsInvalid(nDoS)) {
5419+
assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes
5420+
pfrom->PushMessage(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
5421+
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), block.GetHash());
5422+
if (nDoS > 0) {
5423+
LOCK(cs_main);
5424+
Misbehaving(pfrom->GetId(), nDoS);
5425+
}
5426+
}
5427+
}
5428+
}
5429+
5430+
52355431
else if (strCommand == NetMsgType::HEADERS && !fImporting && !fReindex) // Ignore headers received while importing
52365432
{
52375433
std::vector<CBlockHeader> headers;
@@ -5334,6 +5530,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
53345530
pindexLast->GetBlockHash().ToString(), pindexLast->nHeight);
53355531
}
53365532
if (vGetData.size() > 0) {
5533+
if (nodestate->fProvidesHeaderAndIDs && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN)) {
5534+
vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
5535+
}
53375536
pfrom->PushMessage(NetMsgType::GETDATA, vGetData);
53385537
}
53395538
}

0 commit comments

Comments
 (0)