Skip to content

Commit f59d8f0

Browse files
committed
Per-peer block download tracking and stalled download detection.
Keep track of which block is being requested (and to be requested) from each peer, and limit the number of blocks in-flight per peer. In addition, detect stalled downloads, and disconnect if they persist for too long. This means blocks are never requested twice, and should eliminate duplicate downloads during synchronization.
1 parent 95e6624 commit f59d8f0

File tree

4 files changed

+131
-29
lines changed

4 files changed

+131
-29
lines changed

src/main.cpp

Lines changed: 122 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ uint32_t nBlockSequenceId = 1;
112112
// Sources of received blocks, to be able to send them reject messages or ban
113113
// them, if processing happens afterwards. Protected by cs_main.
114114
map<uint256, NodeId> mapBlockSource;
115+
116+
// Blocks that are in flight, and that are in the queue to be downloaded.
117+
// Protected by cs_main.
118+
struct QueuedBlock {
119+
uint256 hash;
120+
int64_t nTime; // Time of "getdata" request in microseconds.
121+
int nQueuedBefore; // Number of blocks in flight at the time of request.
122+
};
123+
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
124+
map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
115125
}
116126

117127
//////////////////////////////////////////////////////////////////////////////
@@ -195,10 +205,20 @@ struct CNodeState {
195205
std::string name;
196206
// List of asynchronously-determined block rejections to notify this peer about.
197207
std::vector<CBlockReject> rejects;
208+
list<QueuedBlock> vBlocksInFlight;
209+
int nBlocksInFlight;
210+
list<uint256> vBlocksToDownload;
211+
int nBlocksToDownload;
212+
int64_t nLastBlockReceive;
213+
int64_t nLastBlockProcess;
198214

199215
CNodeState() {
200216
nMisbehavior = 0;
201217
fShouldBan = false;
218+
nBlocksToDownload = 0;
219+
nBlocksInFlight = 0;
220+
nLastBlockReceive = 0;
221+
nLastBlockProcess = 0;
202222
}
203223
};
204224

@@ -227,8 +247,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
227247

228248
void FinalizeNode(NodeId nodeid) {
229249
LOCK(cs_main);
250+
CNodeState *state = State(nodeid);
251+
252+
BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
253+
mapBlocksInFlight.erase(entry.hash);
254+
BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
255+
mapBlocksToDownload.erase(hash);
256+
230257
mapNodeState.erase(nodeid);
231258
}
259+
260+
// Requires cs_main.
261+
void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
262+
map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
263+
if (itToDownload != mapBlocksToDownload.end()) {
264+
CNodeState *state = State(itToDownload->second.first);
265+
state->vBlocksToDownload.erase(itToDownload->second.second);
266+
state->nBlocksToDownload--;
267+
mapBlocksToDownload.erase(itToDownload);
268+
}
269+
270+
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
271+
if (itInFlight != mapBlocksInFlight.end()) {
272+
CNodeState *state = State(itInFlight->second.first);
273+
state->vBlocksInFlight.erase(itInFlight->second.second);
274+
state->nBlocksInFlight--;
275+
if (itInFlight->second.first == nodeFrom)
276+
state->nLastBlockReceive = GetTimeMicros();
277+
mapBlocksInFlight.erase(itInFlight);
278+
}
279+
280+
}
281+
282+
// Requires cs_main.
283+
bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
284+
if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
285+
return false;
286+
287+
CNodeState *state = State(nodeid);
288+
if (state == NULL)
289+
return false;
290+
291+
list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
292+
state->nBlocksToDownload++;
293+
if (state->nBlocksToDownload > 5000)
294+
Misbehaving(nodeid, 10);
295+
mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
296+
return true;
297+
}
298+
299+
// Requires cs_main.
300+
void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
301+
CNodeState *state = State(nodeid);
302+
assert(state != NULL);
303+
304+
// Make sure it's not listed somewhere already.
305+
MarkBlockAsReceived(hash);
306+
307+
QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
308+
if (state->nBlocksInFlight == 0)
309+
state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
310+
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
311+
state->nBlocksInFlight++;
312+
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
313+
}
314+
232315
}
233316

234317
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
@@ -1299,6 +1382,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
12991382
CheckForkWarningConditions();
13001383
}
13011384

1385+
// Requires cs_main.
13021386
void Misbehaving(NodeId pnode, int howmuch)
13031387
{
13041388
if (howmuch == 0)
@@ -2021,7 +2105,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
20212105
pindexNew->nSequenceId = nBlockSequenceId++;
20222106
}
20232107
assert(pindexNew);
2024-
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
20252108
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
20262109
pindexNew->phashBlock = &((*mi).first);
20272110
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
@@ -2367,11 +2450,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
23672450
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
23682451

23692452
// Preliminary checks
2370-
if (!CheckBlock(*pblock, state)) {
2371-
if (state.CorruptionPossible())
2372-
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
2453+
if (!CheckBlock(*pblock, state))
23732454
return error("ProcessBlock() : CheckBlock FAILED");
2374-
}
23752455

23762456
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
23772457
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
@@ -3223,7 +3303,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
32233303
return true;
32243304
}
32253305

3226-
3306+
State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
32273307

32283308

32293309

@@ -3426,15 +3506,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
34263506
return error("message inv size() = %"PRIszu"", vInv.size());
34273507
}
34283508

3429-
// find last block in inv vector
3430-
unsigned int nLastBlock = (unsigned int)(-1);
3431-
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
3432-
if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
3433-
nLastBlock = vInv.size() - 1 - nInv;
3434-
break;
3435-
}
3436-
}
3437-
34383509
LOCK(cs_main);
34393510

34403511
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
@@ -3448,17 +3519,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
34483519
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
34493520

34503521
if (!fAlreadyHave) {
3451-
if (!fImporting && !fReindex)
3452-
pfrom->AskFor(inv);
3522+
if (!fImporting && !fReindex) {
3523+
if (inv.type == MSG_BLOCK)
3524+
AddBlockToQueue(pfrom->GetId(), inv.hash);
3525+
else
3526+
pfrom->AskFor(inv);
3527+
}
34533528
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
34543529
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
3455-
} else if (nInv == nLastBlock) {
3456-
// In case we are on a very long side-chain, it is possible that we already have
3457-
// the last block in an inv bundle sent in response to getblocks. Try to detect
3458-
// this situation and push another getblocks to continue.
3459-
PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
3460-
if (fDebug)
3461-
LogPrintf("force request: %s\n", inv.ToString());
34623530
}
34633531

34643532
// Track requests for our stuff
@@ -3665,6 +3733,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
36653733
LOCK(cs_main);
36663734
// Remember who we got this block from.
36673735
mapBlockSource[inv.hash] = pfrom->GetId();
3736+
MarkBlockAsReceived(inv.hash, pfrom->GetId());
36683737

36693738
CValidationState state;
36703739
ProcessBlock(state, pfrom, &block);
@@ -4192,12 +4261,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
41924261
pto->PushMessage("inv", vInv);
41934262

41944263

4264+
// Detect stalled peers. Require that blocks are in flight, we haven't
4265+
// received a (requested) block in one minute, and that all blocks are
4266+
// in flight for over two minutes, since we first had a chance to
4267+
// process an incoming block.
4268+
int64_t nNow = GetTimeMicros();
4269+
if (!pto->fDisconnect && state.nBlocksInFlight &&
4270+
state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
4271+
state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
4272+
LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
4273+
pto->fDisconnect = true;
4274+
}
4275+
41954276
//
4196-
// Message: getdata
4277+
// Message: getdata (blocks)
41974278
//
41984279
vector<CInv> vGetData;
4199-
int64_t nNow = GetTime() * 1000000;
4200-
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
4280+
while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
4281+
uint256 hash = state.vBlocksToDownload.front();
4282+
vGetData.push_back(CInv(MSG_BLOCK, hash));
4283+
MarkBlockAsInFlight(pto->GetId(), hash);
4284+
LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
4285+
if (vGetData.size() >= 1000)
4286+
{
4287+
pto->PushMessage("getdata", vGetData);
4288+
vGetData.clear();
4289+
}
4290+
}
4291+
4292+
//
4293+
// Message: getdata (non-blocks)
4294+
//
4295+
while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
42014296
{
42024297
const CInv& inv = (*pto->mapAskFor.begin()).second;
42034298
if (!AlreadyHave(inv))

src/main.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ static const int COINBASE_MATURITY = 100;
5959
static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC
6060
/** Maximum number of script-checking threads allowed */
6161
static const int MAX_SCRIPTCHECK_THREADS = 16;
62+
/** Number of blocks that can be requested at any given time from a single peer. */
63+
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 128;
64+
/** Timeout in seconds before considering a block download peer unresponsive. */
65+
static const unsigned int BLOCK_DOWNLOAD_TIMEOUT = 60;
66+
6267
#ifdef USE_UPNP
6368
static const int fHaveUPnP = true;
6469
#else
@@ -182,6 +187,9 @@ bool VerifySignature(const CCoins& txFrom, const CTransaction& txTo, unsigned in
182187
bool AbortNode(const std::string &msg);
183188
/** Get statistics from node state */
184189
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
190+
/** Increase a node's misbehavior score. */
191+
void Misbehaving(NodeId nodeid, int howmuch);
192+
185193

186194
/** (try to) add transaction to memory pool **/
187195
bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree,

src/net.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ class CNode
427427
LogPrint("net", "askfor %s %"PRId64" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str());
428428

429429
// Make sure not to reuse time indexes to keep things in the same order
430-
int64_t nNow = (GetTime() - 1) * 1000000;
430+
int64_t nNow = GetTimeMicros() - 1000000;
431431
static int64_t nLastTime;
432432
++nLastTime;
433433
nNow = std::max(nNow, nLastTime);

src/test/DoS_tests.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
// Tests this internal-to-main.cpp method:
2222
extern bool AddOrphanTx(const CTransaction& tx);
2323
extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans);
24-
extern void Misbehaving(NodeId nodeid, int howmuch);
2524
extern std::map<uint256, CTransaction> mapOrphanTransactions;
2625
extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev;
2726

0 commit comments

Comments
 (0)