
Commit 127ecb7

Queue headers for processing, instead of batching

1 parent: fcf73d0

6 files changed (+107 -69 lines)

tests/p2p/test_service.py

Lines changed: 6 additions & 1 deletion

@@ -26,11 +26,16 @@ async def _run(self):
 async def test_daemon_exit_causes_parent_cancellation():
     service = ParentService()
     asyncio.ensure_future(service.run())
+
     await asyncio.sleep(0.01)
+
     assert service.daemon.is_operational
     assert service.daemon.is_running
+
     await service.daemon.cancel()
     await asyncio.sleep(0.01)
+
     assert not service.is_operational
     assert not service.is_running
-    await service.events.cleaned_up.wait()
+
+    await asyncio.wait_for(service.events.cleaned_up.wait(), timeout=1)
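Note on the test change: bounding the final wait with `asyncio.wait_for` means a cleanup regression fails the test after one second instead of hanging the suite. A minimal sketch of the pattern, using an illustrative stand-in event rather than the real `service.events.cleaned_up`:

```python
import asyncio

async def main() -> None:
    cleaned_up = asyncio.Event()  # stand-in for service.events.cleaned_up

    # If nothing ever sets the event, wait_for raises asyncio.TimeoutError
    # after 1 second instead of blocking forever.
    try:
        await asyncio.wait_for(cleaned_up.wait(), timeout=1)
    except asyncio.TimeoutError:
        print("cleanup never finished within 1s")

asyncio.run(main())
```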

trinity/protocol/eth/peer.py

Lines changed: 2 additions & 4 deletions

@@ -25,6 +25,8 @@
 
 
 class ETHPeer(BasePeer):
+    max_headers_fetch = constants.MAX_HEADERS_FETCH
+
     _supported_sub_protocols = [ETHProtocol]
     sub_proto: ETHProtocol = None
 
@@ -40,10 +42,6 @@ def requests(self) -> ETHExchangeHandler:
         self._requests = ETHExchangeHandler(self)
         return self._requests
 
-    @property
-    def max_headers_fetch(self) -> int:
-        return constants.MAX_HEADERS_FETCH
-
     def handle_sub_proto_msg(self, cmd: Command, msg: _DecodedMsgType) -> None:
         if isinstance(cmd, NewBlock):
             msg = cast(Dict[str, Any], msg)
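Why `max_headers_fetch` becomes a class attribute: the syncer below sizes its pending-header queue from `ETHPeer.max_headers_fetch * 5`, i.e. it reads the value on the class itself, and a `@property` accessed on the class returns the property object rather than the integer. A small illustrative sketch (class names and the constant value are made up for the example):

```python
MAX_HEADERS_FETCH = 192  # illustrative value only

class PeerWithProperty:
    @property
    def max_headers_fetch(self) -> int:
        return MAX_HEADERS_FETCH

class PeerWithClassAttr:
    max_headers_fetch = MAX_HEADERS_FETCH

print(PeerWithProperty.max_headers_fetch)    # <property object at 0x...>, not usable as a number
print(PeerWithClassAttr.max_headers_fetch)   # 192, usable to size a queue
print(PeerWithProperty().max_headers_fetch)  # 192 -- instances behave the same either way
```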

trinity/protocol/les/peer.py

Lines changed: 2 additions & 4 deletions

@@ -34,6 +34,8 @@
 
 
 class LESPeer(BasePeer):
+    max_headers_fetch = MAX_HEADERS_FETCH
+
     _supported_sub_protocols = [LESProtocol, LESProtocolV2]
     sub_proto: LESProtocol = None
     # TODO: This will no longer be needed once we've fixed #891, and then it should be removed.
@@ -51,10 +53,6 @@ def requests(self) -> LESExchangeHandler:
         self._requests = LESExchangeHandler(self)
         return self._requests
 
-    @property
-    def max_headers_fetch(self) -> int:
-        return MAX_HEADERS_FETCH
-
     def handle_sub_proto_msg(self, cmd: Command, msg: _DecodedMsgType) -> None:
         if isinstance(cmd, Announce):
             self.head_info = cmd.as_head_info(msg)

trinity/sync/common/chain.py

Lines changed: 61 additions & 38 deletions

@@ -1,7 +1,6 @@
 import asyncio
 from abc import abstractmethod
 from typing import (
-    Any,
     AsyncGenerator,
     Tuple,
     Union,
@@ -15,14 +14,16 @@
 from eth.exceptions import (
     HeaderNotFound,
 )
+from eth_typing import (
+    Hash32,
+)
 from eth_utils import (
     ValidationError,
 )
 from eth.rlp.headers import BlockHeader
 
 from p2p import protocol
 from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
-from p2p.exceptions import NoEligiblePeers
 from p2p.p2p_proto import DisconnectReason
 from p2p.peer import BasePeer, PeerPool, PeerSubscriber
 from p2p.service import BaseService
@@ -45,19 +46,26 @@ class BaseHeaderChainSyncer(BaseService, PeerSubscriber):
     """
     # We'll only sync if we are connected to at least min_peers_to_sync.
     min_peers_to_sync = 1
-    # Should we exit upon completing a sync with a given peer?
-    _exit_on_sync_complete = False
+    # Post-processing steps can exit out of sync (for example, fast sync) by triggering this token:
+    complete_token = None
     # TODO: Instead of a fixed timeout, we should use a variable one that gets adjusted based on
     # the round-trip times from our download requests.
     _reply_timeout = 60
     _seal_check_random_sample_rate = SEAL_CHECK_RANDOM_SAMPLE_RATE
+    # the latest header hash of the peer on the current sync
+    _target_header_hash = None
 
     def __init__(self,
                  chain: AsyncChain,
                  db: AsyncHeaderDB,
                  peer_pool: PeerPool,
                  token: CancelToken = None) -> None:
-        super().__init__(token)
+        self.complete_token = CancelToken('trinity.sync.common.BaseHeaderChainSyncer.SyncCompleted')
+        if token is None:
+            super_service_token = self.complete_token
+        else:
+            super_service_token = token.chain(self.complete_token)
+        super().__init__(super_service_token)
         self.chain = chain
         self.db = db
         self.peer_pool = peer_pool
@@ -66,13 +74,24 @@ def __init__(self,
         self._sync_complete = asyncio.Event()
         self._sync_requests: asyncio.Queue[HeaderRequestingPeer] = asyncio.Queue()
 
+        # pending queue size should be big enough to avoid starving the processing consumers, but
+        # small enough to avoid wasteful over-requests before post-processing can happen
+        max_pending_headers = ETHPeer.max_headers_fetch * 5
+        self.pending_headers: asyncio.Queue[BlockHeader] = asyncio.Queue(max_pending_headers)
+
     @property
     def msg_queue_maxsize(self) -> int:
         # This is a rather arbitrary value, but when the sync is operating normally we never see
         # the msg queue grow past a few hundred items, so this should be a reasonable limit for
         # now.
         return 2000
 
+    def get_target_header_hash(self) -> Hash32:
+        if self._target_header_hash is None:
+            raise ValueError("Cannot check the target hash when there is no active sync")
+        else:
+            return self._target_header_hash
+
     def register_peer(self, peer: BasePeer) -> None:
         self._sync_requests.put_nowait(cast(HeaderRequestingPeer, self.peer_pool.highest_td_peer))
 
@@ -99,19 +118,14 @@ async def _run(self) -> None:
         self.run_task(self._handle_msg_loop())
         with self.subscribe(self.peer_pool):
             while self.is_operational:
-                peer_or_finished: Any = await self.wait_first(
-                    self._sync_requests.get(),
-                    self._sync_complete.wait()
-                )
-
-                # In the case of a fast sync, we return once the sync is completed, and our caller
-                # must then run the StateDownloader.
-                if self._sync_complete.is_set():
+                try:
+                    peer = await self.wait(self._sync_requests.get())
+                except OperationCancelled:
+                    # In the case of a fast sync, we return once the sync is completed, and our
+                    # caller must then run the StateDownloader.
                     return
-
-                # Since self._sync_complete is not set, peer_or_finished can only be a Peer
-                # instance.
-                self.run_task(self.sync(peer_or_finished))
+                else:
+                    self.run_task(self.sync(peer))
 
     async def sync(self, peer: HeaderRequestingPeer) -> None:
         if self._syncing:
@@ -162,7 +176,11 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
                 break
 
            try:
-                headers = await self._fetch_missing_headers(peer, start_at)
+                fetch_headers_coro = self._fetch_missing_headers(peer, start_at)
+                headers = await self.complete_token.cancellable_wait(fetch_headers_coro)
+            except OperationCancelled:
+                self.logger.info("Sync with %s completed", peer)
+                break
            except TimeoutError:
                self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer)
                await peer.disconnect(DisconnectReason.timeout)
@@ -192,22 +210,21 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
             except ValidationError as e:
                 self.logger.warn("Received invalid headers from %s, aborting sync: %s", peer, e)
                 break
-            try:
-                head_number = await self._process_headers(peer, headers)
-            except NoEligiblePeers:
-                self.logger.info("No peers have the blocks we want, aborting sync")
-                break
-            start_at = head_number + 1
 
-            # Quite often the header batch we receive here includes headers past the peer's reported
-            # head (via the NewBlock msg), so we can't compare our head's hash to the peer's in
-            # order to see if the sync is completed. Instead we just check that we have the peer's
-            # head_hash in our chain.
-            if await self.wait(self.db.coro_header_exists(peer.head_hash)):
-                self.logger.info("Sync with %s completed", peer)
-                if self._exit_on_sync_complete:
-                    self._sync_complete.set()
-                    break
+            # Setting the latest header hash for the peer, before queuing header processing tasks
+            self._target_header_hash = peer.head_hash
+
+            await self._queue_headers_for_processing(headers)
+            start_at = headers[-1].block_number + 1
+
+    async def _queue_headers_for_processing(self, headers: Tuple[BlockHeader, ...]) -> None:
+        # this block is an optimization to avoid lots of await calls
+        if len(headers) + self.pending_headers.qsize() <= self.pending_headers.maxsize:
+            for header in headers:
+                self.pending_headers.put_nowait(header)
+        else:
+            for header in headers:
+                await self.pending_headers.put(header)
 
     async def _fetch_missing_headers(
             self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
@@ -245,12 +262,18 @@ async def get_missing_tail(self: 'BaseHeaderChainSyncer',
 
             return tail_headers
 
+    async def pop_all_pending_headers(self) -> Tuple[BlockHeader, ...]:
+        """Get all the currently pending headers. If no headers pending, wait until one is"""
+        queue = self.pending_headers
+        if queue.empty():
+            first_header = await queue.get()
+        else:
+            first_header = queue.get_nowait()
+
+        available = queue.qsize()
+        return (first_header, ) + tuple(queue.get_nowait() for _ in range(available))
+
     @abstractmethod
     async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
                           msg: protocol._DecodedMsgType) -> None:
         raise NotImplementedError("Must be implemented by subclasses")
-
-    @abstractmethod
-    async def _process_headers(
-            self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
-        raise NotImplementedError("Must be implemented by subclasses")
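The structural change in this file: fetched headers are no longer handed straight to an abstract `_process_headers` call; they are pushed onto a bounded `asyncio.Queue` (with `put_nowait` as a fast path when the whole batch fits) and consumers drain everything currently available via `pop_all_pending_headers`. A self-contained sketch of that producer/consumer shape, using plain integers in place of `BlockHeader` objects and an assumed queue size:

```python
import asyncio
from typing import Tuple

MAX_PENDING = 20  # stands in for ETHPeer.max_headers_fetch * 5

async def queue_batch(queue: "asyncio.Queue[int]", batch: Tuple[int, ...]) -> None:
    # Fast path: if the whole batch fits, enqueue without awaiting per item.
    if len(batch) + queue.qsize() <= queue.maxsize:
        for item in batch:
            queue.put_nowait(item)
    else:
        # Otherwise apply backpressure, blocking until the consumer makes room.
        for item in batch:
            await queue.put(item)

async def pop_all(queue: "asyncio.Queue[int]") -> Tuple[int, ...]:
    # Wait for at least one item, then drain whatever else is already queued.
    first = await queue.get()
    rest = tuple(queue.get_nowait() for _ in range(queue.qsize()))
    return (first,) + rest

async def main() -> None:
    queue: "asyncio.Queue[int]" = asyncio.Queue(MAX_PENDING)
    await queue_batch(queue, tuple(range(10)))
    print(await pop_all(queue))  # (0, 1, ..., 9) drained as a single batch

asyncio.run(main())
```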

trinity/sync/full/chain.py

Lines changed: 21 additions & 8 deletions

@@ -64,7 +64,6 @@ class FastChainSyncer(BaseHeaderChainSyncer):
     head.
     """
     db: AsyncChainDB
-    _exit_on_sync_complete = True
 
     subscription_msg_types: Set[Type[Command]] = {
         commands.NewBlock,
@@ -78,6 +77,15 @@ class FastChainSyncer(BaseHeaderChainSyncer):
         commands.NewBlockHashes,
     }
 
+    async def _run(self) -> None:
+        self.run_task(self._load_and_process_headers())
+        await super()._run()
+
+    async def _load_and_process_headers(self) -> None:
+        while self.is_operational:
+            headers = await self.pop_all_pending_headers()
+            await self._process_headers(headers)
+
     async def _calculate_td(self, headers: Tuple[BlockHeader, ...]) -> int:
         """Return the score (total difficulty) of the last header in the given list.
 
@@ -94,8 +102,7 @@ async def _calculate_td(self, headers: Tuple[BlockHeader, ...]) -> int:
             td += header.difficulty
         return td
 
-    async def _process_headers(
-            self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
+    async def _process_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
         timer = Timer()
         target_td = await self._calculate_td(headers)
         bodies_by_key = await self._download_block_bodies(target_td, headers)
@@ -123,7 +130,16 @@ async def _process_headers(
         self.logger.info(
             "Imported %d blocks (%d txs) in %0.2f seconds, new head: #%d",
             len(headers), txs, timer.elapsed, head.block_number)
-        return head.block_number
+
+        # during fast sync, exit the service when reaching the target hash
+        target_hash = self.get_target_header_hash()
+
+        # Quite often the header batch we receive includes headers past the peer's reported
+        # head (via the NewBlock msg), so we can't compare our head's hash to the peer's in
+        # order to see if the sync is completed. Instead we just check that we have the peer's
+        # head_hash in our chain.
+        if await self.wait(self.db.coro_header_exists(target_hash)):
+            self.complete_token.trigger()
 
     async def _download_block_bodies(self,
                                      target_td: int,
@@ -357,11 +373,9 @@ class RegularChainSyncer(FastChainSyncer):
 
     Here, the run() method will execute the sync loop forever, until our CancelToken is triggered.
     """
-    _exit_on_sync_complete = False
     _seal_check_random_sample_rate = 1
 
-    async def _process_headers(
-            self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
+    async def _process_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
         target_td = await self._calculate_td(headers)
         bodies_by_key = await self._download_block_bodies(target_td, headers)
         self.logger.info("Got block bodies for chain segment")
@@ -410,7 +424,6 @@ async def _process_headers(
 
         head = await self.wait(self.db.coro_get_canonical_head())
         self.logger.info("Imported chain segment, new head: #%d", head.block_number)
-        return head.block_number
 
 
 def _is_body_empty(header: BlockHeader) -> bool:
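In the fast syncer the header consumer now runs as a background task and ends the sync by triggering `complete_token` once the target header hash is found in the database; because that token is chained into the service token, the producer loop in the base class unwinds as well. A rough sketch of that shape, using an `asyncio.Event` in place of the `CancelToken` class so it runs without the p2p library (block numbers stand in for header hashes):

```python
import asyncio

async def producer(queue: "asyncio.Queue[int]", done: asyncio.Event) -> None:
    block = 0
    while not done.is_set():
        # Fetch-and-enqueue loop; stops as soon as the consumer signals completion.
        await queue.put(block)
        block += 1
        await asyncio.sleep(0.01)

async def consumer(queue: "asyncio.Queue[int]", done: asyncio.Event, target: int) -> None:
    while not done.is_set():
        block = await queue.get()
        # ... import/persist the block here ...
        if block >= target:  # analogous to finding the target header hash in the DB
            done.set()       # analogous to self.complete_token.trigger()

async def main() -> None:
    queue: "asyncio.Queue[int]" = asyncio.Queue(5)
    done = asyncio.Event()
    await asyncio.gather(producer(queue, done), consumer(queue, done, target=20))
    print("sync complete at target")

asyncio.run(main())
```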

trinity/sync/light/chain.py

Lines changed: 15 additions & 14 deletions

@@ -3,13 +3,10 @@
     cast,
     Dict,
     Set,
-    Tuple,
     Type,
     Union,
 )
 
-from eth.rlp.headers import BlockHeader
-
 from p2p.protocol import (
     Command,
     _DecodedMsgType,
@@ -22,7 +19,6 @@
 from trinity.sync.common.chain import BaseHeaderChainSyncer
 from trinity.utils.timer import Timer
 
-
 HeaderRequestingPeer = Union[ETHPeer, LESPeer]
 
 
@@ -35,6 +31,10 @@ class LightChainSyncer(BaseHeaderChainSyncer):
         commands.BlockHeaders,
     }
 
+    async def _run(self) -> None:
+        self.run_task(self._persist_headers())
+        await super()._run()
+
     async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: Command,
                           msg: _DecodedMsgType) -> None:
         if isinstance(cmd, commands.Announce):
@@ -61,14 +61,15 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) ->
         self.logger.trace("Replying to %s with %d headers", peer, len(headers))
         peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=request.request_id)
 
-    async def _process_headers(
-            self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int:
-        timer = Timer()
-        for header in headers:
-            await self.wait(self.db.coro_persist_header(header))
+    async def _persist_headers(self) -> None:
+        while self.is_operational:
+            headers = await self.wait(self.pop_all_pending_headers())
+
+            timer = Timer()
+            for header in headers:
+                await self.wait(self.db.coro_persist_header(header))
 
-        head = await self.wait(self.db.coro_get_canonical_head())
-        self.logger.info(
-            "Imported %d headers in %0.2f seconds, new head: #%d",
-            len(headers), timer.elapsed, head.block_number)
-        return head.block_number
+            head = await self.wait(self.db.coro_get_canonical_head())
+            self.logger.info(
+                "Imported %d headers in %0.2f seconds, new head: #%d",
+                len(headers), timer.elapsed, head.block_number)
