Skip to content

Commit 4e9f017

Browse files
committed
Add beacon block request server
1 parent 9aa48c6 commit 4e9f017

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed

trinity/protocol/bcc/servers.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import functools
2+
import itertools
3+
4+
from cytoolz import (
5+
cons,
6+
sliding_window,
7+
take,
8+
)
9+
from eth_utils import (
10+
to_tuple,
11+
)
12+
13+
from cancel_token import CancelToken
14+
15+
from p2p import protocol
16+
from p2p.peer import BasePeer
17+
from p2p.protocol import Command
18+
19+
from eth.exceptions import BlockNotFound
20+
from eth.beacon.db.chain import BaseBeaconChainDB
21+
from eth.beacon.types.blocks import BaseBeaconBlock
22+
23+
from trinity.protocol.common.servers import BaseRequestServer
24+
from trinity.protocol.bcc import commands
25+
from trinity.protocol.bcc.peer import (
26+
BCCPeer,
27+
BCCPeerPool,
28+
)
29+
30+
from typing import (
31+
cast,
32+
Any,
33+
Dict,
34+
Set,
35+
Type,
36+
)
37+
from eth_typing import (
38+
Hash32,
39+
)
40+
41+
42+
class BCCRequestServer(BaseRequestServer):
43+
subscription_msg_types: Set[Type[Command]] = {
44+
commands.GetBeaconBlocks,
45+
}
46+
47+
def __init__(self,
48+
db: BaseBeaconChainDB,
49+
peer_pool: BCCPeerPool,
50+
token: CancelToken = None) -> None:
51+
super().__init__(peer_pool, token)
52+
self.db = db
53+
54+
async def _handle_msg(self, base_peer: BasePeer, cmd: Command,
55+
msg: protocol._DecodedMsgType) -> None:
56+
peer = cast(BCCPeer, base_peer)
57+
58+
if isinstance(cmd, commands.GetBeaconBlocks):
59+
await self._handle_get_beacon_blocks(peer, cast(Dict[str, Any], msg))
60+
else:
61+
raise Exception("Invariant: Only subscribed to GetBeaconBlocks")
62+
63+
async def _handle_get_beacon_blocks(self, peer: BCCPeer, msg: Dict[str, Any]):
64+
if not peer.is_operational:
65+
return
66+
67+
max_blocks = cast(int, msg["max_blocks"])
68+
block_slot_or_hash = msg["block_slot_or_hash"]
69+
70+
if isinstance(block_slot_or_hash, int):
71+
get_start_block = functools.partial(
72+
self.db.get_canonical_block_by_slot,
73+
cast(int, block_slot_or_hash),
74+
)
75+
elif isinstance(block_slot_or_hash, bytes):
76+
get_start_block = functools.partial(
77+
self.db.get_block_by_hash,
78+
cast(Hash32, block_slot_or_hash),
79+
)
80+
else:
81+
actual_type = type(block_slot_or_hash)
82+
raise TypeError(f"Invariant: unexpected type for 'block_slot_or_hash': {actual_type}")
83+
84+
try:
85+
start_block = get_start_block()
86+
except BlockNotFound:
87+
self.logger.trace("%s requested unknown block %s", block_slot_or_hash)
88+
blocks = ()
89+
else:
90+
self.logger.trace(
91+
"%s requested %d blocks starting with %s",
92+
peer,
93+
max_blocks,
94+
start_block,
95+
)
96+
blocks = self._get_blocks(start_block, max_blocks)
97+
finally:
98+
self.logger.trace("Replying to %s with %d blocks", peer, len(blocks))
99+
peer.sub_proto.send_blocks(blocks)
100+
101+
@to_tuple
102+
def _get_blocks(self, start_block: BaseBeaconBlock, max_blocks: int):
103+
if max_blocks <= 0:
104+
return
105+
106+
yield start_block
107+
108+
blocks_generator = cons(start_block, (
109+
self.db.get_canonical_block_by_slot(slot)
110+
for slot in itertools.count(start_block.slot + 1)
111+
))
112+
max_blocks_generator = take(max_blocks, blocks_generator)
113+
114+
try:
115+
# ensure only a connected chain is returned (breaks might occur if the start block is
116+
# not part of the canonical chain or if the canonical chain changes during execution)
117+
for parent, child in sliding_window(2, max_blocks_generator):
118+
if child.parent_hash == parent.hash:
119+
yield child
120+
else:
121+
break
122+
except BlockNotFound:
123+
return

0 commit comments

Comments
 (0)