@@ -44,22 +44,12 @@ using namespace bc::network;
4444using namespace std ::chrono;
4545using namespace std ::placeholders;
4646
47- static constexpr auto perpetual_timer = true ;
48-
49- static inline uint32_t get_poll_seconds (const node::settings& settings)
50- {
51- // Set 136 years as equivalent to "never" if configured to disable.
52- const auto value = settings.block_poll_seconds ;
53- return value == 0 ? max_uint32 : value;
54- }
55-
5647protocol_block_in::protocol_block_in (full_node& node, channel::ptr channel,
5748 safe_chain& chain)
58- : protocol_timer(node, channel, perpetual_timer , NAME),
49+ : protocol_timer(node, channel, false , NAME),
5950 node_ (node),
6051 chain_(chain),
61- last_locator_top_(null_hash),
62- block_poll_seconds_(get_poll_seconds(node.node_settings())),
52+ block_latency_(node.node_settings().block_latency()),
6353
6454 // TODO: move send_headers to a derived class protocol_block_in_70012.
6555 headers_from_peer_(negotiated_version() >= version::level::bip130),
@@ -77,9 +67,8 @@ protocol_block_in::protocol_block_in(full_node& node, channel::ptr channel,
7767
7868void protocol_block_in::start ()
7969{
80- // Use perpetual protocol timer to prevent stall (our heartbeat).
81- protocol_timer::start (asio::seconds (block_poll_seconds_),
82- BIND1 (get_block_inventory, _1));
70+ // Use timer to drop slow peers.
71+ protocol_timer::start (block_latency_, BIND1 (handle_timeout, _1));
8372
8473 // TODO: move headers to a derived class protocol_block_in_31800.
8574 SUBSCRIBE2 (headers, handle_receive_headers, _1, _2);
@@ -93,59 +82,18 @@ void protocol_block_in::start()
9382 if (headers_from_peer_)
9483 {
9584 // Ask peer to send headers vs. inventory block announcements.
96- SEND2 (send_headers () , handle_send, _1, send_headers::command);
85+ SEND2 (send_headers{} , handle_send, _1, send_headers::command);
9786 }
9887
99- // Send initial get_[blocks|headers] message by simulating first heartbeat.
100- set_event (error::success);
88+ send_get_blocks (null_hash);
10189}
10290
10391// Send get_[headers|blocks] sequence.
10492// -----------------------------------------------------------------------------
10593
106- // This is fired by the callback (i.e. base timer and stop handler).
107- void protocol_block_in::get_block_inventory (const code& ec)
108- {
109- if (stopped (ec))
110- {
111- // This may get called more than once per stop.
112- handle_stop (ec);
113- return ;
114- }
115-
116- // Since we need blocks do not stay connected to peer in bad version range.
117- if (!blocks_from_peer_)
118- {
119- stop (ec);
120- return ;
121- }
122-
123- if (ec && ec != error::channel_timeout)
124- {
125- LOG_DEBUG (LOG_NODE)
126- << " Failure in block timer for [" << authority () << " ] "
127- << ec.message ();
128- stop (ec);
129- return ;
130- }
131-
132- send_get_blocks (null_hash);
133- }
134-
13594void protocol_block_in::send_get_blocks (const hash_digest& stop_hash)
13695{
137- const auto chain_top = node_.top_block ();
138- const auto & chain_top_hash = chain_top.hash ();
139- const auto last_locator_top = last_locator_top_.load ();
140-
141- // Avoid requesting from the same start as last request to this peer.
142- // This does not guarantee prevention, it's just an optimization.
143- // If the peer does not respond to the previous request this will stall
144- // unless a block announcement is connected or another channel advances.
145- if (chain_top_hash != null_hash && chain_top_hash == last_locator_top)
146- return ;
147-
148- const auto heights = block::locator_heights (chain_top.height ());
96+ const auto heights = block::locator_heights (node_.top_block ().height ());
14997
15098 chain_.fetch_block_locator (heights,
15199 BIND3 (handle_fetch_block_locator, _1, _2, stop_hash));
@@ -189,8 +137,6 @@ void protocol_block_in::handle_fetch_block_locator(const code& ec,
189137 << encode_hash (stop_hash) << " ]" ;
190138 }
191139
192- // Save the locator top to prevent a redundant future request.
193- last_locator_top_.store (last_hash);
194140 message->set_stop_hash (stop_hash);
195141
196142 if (use_headers)
@@ -262,6 +208,25 @@ void protocol_block_in::send_get_data(const code& ec, get_data_ptr message)
262208 if (message->inventories ().empty ())
263209 return ;
264210
211+ // /////////////////////////////////////////////////////////////////////////
212+ // Critical Section
213+ mutex.lock_upgrade ();
214+ const auto fresh = backlog_.empty ();
215+ mutex.unlock_upgrade_and_lock ();
216+ // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
217+
218+ // Enqueue the block inventory behind the preceding block inventory.
219+ for (const auto & inventory: message->inventories ())
220+ if (inventory.type () == inventory::type_id::block)
221+ backlog_.push (inventory.hash ());
222+
223+ mutex.unlock ();
224+ // /////////////////////////////////////////////////////////////////////////
225+
226+ // There was no backlog so the timer must be started now.
227+ if (fresh)
228+ reset_timer ();
229+
265230 // inventory|headers->get_data[blocks]
266231 SEND2 (*message, handle_send, _1, message->command );
267232}
@@ -288,15 +253,19 @@ bool protocol_block_in::handle_receive_not_found(const code& ec,
288253 hash_list hashes;
289254 message->to_hashes (hashes, inventory::type_id::block);
290255
291- // The peer cannot locate a block that it told us it had.
292- // This only results from reorganization assuming peer is proper.
293- for (const auto hash: hashes)
256+ for (const auto & hash: hashes)
294257 {
295258 LOG_DEBUG (LOG_NODE)
296259 << " Block not_found [" << encode_hash (hash) << " ] from ["
297260 << authority () << " ]" ;
298261 }
299262
263+ // The peer cannot locate one or more blocks that it told us it had.
264+ // This only results from reorganization assuming peer is proper.
265+ // Drop the peer so next channgel generates a new locator and backlog.
266+ if (!hashes.empty ())
267+ stop (error::channel_stopped);
268+
300269 return true ;
301270}
302271
@@ -309,16 +278,50 @@ bool protocol_block_in::handle_receive_block(const code& ec,
309278 if (stopped (ec))
310279 return false ;
311280
312- // Reset the timer because we just received a block from this peer.
313- // Once we are at the top this will end up polling the peer.
314- reset_timer ();
281+ // /////////////////////////////////////////////////////////////////////////
282+ // Critical Section
283+ mutex.lock ();
284+
285+ auto matched = !backlog_.empty () && backlog_.front () == message->hash ();
286+
287+ if (matched)
288+ backlog_.pop ();
289+
290+ // Empty after pop means we need to make a new request.
291+ const auto cleared = backlog_.empty ();
292+
293+ mutex.unlock ();
294+ // /////////////////////////////////////////////////////////////////////////
295+
296+ // It is common for block announcements to cause block requests to be sent
297+ // out of backlog order due to interleaving of threads. This results in
298+ // channel drops during initial block download but not after sync. The
299+ // resolution to this issue is use of headers-first sync, but short of that
300+ // the current implementation performs well and drops peers no more
301+ // frequently than block announcements occur during initial block download,
302+ // and not typically after it is complete.
303+ if (!matched)
304+ {
305+ LOG_DEBUG (LOG_NODE)
306+ << " Block [" << encode_hash (message->hash ())
307+ << " ] unexpected or out of order from [" << authority () << " ]" ;
308+ stop (error::channel_stopped);
309+ return false ;
310+ }
315311
316312 message->validation .originator = nonce ();
317313 chain_.organize (message, BIND2 (handle_store_block, _1, message));
314+
315+ // Sending a new request will reset the timer as necessary.
316+ if (cleared)
317+ send_get_blocks (null_hash);
318+ else
319+ reset_timer ();
320+
318321 return true ;
319322}
320323
321- // The transaction has been saved to the block chain (or not).
324+ // The block has been saved to the block chain (or not).
322325// This will be picked up by subscription in block_out and will cause the block
323326// to be announced to non-originating peers.
324327void protocol_block_in::handle_store_block (const code& ec,
@@ -372,6 +375,49 @@ void protocol_block_in::handle_store_block(const code& ec,
372375// Subscription.
373376// -----------------------------------------------------------------------------
374377
378+ // This is fired by the callback (i.e. base timer and stop handler).
379+ void protocol_block_in::handle_timeout (const code& ec)
380+ {
381+ if (stopped (ec))
382+ {
383+ // This may get called more than once per stop.
384+ handle_stop (ec);
385+ return ;
386+ }
387+
388+ // Since we need blocks do not stay connected to peer in bad version range.
389+ if (!blocks_from_peer_)
390+ {
391+ stop (error::channel_stopped);
392+ return ;
393+ }
394+
395+ if (ec && ec != error::channel_timeout)
396+ {
397+ LOG_DEBUG (LOG_NODE)
398+ << " Failure in block timer for [" << authority () << " ] "
399+ << ec.message ();
400+ stop (ec);
401+ return ;
402+ }
403+
404+ // /////////////////////////////////////////////////////////////////////////
405+ // Critical Section
406+ mutex.lock_shared ();
407+ const auto backlog_empty = backlog_.empty ();
408+ mutex.unlock_shared ();
409+ // /////////////////////////////////////////////////////////////////////////
410+
411+ // Can only end up here if time was not extended.
412+ if (!backlog_empty)
413+ {
414+ LOG_DEBUG (LOG_NODE)
415+ << " Peer [" << authority ()
416+ << " ] exceeded configured block latency." ;
417+ stop (ec);
418+ }
419+ }
420+
375421void protocol_block_in::handle_stop (const code&)
376422{
377423 LOG_DEBUG (LOG_NETWORK)
0 commit comments