Skip to content

Commit fedd40b

Browse files
authored
Merge pull request #1249 from carver/reliability-messaging-cleanup
Improve fast-sync reliability & clean up messages
2 parents dd304cc + c9a070c commit fedd40b

File tree

8 files changed: 66 additions (+66) and 42 deletions (-42)

p2p/peer.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -497,7 +497,7 @@ def decrypt_body(self, data: bytes, body_size: int) -> bytes:
497497
self.ingress_mac.update(sxor(self.mac_enc(fmac_seed), fmac_seed))
498498
expected_frame_mac = self.ingress_mac.digest()[:MAC_LEN]
499499
if not bytes_eq(expected_frame_mac, frame_mac):
500-
raise DecryptionError('Invalid frame mac: expected %s, got %s'.format(
500+
raise DecryptionError('Invalid frame mac: expected {}, got {}'.format(
501501
expected_frame_mac, frame_mac))
502502
return self.aes_dec.update(frame_ciphertext)[:body_size]
503503

p2p/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,7 @@ def decode_payload(self, rlp_data: bytes) -> PayloadType:
9393
data = rlp.decode(rlp_data, sedes=decoder, recursive_cache=True)
9494
except rlp.DecodingError as err:
9595
raise MalformedMessage(
96-
"Malformed %s message: %r".format(type(self).__name__, err)
96+
"Malformed {} message: {!r}".format(type(self).__name__, err)
9797
) from err
9898

9999
if isinstance(self.structure, sedes.CountableList):

p2p/service.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -207,8 +207,7 @@ async def cleanup(self) -> None:
207207

208208
self.events.cleaned_up.set()
209209

210-
async def cancel(self) -> None:
211-
"""Trigger the CancelToken and wait for the cleaned_up event to be set."""
210+
def cancel_nowait(self) -> None:
212211
if self.is_cancelled:
213212
self.logger.warning("Tried to cancel %s, but it was already cancelled", self)
214213
return
@@ -218,6 +217,11 @@ async def cancel(self) -> None:
218217
self.logger.debug("Cancelling %s", self)
219218
self.events.cancelled.set()
220219
self.cancel_token.trigger()
220+
221+
async def cancel(self) -> None:
222+
"""Trigger the CancelToken and wait for the cleaned_up event to be set."""
223+
self.cancel_nowait()
224+
221225
try:
222226
await asyncio.wait_for(
223227
self.events.cleaned_up.wait(), timeout=self._wait_until_finished_timeout)

trinity/protocol/common/managers.py

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ def subscription_msg_types(self) -> Set[Type[Command]]:
5151

5252
msg_queue_maxsize = 100
5353

54-
response_timout: float = ROUND_TRIP_TIMEOUT
54+
response_timeout: float = ROUND_TRIP_TIMEOUT
5555

5656
pending_request: Tuple[float, 'asyncio.Future[TResponsePayload]'] = None
5757

@@ -80,7 +80,7 @@ async def payload_candidates(
8080
candidates will stop arriving.
8181
"""
8282
if timeout is None:
83-
timeout = self.response_timout
83+
timeout = self.response_timeout
8484

8585
start_at = time.perf_counter()
8686

@@ -174,13 +174,13 @@ def _is_pending(self) -> bool:
174174

175175
async def _cleanup(self) -> None:
176176
if self.pending_request is not None:
177-
self.logger.debug("Stream shutting down, raising an exception on the pending request")
177+
self.logger.debug("Stream %r shutting down, cancelling the pending request", self)
178178
_, future = self.pending_request
179179
future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone"))
180180

181181
def deregister_peer(self, peer: BasePeer) -> None:
182182
if self.pending_request is not None:
183-
self.logger.debug("Peer disconnected, raising an exception on the pending request")
183+
self.logger.debug("Peer stream %r shutting down, cancelling the pending request", self)
184184
_, future = self.pending_request
185185
future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone"))
186186

@@ -201,6 +201,9 @@ def __init__(
201201
self._response_command_type = listening_for
202202

203203
async def launch_service(self) -> None:
204+
if self._cancel_token.triggered:
205+
raise PeerConnectionLost("Peer %s is gone. Ignoring new requests to it" % self._peer)
206+
204207
self._response_stream = ResponseCandidateStream(
205208
self._peer,
206209
self._response_command_type,

trinity/protocol/eth/peer.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,13 @@
1919
NewBlock,
2020
Status,
2121
)
22-
from trinity.protocol.eth import constants
22+
from .constants import MAX_HEADERS_FETCH
2323
from .proto import ETHProtocol
2424
from .handlers import ETHExchangeHandler
2525

2626

2727
class ETHPeer(BasePeer):
28-
max_headers_fetch = constants.MAX_HEADERS_FETCH
28+
max_headers_fetch = MAX_HEADERS_FETCH
2929

3030
_supported_sub_protocols = [ETHProtocol]
3131
sub_proto: ETHProtocol = None

trinity/protocol/les/peer.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@
2323
Status,
2424
StatusV2,
2525
)
26-
from trinity.protocol.les.constants import (
26+
from .constants import (
2727
MAX_HEADERS_FETCH,
2828
)
2929
from .proto import (

trinity/sync/common/chain.py

Lines changed: 31 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@
3636
from trinity.protocol.les.peer import LESPeer
3737
from trinity.utils.datastructures import TaskQueue
3838

39-
4039
HeaderRequestingPeer = Union[LESPeer, ETHPeer]
4140

4241

@@ -49,8 +48,6 @@ class BaseHeaderChainSyncer(BaseService, PeerSubscriber):
4948
"""
5049
# We'll only sync if we are connected to at least min_peers_to_sync.
5150
min_peers_to_sync = 1
52-
# Post-processing steps can exit out of sync (for example, fast sync) by triggering this token:
53-
complete_token = None
5451
# TODO: Instead of a fixed timeout, we should use a variable one that gets adjusted based on
5552
# the round-trip times from our download requests.
5653
_reply_timeout = 60
@@ -64,12 +61,7 @@ def __init__(self,
6461
db: AsyncHeaderDB,
6562
peer_pool: PeerPool,
6663
token: CancelToken = None) -> None:
67-
self.complete_token = CancelToken('trinity.sync.common.BaseHeaderChainSyncer.SyncCompleted')
68-
if token is None:
69-
master_service_token = self.complete_token
70-
else:
71-
master_service_token = token.chain(self.complete_token)
72-
super().__init__(master_service_token)
64+
super().__init__(token)
7365
self.chain = chain
7466
self.db = db
7567
self.peer_pool = peer_pool
@@ -146,7 +138,7 @@ async def sync(self, peer: HeaderRequestingPeer) -> None:
146138
try:
147139
await self._sync(peer)
148140
except OperationCancelled as e:
149-
self.logger.info("Sync with %s aborted: %s", peer, e)
141+
self.logger.info("Sync with %s was shut down: %s", peer, e)
150142
finally:
151143
self._syncing = False
152144

@@ -199,7 +191,23 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
199191
break
200192

201193
if not headers:
202-
self.logger.info("Got no new headers from %s, aborting sync", peer)
194+
if last_received_header is None:
195+
request_parent = head
196+
else:
197+
request_parent = last_received_header
198+
if head_td < peer.head_td:
199+
# peer claims to have a better header, but didn't return it. Boot peer
200+
# TODO ... also blacklist, because it keeps trying to reconnect
201+
self.logger.warning(
202+
"%s announced difficulty %s, but didn't return any headers after %r@%s",
203+
peer,
204+
peer.head_td,
205+
request_parent,
206+
head_td,
207+
)
208+
await peer.disconnect(DisconnectReason.subprotocol_error)
209+
else:
210+
self.logger.info("Got no new headers from %s, aborting sync", peer)
203211
break
204212

205213
first = headers[0]
@@ -235,24 +243,17 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
235243
await peer.disconnect(DisconnectReason.subprotocol_error)
236244
break
237245

246+
for header in headers:
247+
head_td += header.difficulty
248+
238249
# Setting the latest header hash for the peer, before queuing header processing tasks
239250
self._target_header_hash = peer.head_hash
240251

241-
unrequested_headers = tuple(h for h in headers if h not in self.header_queue)
242-
await self.header_queue.add(unrequested_headers)
252+
new_headers = tuple(h for h in headers if h not in self.header_queue)
253+
await self.header_queue.add(new_headers)
243254
last_received_header = headers[-1]
244255
start_at = last_received_header.block_number + 1
245256

246-
# erase any pending tasks, to restart on next _sync() run
247-
try:
248-
batch_id, pending_tasks = self.header_queue.get_nowait()
249-
except asyncio.QueueFull:
250-
# nothing pending, continue
251-
pass
252-
else:
253-
# fully remove pending tasks from queue
254-
self.header_queue.complete(batch_id, pending_tasks)
255-
256257
async def _fetch_missing_headers(
257258
self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]:
258259
"""Fetch a batch of headers starting at start_at and return the ones we're missing."""
@@ -273,12 +274,15 @@ async def get_missing_tail(self: 'BaseHeaderChainSyncer',
273274
) -> AsyncGenerator[BlockHeader, None]:
274275
iter_headers = iter(headers)
275276
for header in iter_headers:
276-
is_missing = not await self.wait(self.db.coro_header_exists(header.hash))
277-
if is_missing:
277+
if header in self.header_queue:
278+
self.logger.debug("Discarding header that is already queued: %s", header)
279+
continue
280+
is_present = await self.wait(self.db.coro_header_exists(header.hash))
281+
if is_present:
282+
self.logger.debug("Discarding header that we already have: %s", header)
283+
else:
278284
yield header
279285
break
280-
else:
281-
self.logger.debug("Discarding header that we already have: %s", header)
282286

283287
for header in iter_headers:
284288
yield header

trinity/sync/full/chain.py

Lines changed: 17 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,6 @@
4747
from trinity.sync.common.chain import BaseHeaderChainSyncer
4848
from trinity.utils.timer import Timer
4949

50-
5150
HeaderRequestingPeer = Union[LESPeer, ETHPeer]
5251
# ReceiptBundle: ((Receipt, ...), (root_hash, receipt_trie_data))
5352
ReceiptBundle = Tuple[Tuple[Receipt, ...], Tuple[Hash32, Dict[Hash32, bytes]]]
@@ -67,6 +66,9 @@ class FastChainSyncer(BaseHeaderChainSyncer):
6766
highest TD, at which point we must run the StateDownloader to fetch the state for our chain
6867
head.
6968
"""
69+
NO_PEER_RETRY_PAUSE = 5
70+
"""If no peers are available for downloading the chain data, retry after this many seconds"""
71+
7072
db: AsyncChainDB
7173

7274
subscription_msg_types: Set[Type[Command]] = {
@@ -92,8 +94,16 @@ async def _load_and_process_headers(self) -> None:
9294
# TODO implement the maximum task size at each step instead of this magic number
9395
max_headers = min((MAX_BODIES_FETCH, MAX_RECEIPTS_FETCH)) * 4
9496
batch_id, headers = await self.header_queue.get(max_headers)
95-
await self._process_headers(headers)
96-
self.header_queue.complete(batch_id, headers)
97+
try:
98+
await self._process_headers(headers)
99+
except NoEligiblePeers:
100+
self.logger.info(
101+
f"No available peers to sync with, retrying in {self.NO_PEER_RETRY_PAUSE}s"
102+
)
103+
self.header_queue.complete(batch_id, tuple())
104+
await self.sleep(self.NO_PEER_RETRY_PAUSE)
105+
else:
106+
self.header_queue.complete(batch_id, headers)
97107

98108
async def _calculate_td(self, headers: Tuple[BlockHeader, ...]) -> int:
99109
"""Return the score (total difficulty) of the last header in the given list.
@@ -148,7 +158,7 @@ async def _process_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
148158
# order to see if the sync is completed. Instead we just check that we have the peer's
149159
# head_hash in our chain.
150160
if await self.wait(self.db.coro_header_exists(target_hash)):
151-
self.complete_token.trigger()
161+
self.cancel_nowait()
152162

153163
async def _download_block_bodies(self,
154164
target_td: int,
@@ -295,6 +305,9 @@ async def _download_receipts(self,
295305
receipt_bundles = tuple(concat(all_receipt_bundles))
296306
headers = tuple(concat(all_missing_headers))
297307

308+
if len(receipt_bundles) == 0:
309+
continue
310+
298311
# process all of the returned receipts, storing their trie data
299312
# dicts in the database
300313
receipts, trie_roots_and_data_dicts = zip(*receipt_bundles)

0 commit comments

Comments
 (0)