
Commit 644dcd5

Disallow repeat header tasks during sync
1 parent 39bec5a commit 644dcd5

5 files changed, +41 -13 lines

5 files changed

+41
-13
lines changed

eth/chains/base.py
Lines changed: 8 additions & 2 deletions

@@ -295,7 +295,10 @@ def validate_uncles(self, block: BaseBlock) -> None:
 
     @abstractmethod
     def validate_chain(
-            self, chain: Tuple[BlockHeader, ...], seal_check_random_sample_rate: int = 1) -> None:
+            self,
+            parent: BlockHeader,
+            chain: Tuple[BlockHeader, ...],
+            seal_check_random_sample_rate: int = 1) -> None:
         raise NotImplementedError("Chain classes must implement this method")
 
 
@@ -868,5 +871,8 @@ async def coro_import_block(self,
         raise NotImplementedError()
 
     async def coro_validate_chain(
-            self, chain: Tuple[BlockHeader, ...], seal_check_random_sample_rate: int = 1) -> None:
+            self,
+            parent: BlockHeader,
+            chain: Tuple[BlockHeader, ...],
+            seal_check_random_sample_rate: int = 1) -> None:
         raise NotImplementedError()
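
The abstract signature now takes an explicit parent header ahead of the chain of headers to validate. A minimal caller sketch under that reading; validate_downloaded_headers, the headerdb lookup, and the sample rate value below are illustrative, not part of this commit:

    # Hypothetical caller: validate a contiguous batch of downloaded headers
    # against an explicitly supplied parent rather than the local canonical head.
    def validate_downloaded_headers(chain, headerdb, headers):
        # look up the parent of the first header in the batch
        parent = headerdb.get_block_header_by_hash(headers[0].parent_hash)
        # new signature: parent first, then the header chain, then the optional
        # seal-check sampling rate
        chain.validate_chain(parent, headers, seal_check_random_sample_rate=10)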

trinity/chains/light.py
Lines changed: 5 additions & 1 deletion

@@ -225,7 +225,11 @@ def validate_seal(self, header: BlockHeader) -> None:
     def validate_uncles(self, block: BaseBlock) -> None:
         raise NotImplementedError("Chain classes must implement " + inspect.stack()[0][3])
 
-    def validate_chain(self, chain: Tuple[BlockHeader, ...], seal_check_frequency: int = 1) -> None:
+    def validate_chain(
+            self,
+            parent: BlockHeader,
+            chain: Tuple[BlockHeader, ...],
+            seal_check_random_sample_rate: int = 1) -> None:
         raise NotImplementedError("Chain classes must implement " + inspect.stack()[0][3])
 
     #
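
The light chain keeps validate_chain as a stub that reports the name of the unimplemented method by reading it off the current stack frame. A self-contained sketch of that stub pattern, with a made-up class name:

    import inspect

    class UnsupportedChainAPI:
        # Made-up class; mirrors the stub style in the diff above.
        def validate_chain(self, parent, chain, seal_check_random_sample_rate=1):
            # inspect.stack()[0][3] is the name of the currently executing
            # function, here 'validate_chain'
            raise NotImplementedError(
                "Chain classes must implement " + inspect.stack()[0][3])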

trinity/sync/common/chain.py
Lines changed: 15 additions & 4 deletions

@@ -1,8 +1,8 @@
 import asyncio
 from abc import abstractmethod
+from operator import attrgetter
 from typing import (
     AsyncGenerator,
-    Set,
     Tuple,
     Union,
     cast,
@@ -81,7 +81,7 @@ def __init__(self,
         # pending queue size should be big enough to avoid starving the processing consumers, but
         # small enough to avoid wasteful over-requests before post-processing can happen
         max_pending_headers = ETHPeer.max_headers_fetch * 8
-        self.header_queue = TaskQueue(max_pending_headers, lambda header: header.block_number)
+        self.header_queue = TaskQueue(max_pending_headers, attrgetter('block_number'))
 
     @property
     def msg_queue_maxsize(self) -> int:
@@ -168,7 +168,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
            return
 
         self.logger.info("Starting sync with %s", peer)
-        last_received_header = None
+        last_received_header: BlockHeader = None
         # When we start the sync with a peer, we always request up to MAX_REORG_DEPTH extra
         # headers before our current head's number, in case there were chain reorgs since the last
         # time _sync() was called. All of the extra headers that are already present in our DB
@@ -238,10 +238,21 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
             # Setting the latest header hash for the peer, before queuing header processing tasks
             self._target_header_hash = peer.head_hash
 
-            await self.header_queue.add(headers)
+            unrequested_headers = tuple(h for h in headers if h not in self.header_queue)
+            await self.header_queue.add(unrequested_headers)
             last_received_header = headers[-1]
             start_at = last_received_header.block_number + 1
 
+        # erase any pending tasks, to restart on next _sync() run
+        try:
+            batch_id, pending_tasks = self.header_queue.get_nowait()
+        except asyncio.QueueFull:
+            # nothing pending, continue
+            pass
+        else:
+            # fully remove pending tasks from queue
+            self.header_queue.complete(batch_id, pending_tasks)
+
     async def _fetch_missing_headers(
             self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
         """Fetch a batch of headers starting at start_at and return the ones we're missing."""

trinity/sync/full/chain.py
Lines changed: 11 additions & 4 deletions

@@ -1,4 +1,5 @@
 import asyncio
+from concurrent.futures import CancelledError
 import math
 import operator
 from typing import (
@@ -90,9 +91,9 @@ async def _load_and_process_headers(self) -> None:
         # in independent loops
         # TODO implement the maximum task size at each step instead of this magic number
         max_headers = min((MAX_BODIES_FETCH, MAX_RECEIPTS_FETCH)) * 4
-        batch, headers = await self.header_queue.get(max_headers)
+        batch_id, headers = await self.header_queue.get(max_headers)
         await self._process_headers(headers)
-        self.header_queue.complete(batch, headers)
+        self.header_queue.complete(batch_id, headers)
 
     async def _calculate_td(self, headers: Tuple[BlockHeader, ...]) -> int:
         """Return the score (total difficulty) of the last header in the given list.
@@ -215,8 +216,11 @@ async def _get_block_bodies(self,
                 "Timed out requesting block bodies for %d headers from %s", len(batch), peer,
             )
             return tuple(), batch
+        except CancelledError:
+            self.logger.debug("Pending block bodies call to %r future cancelled", peer)
+            return tuple(), batch
         except OperationCancelled:
-            self.logger.trace("Pending block bodies call to %r cancelled", peer)
+            self.logger.trace("Pending block bodies call to %r operation cancelled", peer)
             return tuple(), batch
         except PeerConnectionLost:
             self.logger.debug("Peer went away, cancelling the block body request and moving on...")
@@ -317,8 +321,11 @@ async def _get_receipts(self,
                 "Timed out requesting receipts for %d headers from %s", len(batch), peer,
             )
             return tuple(), batch
+        except CancelledError:
+            self.logger.debug("Pending receipts call to %r future cancelled", peer)
+            return tuple(), batch
         except OperationCancelled:
-            self.logger.trace("Pending receipts call to %r cancelled", peer)
+            self.logger.trace("Pending receipts call to %r operation cancelled", peer)
             return tuple(), batch
         except PeerConnectionLost:
             self.logger.debug("Peer went away, cancelling the receipts request and moving on...")

trinity/sync/light/chain.py
Lines changed: 2 additions & 2 deletions

@@ -63,7 +63,7 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) ->
 
     async def _persist_headers(self) -> None:
         while self.is_operational:
-            batch, headers = await self.wait(self.header_queue.get())
+            batch_id, headers = await self.wait(self.header_queue.get())
 
             timer = Timer()
             for header in headers:
@@ -74,4 +74,4 @@ async def _persist_headers(self) -> None:
                 "Imported %d headers in %0.2f seconds, new head: #%d",
                 len(headers), timer.elapsed, head.block_number)
 
-            self.header_queue.complete(batch, headers)
+            self.header_queue.complete(batch_id, headers)
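
The batch to batch_id rename makes the two-phase consumption contract of the header queue explicit: get() hands out a batch plus an identifier, and complete() acknowledges that identifier once the headers are persisted. A condensed consumer-loop sketch; persist_header stands in for the real import step, and header_queue is assumed to expose get()/complete() as in the diff:

    async def persist_headers_loop(header_queue, persist_header):
        # Two-phase consumption: take a batch, process it, then acknowledge it
        # so the queue can stop tracking those headers.
        while True:
            batch_id, headers = await header_queue.get()
            for header in headers:
                await persist_header(header)
            header_queue.complete(batch_id, headers)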
