@@ -111,6 +111,16 @@ uint32_t nBlockSequenceId = 1;
111
111
// Sources of received blocks, to be able to send them reject messages or ban
112
112
// them, if processing happens afterwards. Protected by cs_main.
113
113
map<uint256, NodeId> mapBlockSource;
114
+
115
+ // Blocks that are in flight, and that are in the queue to be downloaded.
116
+ // Protected by cs_main.
117
+ struct QueuedBlock {
118
+ uint256 hash;
119
+ int64_t nTime; // Time of "getdata" request in microseconds.
120
+ int nQueuedBefore; // Number of blocks in flight at the time of request.
121
+ };
122
+ map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
123
+ map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
114
124
}
115
125
116
126
// ////////////////////////////////////////////////////////////////////////////
@@ -194,10 +204,20 @@ struct CNodeState {
194
204
std::string name;
195
205
// List of asynchronously-determined block rejections to notify this peer about.
196
206
std::vector<CBlockReject> rejects;
207
+ list<QueuedBlock> vBlocksInFlight;
208
+ int nBlocksInFlight;
209
+ list<uint256> vBlocksToDownload;
210
+ int nBlocksToDownload;
211
+ int64_t nLastBlockReceive;
212
+ int64_t nLastBlockProcess;
197
213
198
214
CNodeState () {
199
215
nMisbehavior = 0 ;
200
216
fShouldBan = false ;
217
+ nBlocksToDownload = 0 ;
218
+ nBlocksInFlight = 0 ;
219
+ nLastBlockReceive = 0 ;
220
+ nLastBlockProcess = 0 ;
201
221
}
202
222
};
203
223
@@ -226,8 +246,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
226
246
227
247
void FinalizeNode (NodeId nodeid) {
228
248
LOCK (cs_main);
249
+ CNodeState *state = State (nodeid);
250
+
251
+ BOOST_FOREACH (const QueuedBlock& entry, state->vBlocksInFlight )
252
+ mapBlocksInFlight.erase (entry.hash );
253
+ BOOST_FOREACH (const uint256& hash, state->vBlocksToDownload )
254
+ mapBlocksToDownload.erase (hash);
255
+
229
256
mapNodeState.erase (nodeid);
230
257
}
258
+
259
+ // Requires cs_main.
260
+ void MarkBlockAsReceived (const uint256 &hash, NodeId nodeFrom = -1 ) {
261
+ map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find (hash);
262
+ if (itToDownload != mapBlocksToDownload.end ()) {
263
+ CNodeState *state = State (itToDownload->second .first );
264
+ state->vBlocksToDownload .erase (itToDownload->second .second );
265
+ state->nBlocksToDownload --;
266
+ mapBlocksToDownload.erase (itToDownload);
267
+ }
268
+
269
+ map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find (hash);
270
+ if (itInFlight != mapBlocksInFlight.end ()) {
271
+ CNodeState *state = State (itInFlight->second .first );
272
+ state->vBlocksInFlight .erase (itInFlight->second .second );
273
+ state->nBlocksInFlight --;
274
+ if (itInFlight->second .first == nodeFrom)
275
+ state->nLastBlockReceive = GetTimeMicros ();
276
+ mapBlocksInFlight.erase (itInFlight);
277
+ }
278
+
279
+ }
280
+
281
+ // Requires cs_main.
282
+ bool AddBlockToQueue (NodeId nodeid, const uint256 &hash) {
283
+ if (mapBlocksToDownload.count (hash) || mapBlocksInFlight.count (hash))
284
+ return false ;
285
+
286
+ CNodeState *state = State (nodeid);
287
+ if (state == NULL )
288
+ return false ;
289
+
290
+ list<uint256>::iterator it = state->vBlocksToDownload .insert (state->vBlocksToDownload .end (), hash);
291
+ state->nBlocksToDownload ++;
292
+ if (state->nBlocksToDownload > 5000 )
293
+ Misbehaving (nodeid, 10 );
294
+ mapBlocksToDownload[hash] = std::make_pair (nodeid, it);
295
+ return true ;
296
+ }
297
+
298
+ // Requires cs_main.
299
+ void MarkBlockAsInFlight (NodeId nodeid, const uint256 &hash) {
300
+ CNodeState *state = State (nodeid);
301
+ assert (state != NULL );
302
+
303
+ // Make sure it's not listed somewhere already.
304
+ MarkBlockAsReceived (hash);
305
+
306
+ QueuedBlock newentry = {hash, GetTimeMicros (), state->nBlocksInFlight };
307
+ if (state->nBlocksInFlight == 0 )
308
+ state->nLastBlockReceive = newentry.nTime ; // Reset when a first request is sent.
309
+ list<QueuedBlock>::iterator it = state->vBlocksInFlight .insert (state->vBlocksInFlight .end (), newentry);
310
+ state->nBlocksInFlight ++;
311
+ mapBlocksInFlight[hash] = std::make_pair (nodeid, it);
312
+ }
313
+
231
314
}
232
315
233
316
bool GetNodeStateStats (NodeId nodeid, CNodeStateStats &stats) {
@@ -1310,6 +1393,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
1310
1393
CheckForkWarningConditions ();
1311
1394
}
1312
1395
1396
+ // Requires cs_main.
1313
1397
void Misbehaving (NodeId pnode, int howmuch)
1314
1398
{
1315
1399
if (howmuch == 0 )
@@ -2049,7 +2133,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
2049
2133
pindexNew->nSequenceId = nBlockSequenceId++;
2050
2134
}
2051
2135
assert (pindexNew);
2052
- mapAlreadyAskedFor.erase (CInv (MSG_BLOCK, hash));
2053
2136
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert (make_pair (hash, pindexNew)).first ;
2054
2137
pindexNew->phashBlock = &((*mi).first );
2055
2138
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find (block.hashPrevBlock );
@@ -2400,11 +2483,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
2400
2483
return state.Invalid (error (" ProcessBlock() : already have block (orphan) %s" , hash.ToString ()), 0 , " duplicate" );
2401
2484
2402
2485
// Preliminary checks
2403
- if (!CheckBlock (*pblock, state)) {
2404
- if (state.CorruptionPossible ())
2405
- mapAlreadyAskedFor.erase (CInv (MSG_BLOCK, hash));
2486
+ if (!CheckBlock (*pblock, state))
2406
2487
return error (" ProcessBlock() : CheckBlock FAILED" );
2407
- }
2408
2488
2409
2489
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint (mapBlockIndex);
2410
2490
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip () ? chainActive.Tip ()->GetBlockHash () : uint256 (0 )))
@@ -3274,7 +3354,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
3274
3354
return true ;
3275
3355
}
3276
3356
3277
-
3357
+ State (pfrom-> GetId ())-> nLastBlockProcess = GetTimeMicros ();
3278
3358
3279
3359
3280
3360
@@ -3477,15 +3557,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
3477
3557
return error (" message inv size() = %" PRIszu" " , vInv.size ());
3478
3558
}
3479
3559
3480
- // find last block in inv vector
3481
- unsigned int nLastBlock = (unsigned int )(-1 );
3482
- for (unsigned int nInv = 0 ; nInv < vInv.size (); nInv++) {
3483
- if (vInv[vInv.size () - 1 - nInv].type == MSG_BLOCK) {
3484
- nLastBlock = vInv.size () - 1 - nInv;
3485
- break ;
3486
- }
3487
- }
3488
-
3489
3560
LOCK (cs_main);
3490
3561
3491
3562
for (unsigned int nInv = 0 ; nInv < vInv.size (); nInv++)
@@ -3499,17 +3570,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
3499
3570
LogPrint (" net" , " got inventory: %s %s\n " , inv.ToString (), fAlreadyHave ? " have" : " new" );
3500
3571
3501
3572
if (!fAlreadyHave ) {
3502
- if (!fImporting && !fReindex )
3503
- pfrom->AskFor (inv);
3573
+ if (!fImporting && !fReindex ) {
3574
+ if (inv.type == MSG_BLOCK)
3575
+ AddBlockToQueue (pfrom->GetId (), inv.hash );
3576
+ else
3577
+ pfrom->AskFor (inv);
3578
+ }
3504
3579
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count (inv.hash )) {
3505
3580
PushGetBlocks (pfrom, chainActive.Tip (), GetOrphanRoot (inv.hash ));
3506
- } else if (nInv == nLastBlock) {
3507
- // In case we are on a very long side-chain, it is possible that we already have
3508
- // the last block in an inv bundle sent in response to getblocks. Try to detect
3509
- // this situation and push another getblocks to continue.
3510
- PushGetBlocks (pfrom, mapBlockIndex[inv.hash ], uint256 (0 ));
3511
- if (fDebug )
3512
- LogPrintf (" force request: %s\n " , inv.ToString ());
3513
3581
}
3514
3582
3515
3583
// Track requests for our stuff
@@ -3716,6 +3784,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
3716
3784
LOCK (cs_main);
3717
3785
// Remember who we got this block from.
3718
3786
mapBlockSource[inv.hash ] = pfrom->GetId ();
3787
+ MarkBlockAsReceived (inv.hash , pfrom->GetId ());
3719
3788
3720
3789
CValidationState state;
3721
3790
ProcessBlock (state, pfrom, &block);
@@ -4243,12 +4312,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
4243
4312
pto->PushMessage (" inv" , vInv);
4244
4313
4245
4314
4315
+ // Detect stalled peers. Require that blocks are in flight, we haven't
4316
+ // received a (requested) block in one minute, and that all blocks are
4317
+ // in flight for over two minutes, since we first had a chance to
4318
+ // process an incoming block.
4319
+ int64_t nNow = GetTimeMicros ();
4320
+ if (!pto->fDisconnect && state.nBlocksInFlight &&
4321
+ state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
4322
+ state.vBlocksInFlight .front ().nTime < state.nLastBlockProcess - 2 *BLOCK_DOWNLOAD_TIMEOUT*1000000 ) {
4323
+ LogPrintf (" Peer %s is stalling block download, disconnecting\n " , state.name .c_str ());
4324
+ pto->fDisconnect = true ;
4325
+ }
4326
+
4246
4327
//
4247
- // Message: getdata
4328
+ // Message: getdata (blocks)
4248
4329
//
4249
4330
vector<CInv> vGetData;
4250
- int64_t nNow = GetTime () * 1000000 ;
4251
- while (!pto->mapAskFor .empty () && (*pto->mapAskFor .begin ()).first <= nNow)
4331
+ while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
4332
+ uint256 hash = state.vBlocksToDownload .front ();
4333
+ vGetData.push_back (CInv (MSG_BLOCK, hash));
4334
+ MarkBlockAsInFlight (pto->GetId (), hash);
4335
+ LogPrint (" net" , " Requesting block %s from %s\n " , hash.ToString ().c_str (), state.name .c_str ());
4336
+ if (vGetData.size () >= 1000 )
4337
+ {
4338
+ pto->PushMessage (" getdata" , vGetData);
4339
+ vGetData.clear ();
4340
+ }
4341
+ }
4342
+
4343
+ //
4344
+ // Message: getdata (non-blocks)
4345
+ //
4346
+ while (!pto->fDisconnect && !pto->mapAskFor .empty () && (*pto->mapAskFor .begin ()).first <= nNow)
4252
4347
{
4253
4348
const CInv& inv = (*pto->mapAskFor .begin ()).second ;
4254
4349
if (!AlreadyHave (inv))
0 commit comments