Skip to content

Commit ba6b09c

Browse files
committed
Start moving the ETH p2p protocol stuff into the trinity module
1 parent 3d186e5 commit ba6b09c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1066
-784
lines changed

p2p/chain.py

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,11 @@
3939
from eth.utils.logging import TraceLogger
4040

4141
from p2p import protocol
42-
from p2p import eth
43-
from p2p import les
4442
from p2p.cancellable import CancellableMixin
4543
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
4644
from p2p.exceptions import NoEligiblePeers, ValidationError
4745
from p2p.p2p_proto import DisconnectReason
48-
from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerSubscriber
46+
from p2p.peer import BasePeer, PeerPool, PeerSubscriber
4947
from p2p.rlp import BlockBody
5048
from p2p.service import BaseService
5149
from p2p.utils import (
@@ -57,9 +55,12 @@
5755
if TYPE_CHECKING:
5856
from trinity.db.chain import AsyncChainDB # noqa: F401
5957
from trinity.db.header import AsyncHeaderDB # noqa: F401
58+
from trinity.protocol.eth.peer import ETHPeer # noqa: F401
59+
from trinity.protocol.les.peer import LESPeer # noqa: F401
60+
from trinity.protocol.base_request import BaseHeaderRequest # noqa: F401
6061

6162

62-
HeaderRequestingPeer = Union[LESPeer, ETHPeer]
63+
HeaderRequestingPeer = Union['LESPeer', 'ETHPeer']
6364

6465

6566
class BaseHeaderChainSyncer(BaseService, PeerSubscriber):
@@ -300,20 +301,23 @@ class LightChainSyncer(BaseHeaderChainSyncer):
300301

301302
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
302303
msg: protocol._DecodedMsgType) -> None:
303-
if isinstance(cmd, les.Announce):
304+
from trinity.protocol.les import commands
305+
from trinity.protocol.les.peer import LESPeer # noqa: F811
306+
if isinstance(cmd, commands.Announce):
304307
self._sync_requests.put_nowait(peer)
305-
elif isinstance(cmd, les.GetBlockHeaders):
308+
elif isinstance(cmd, commands.GetBlockHeaders):
306309
msg = cast(Dict[str, Any], msg)
307310
await self._handle_get_block_headers(cast(LESPeer, peer), msg)
308-
elif isinstance(cmd, les.BlockHeaders):
311+
elif isinstance(cmd, commands.BlockHeaders):
309312
# `BlockHeaders` messages are handled at the peer level.
310313
pass
311314
else:
312315
self.logger.debug("Ignoring %s message from %s", cmd, peer)
313316

314-
async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None:
317+
async def _handle_get_block_headers(self, peer: 'LESPeer', msg: Dict[str, Any]) -> None:
318+
from trinity.protocol.les.requests import HeaderRequest
315319
self.logger.debug("Peer %s made header request: %s", peer, msg)
316-
request = les.HeaderRequest(
320+
request = HeaderRequest(
317321
msg['query'].block_number_or_hash,
318322
msg['query'].max_headers,
319323
msg['query'].skip,
@@ -356,8 +360,8 @@ def __init__(self,
356360
super().__init__(chain, db, peer_pool, token)
357361
# Those are used by our msg handlers and _download_block_parts() in order to track missing
358362
# bodies/receipts for a given chain segment.
359-
self._downloaded_receipts: asyncio.Queue[Tuple[ETHPeer, List[DownloadedBlockPart]]] = asyncio.Queue() # noqa: E501
360-
self._downloaded_bodies: asyncio.Queue[Tuple[ETHPeer, List[DownloadedBlockPart]]] = asyncio.Queue() # noqa: E501
363+
self._downloaded_receipts: asyncio.Queue[Tuple['ETHPeer', List[DownloadedBlockPart]]] = asyncio.Queue() # noqa: E501
364+
self._downloaded_bodies: asyncio.Queue[Tuple['ETHPeer', List[DownloadedBlockPart]]] = asyncio.Queue() # noqa: E501
361365

362366
async def _calculate_td(self, headers: Tuple[BlockHeader, ...]) -> int:
363367
"""Return the score (total difficulty) of the last header in the given list.
@@ -483,7 +487,8 @@ def _request_block_parts(
483487
self,
484488
target_td: int,
485489
headers: List[BlockHeader],
486-
request_func: Callable[[ETHPeer, List[BlockHeader]], None]) -> int:
490+
request_func: Callable[['ETHPeer', List[BlockHeader]], None]) -> int:
491+
from trinity.protocol.eth.peer import ETHPeer # noqa: F811
487492
peers = self.peer_pool.get_peers(target_td)
488493
if not peers:
489494
raise NoEligiblePeers()
@@ -493,13 +498,13 @@ def _request_block_parts(
493498
request_func(cast(ETHPeer, peer), batch)
494499
return len(batches)
495500

496-
def _send_get_block_bodies(self, peer: ETHPeer, headers: List[BlockHeader]) -> None:
501+
def _send_get_block_bodies(self, peer: 'ETHPeer', headers: List[BlockHeader]) -> None:
497502
block_numbers = ", ".join(str(h.block_number) for h in headers)
498503
self.logger.debug(
499504
"Requesting %d block bodies (%s) to %s", len(headers), block_numbers, peer)
500505
peer.sub_proto.send_get_block_bodies([header.hash for header in headers])
501506

502-
def _send_get_receipts(self, peer: ETHPeer, headers: List[BlockHeader]) -> None:
507+
def _send_get_receipts(self, peer: 'ETHPeer', headers: List[BlockHeader]) -> None:
503508
block_numbers = ", ".join(str(h.block_number) for h in headers)
504509
self.logger.debug(
505510
"Requesting %d block receipts (%s) to %s", len(headers), block_numbers, peer)
@@ -527,47 +532,54 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int:
527532

528533
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
529534
msg: protocol._DecodedMsgType) -> None:
535+
from trinity.protocol.eth.peer import ETHPeer # noqa: F811
536+
from trinity.protocol.eth import commands
537+
from trinity.protocol.eth import (
538+
constants as eth_constants,
539+
)
540+
530541
peer = cast(ETHPeer, peer)
531-
if isinstance(cmd, eth.BlockBodies):
542+
543+
if isinstance(cmd, commands.BlockBodies):
532544
await self._handle_block_bodies(peer, list(cast(Tuple[BlockBody], msg)))
533-
elif isinstance(cmd, eth.Receipts):
545+
elif isinstance(cmd, commands.Receipts):
534546
await self._handle_block_receipts(peer, cast(List[List[Receipt]], msg))
535-
elif isinstance(cmd, eth.NewBlock):
547+
elif isinstance(cmd, commands.NewBlock):
536548
await self._handle_new_block(peer, cast(Dict[str, Any], msg))
537-
elif isinstance(cmd, eth.GetBlockHeaders):
549+
elif isinstance(cmd, commands.GetBlockHeaders):
538550
await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg))
539-
elif isinstance(cmd, eth.BlockHeaders):
551+
elif isinstance(cmd, commands.BlockHeaders):
540552
# `BlockHeaders` messages are handled at the peer level.
541553
pass
542-
elif isinstance(cmd, eth.GetBlockBodies):
543-
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
544-
block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH]
554+
elif isinstance(cmd, commands.GetBlockBodies):
555+
# Only serve up to MAX_BODIES_FETCH items in every request.
556+
block_hashes = cast(List[Hash32], msg)[:eth_constants.MAX_BODIES_FETCH]
545557
await self._handler.handle_get_block_bodies(peer, block_hashes)
546-
elif isinstance(cmd, eth.GetReceipts):
547-
# Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
548-
block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH]
558+
elif isinstance(cmd, commands.GetReceipts):
559+
# Only serve up to MAX_RECEIPTS_FETCH items in every request.
560+
block_hashes = cast(List[Hash32], msg)[:eth_constants.MAX_RECEIPTS_FETCH]
549561
await self._handler.handle_get_receipts(peer, block_hashes)
550-
elif isinstance(cmd, eth.GetNodeData):
551-
# Only serve up to eth.MAX_STATE_FETCH items in every request.
552-
node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH]
562+
elif isinstance(cmd, commands.GetNodeData):
563+
# Only serve up to MAX_STATE_FETCH items in every request.
564+
node_hashes = cast(List[Hash32], msg)[:eth_constants.MAX_STATE_FETCH]
553565
await self._handler.handle_get_node_data(peer, node_hashes)
554-
elif isinstance(cmd, eth.Transactions):
566+
elif isinstance(cmd, commands.Transactions):
555567
# Transactions msgs are handled by our TxPool service.
556568
pass
557-
elif isinstance(cmd, eth.NodeData):
569+
elif isinstance(cmd, commands.NodeData):
558570
# When doing a chain sync we never send GetNodeData requests, so peers should not send
559571
# us NodeData msgs.
560572
self.logger.warn("Unexpected NodeData msg from %s, disconnecting", peer)
561573
await peer.disconnect(DisconnectReason.bad_protocol)
562574
else:
563575
self.logger.debug("%s msg not handled yet, need to be implemented", cmd)
564576

565-
async def _handle_new_block(self, peer: ETHPeer, msg: Dict[str, Any]) -> None:
577+
async def _handle_new_block(self, peer: 'ETHPeer', msg: Dict[str, Any]) -> None:
566578
self._sync_requests.put_nowait(peer)
567579

568580
async def _handle_block_receipts(self,
569-
peer: ETHPeer,
570-
receipts_by_block: List[List[eth.Receipt]]) -> None:
581+
peer: 'ETHPeer',
582+
receipts_by_block: List[List[Receipt]]) -> None:
571583
self.logger.debug("Got Receipts for %d blocks from %s", len(receipts_by_block), peer)
572584
iterator = map(make_trie_root_and_nodes, receipts_by_block)
573585
# The map() call above is lazy (it returns an iterator! ;-), so it's only evaluated in
@@ -582,7 +594,7 @@ async def _handle_block_receipts(self,
582594
self._downloaded_receipts.put_nowait((peer, downloaded))
583595

584596
async def _handle_block_bodies(self,
585-
peer: ETHPeer,
597+
peer: 'ETHPeer',
586598
bodies: List[BlockBody]) -> None:
587599
self.logger.debug("Got Bodies for %d blocks from %s", len(bodies), peer)
588600
iterator = map(make_trie_root_and_nodes, [body.transactions for body in bodies])
@@ -601,10 +613,12 @@ async def _handle_block_bodies(self,
601613

602614
async def _handle_get_block_headers(
603615
self,
604-
peer: ETHPeer,
616+
peer: 'ETHPeer',
605617
query: Dict[str, Any]) -> None:
618+
from trinity.protocol.eth.requests import HeaderRequest # noqa: F811
619+
606620
self.logger.debug("Peer %s made header request: %s", peer, query)
607-
request = eth.HeaderRequest(
621+
request = HeaderRequest(
608622
query['block_number_or_hash'],
609623
query['max_headers'],
610624
query['skip'],
@@ -626,7 +640,7 @@ class RegularChainSyncer(FastChainSyncer):
626640
_seal_check_random_sample_rate = 1
627641

628642
async def _handle_block_receipts(
629-
self, peer: ETHPeer, receipts_by_block: List[List[eth.Receipt]]) -> None:
643+
self, peer: 'ETHPeer', receipts_by_block: List[List[Receipt]]) -> None:
630644
# When doing a regular sync we never request receipts.
631645
self.logger.warn("Unexpected BlockReceipts msg from %s, disconnecting", peer)
632646
await peer.disconnect(DisconnectReason.bad_protocol)
@@ -675,7 +689,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
675689
self.logger = logger
676690
self.cancel_token = token
677691

678-
async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
692+
async def handle_get_block_bodies(self, peer: 'ETHPeer', block_hashes: List[Hash32]) -> None:
679693
self.logger.trace("%s requested bodies for %d blocks", peer, len(block_hashes))
680694
chaindb = cast('AsyncChainDB', self.db)
681695
bodies = []
@@ -692,7 +706,7 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32
692706
self.logger.trace("Replying to %s with %d block bodies", peer, len(bodies))
693707
peer.sub_proto.send_block_bodies(bodies)
694708

695-
async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
709+
async def handle_get_receipts(self, peer: 'ETHPeer', block_hashes: List[Hash32]) -> None:
696710
self.logger.trace("%s requested receipts for %d blocks", peer, len(block_hashes))
697711
chaindb = cast('AsyncChainDB', self.db)
698712
receipts = []
@@ -708,7 +722,7 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -
708722
self.logger.trace("Replying to %s with receipts for %d blocks", peer, len(receipts))
709723
peer.sub_proto.send_receipts(receipts)
710724

711-
async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None:
725+
async def handle_get_node_data(self, peer: 'ETHPeer', node_hashes: List[Hash32]) -> None:
712726
self.logger.trace("%s requested %d trie nodes", peer, len(node_hashes))
713727
chaindb = cast('AsyncChainDB', self.db)
714728
nodes = []
@@ -723,7 +737,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
723737
peer.sub_proto.send_node_data(nodes)
724738

725739
async def lookup_headers(self,
726-
request: protocol.BaseHeaderRequest) -> Tuple[BlockHeader, ...]:
740+
request: 'BaseHeaderRequest') -> Tuple[BlockHeader, ...]:
727741
"""
728742
Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
729743
between each, in reverse order if :reverse: is True.
@@ -744,7 +758,7 @@ async def lookup_headers(self,
744758
return headers
745759

746760
async def _get_block_numbers_for_request(self,
747-
request: protocol.BaseHeaderRequest
761+
request: 'BaseHeaderRequest'
748762
) -> Tuple[BlockNumber, ...]:
749763
"""
750764
Generate the block numbers for a given `HeaderRequest`.
@@ -823,6 +837,8 @@ def _test() -> None:
823837
from tests.p2p.integration_test_helpers import (
824838
FakeAsyncChainDB, FakeAsyncMainnetChain, FakeAsyncRopstenChain, FakeAsyncHeaderDB,
825839
connect_to_peers_loop)
840+
from trinity.protocol.eth.peer import ETHPeer # noqa: F811
841+
from trinity.protocol.les.peer import LESPeer # noqa: F811
826842
from trinity.utils.chains import load_nodekey
827843

828844
parser = argparse.ArgumentParser()

p2p/constants.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@
4444
# Default timeout before giving up on a caller-initiated interaction
4545
COMPLETION_TIMEOUT = 5
4646

47-
# Types of LES Announce messages
48-
LES_ANNOUNCE_SIMPLE = 1
49-
LES_ANNOUNCE_SIGNED = 2
50-
5147
MAINNET_BOOTNODES = (
5248
'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501
5349
'enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303', # noqa: E501

p2p/lightchain.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
DisconnectReason,
5656
)
5757
from p2p.peer import (
58-
LESPeer,
5958
PeerPool,
6059
PeerSubscriber,
6160
)
@@ -64,10 +63,10 @@
6463
BaseService,
6564
service_timeout,
6665
)
67-
from p2p.utils import gen_request_id
6866

6967
if TYPE_CHECKING:
7068
from trinity.db.header import BaseAsyncHeaderDB # noqa: F401
69+
from trinity.protocol.les.peer import LESPeer # noqa: F401
7170

7271

7372
class LightPeerChain(PeerSubscriber, BaseService):
@@ -142,6 +141,9 @@ async def get_block_header_by_hash(self, block_hash: Hash32) -> BlockHeader:
142141
@alru_cache(maxsize=1024, cache_exceptions=False)
143142
@service_timeout(COMPLETION_TIMEOUT)
144143
async def get_block_body_by_hash(self, block_hash: Hash32) -> BlockBody:
144+
from trinity.protocol.les.peer import LESPeer # noqa: F811
145+
from trinity.protocol.les.utils import gen_request_id
146+
145147
peer = cast(LESPeer, self.peer_pool.highest_td_peer)
146148
self.logger.debug("Fetching block %s from %s", encode_hex(block_hash), peer)
147149
request_id = gen_request_id()
@@ -156,6 +158,9 @@ async def get_block_body_by_hash(self, block_hash: Hash32) -> BlockBody:
156158
@alru_cache(maxsize=1024, cache_exceptions=False)
157159
@service_timeout(COMPLETION_TIMEOUT)
158160
async def get_receipts(self, block_hash: Hash32) -> List[Receipt]:
161+
from trinity.protocol.les.peer import LESPeer # noqa: F811
162+
from trinity.protocol.les.utils import gen_request_id
163+
159164
peer = cast(LESPeer, self.peer_pool.highest_td_peer)
160165
self.logger.debug("Fetching %s receipts from %s", encode_hex(block_hash), peer)
161166
request_id = gen_request_id()
@@ -179,7 +184,7 @@ async def _get_account_from_peer(
179184
self,
180185
block_hash: Hash32,
181186
address: Address,
182-
peer: LESPeer) -> Account:
187+
peer: 'LESPeer') -> Account:
183188
key = keccak(address)
184189
proof = await self._get_proof(peer, block_hash, account_key=b'', key=key)
185190
header = await self._get_block_header_by_hash(block_hash, peer)
@@ -223,13 +228,15 @@ async def _get_contract_code_from_peer(
223228
block_hash: Hash32,
224229
address: Address,
225230
code_hash: Hash32,
226-
peer: LESPeer) -> bytes:
231+
peer: 'LESPeer') -> bytes:
227232
"""
228233
A single attempt to get the contract code from the given peer
229234
230235
:raise BadLESResponse: if the peer replies with contract code that does not match the
231236
account's code hash
232237
"""
238+
from trinity.protocol.les.utils import gen_request_id
239+
233240
# request contract code
234241
request_id = gen_request_id()
235242
peer.sub_proto.send_get_contract_code(block_hash, keccak(address), request_id)
@@ -261,7 +268,7 @@ async def _raise_for_empty_code(
261268
block_hash: Hash32,
262269
address: Address,
263270
code_hash: Hash32,
264-
peer: LESPeer) -> None:
271+
peer: 'LESPeer') -> None:
265272
"""
266273
A peer might return b'' if it doesn't have the block at the requested header,
267274
or it might maliciously return b'' when the code is non-empty. This method tries to tell the
@@ -315,12 +322,14 @@ async def _raise_for_empty_code(
315322
)
316323
)
317324

318-
async def _get_block_header_by_hash(self, block_hash: Hash32, peer: LESPeer) -> BlockHeader:
325+
async def _get_block_header_by_hash(self, block_hash: Hash32, peer: 'LESPeer') -> BlockHeader:
319326
"""
320327
A single attempt to get the block header from the given peer.
321328
322329
:raise BadLESResponse: if the peer replies with a header that has a different hash
323330
"""
331+
from trinity.protocol.les.utils import gen_request_id
332+
324333
self.logger.debug("Fetching header %s from %s", encode_hex(block_hash), peer)
325334
request_id = gen_request_id()
326335
max_headers = 1
@@ -336,17 +345,19 @@ async def _get_block_header_by_hash(self, block_hash: Hash32, peer: LESPeer) ->
336345
return header
337346

338347
async def _get_proof(self,
339-
peer: LESPeer,
348+
peer: 'LESPeer',
340349
block_hash: bytes,
341350
account_key: bytes,
342351
key: bytes,
343352
from_level: int = 0) -> List[bytes]:
353+
from trinity.protocol.les.utils import gen_request_id
354+
344355
request_id = gen_request_id()
345356
peer.sub_proto.send_get_proof(block_hash, account_key, key, from_level, request_id)
346357
reply = await self._wait_for_reply(request_id)
347358
return reply['proof']
348359

349-
async def _retry_on_bad_response(self, make_request_to_peer: Callable[[LESPeer], Any]) -> Any:
360+
async def _retry_on_bad_response(self, make_request_to_peer: Callable[['LESPeer'], Any]) -> Any:
350361
"""
351362
Make a call to a peer. If it behaves badly, drop it and retry with a different peer.
352363
@@ -355,6 +366,8 @@ async def _retry_on_bad_response(self, make_request_to_peer: Callable[[LESPeer],
355366
:raise NoEligiblePeers: if no peers are available to fulfill the request
356367
:raise TimeoutError: if an individual request or the overall process times out
357368
"""
369+
from trinity.protocol.les.peer import LESPeer # noqa: F811
370+
358371
for _ in range(MAX_REQUEST_ATTEMPTS):
359372
try:
360373
peer = cast(LESPeer, self.peer_pool.highest_td_peer)

0 commit comments

Comments
 (0)