Skip to content

Commit af4f627

Browse files
committed
Add LightPeerChainBridgePlugin
1 parent a904708 commit af4f627

File tree

8 files changed

+395
-13
lines changed

8 files changed

+395
-13
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from trinity.sync.light.service import (
2+
LightPeerChain
3+
)
4+
from trinity.plugins.builtin.light_peer_chain_bridge import (
5+
EventBusLightPeerChain,
6+
)
7+
8+
9+
# These tests may seem obvious but they safe us from runtime errors where
10+
# changes are made to the `BaseLightPeerChain` that are then forgotton to
11+
# implement on both derived chains.
12+
13+
def test_can_instantiate_eventbus_light_peer_chain():
14+
chain = EventBusLightPeerChain(None)
15+
assert chain is not None
16+
17+
18+
def test_can_instantiate_light_peer_chain():
19+
chain = LightPeerChain(None, None)
20+
assert chain is not None

tests/trinity/integration/test_lightchain_integration.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,26 +200,26 @@ async def wait_for_header_sync(block_number):
200200

201201
# https://ropsten.etherscan.io/block/11
202202
header = headerdb.get_canonical_block_header_by_number(n)
203-
body = await peer_chain.get_block_body_by_hash(header.hash)
203+
body = await peer_chain.coro_get_block_body_by_hash(header.hash)
204204
assert len(body['transactions']) == 15
205205

206-
receipts = await peer_chain.get_receipts(header.hash)
206+
receipts = await peer_chain.coro_get_receipts(header.hash)
207207
assert len(receipts) == 15
208208
assert encode_hex(keccak(rlp.encode(receipts[0]))) == (
209209
'0xf709ed2c57efc18a1675e8c740f3294c9e2cb36ba7bb3b89d3ab4c8fef9d8860')
210210

211211
assert len(peer_pool) == 1
212212
peer = peer_pool.highest_td_peer
213-
head = await peer_chain.get_block_header_by_hash(peer.head_hash)
213+
head = await peer_chain.coro_get_block_header_by_hash(peer.head_hash)
214214

215215
# In order to answer queries for contract code, geth needs the state trie entry for the block
216216
# we specify in the query, but because of fast sync we can only assume it has that for recent
217217
# blocks, so we use the current head to lookup the code for the contract below.
218218
# https://ropsten.etherscan.io/address/0x95a48dca999c89e4e284930d9b9af973a7481287
219219
contract_addr = decode_hex('0x8B09D9ac6A4F7778fCb22852e879C7F3B2bEeF81')
220-
contract_code = await peer_chain.get_contract_code(head.hash, contract_addr)
220+
contract_code = await peer_chain.coro_get_contract_code(head.hash, contract_addr)
221221
assert encode_hex(contract_code) == '0x600060006000600060006000356000f1'
222222

223-
account = await peer_chain.get_account(head.hash, contract_addr)
223+
account = await peer_chain.coro_get_account(head.hash, contract_addr)
224224
assert account.code_hash == keccak(contract_code)
225225
assert account.balance == 0

trinity/chains/light.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def get_block_by_hash(self, block_hash: Hash32) -> BaseBlock:
141141
def get_block_by_header(self, header: BlockHeader) -> BaseBlock:
142142
# TODO check local cache, before hitting peer
143143
block_body = self._run_async(
144-
self._peer_chain.get_block_body_by_hash(header.hash)
144+
self._peer_chain.coro_get_block_body_by_hash(header.hash)
145145
)
146146

147147
block_class = self.get_vm_class_for_block_number(header.block_number).get_block_class()
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .light_peer_chain_bridge import ( # noqa: F401
2+
EventBusLightPeerChain,
3+
LightPeerChainEventBusHandler,
4+
)
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
from typing import (
2+
List,
3+
Type,
4+
TypeVar,
5+
)
6+
7+
from cancel_token import (
8+
CancelToken,
9+
)
10+
11+
from eth_typing import (
12+
Address,
13+
Hash32,
14+
)
15+
16+
from eth.rlp.accounts import (
17+
Account,
18+
)
19+
from eth.rlp.headers import (
20+
BlockHeader,
21+
)
22+
from eth.rlp.receipts import (
23+
Receipt,
24+
)
25+
26+
from lahja import (
27+
BaseEvent,
28+
BaseRequestResponseEvent,
29+
Endpoint,
30+
)
31+
32+
from p2p.service import (
33+
BaseService,
34+
)
35+
36+
from trinity.utils.async_errors import (
37+
await_and_wrap_errors,
38+
)
39+
from trinity.rlp.block_body import BlockBody
40+
from trinity.sync.light.service import (
41+
BaseLightPeerChain,
42+
)
43+
44+
45+
class BaseLightPeerChainResponse(BaseEvent):
46+
47+
def __init__(self, error: Exception) -> None:
48+
self.error = error
49+
50+
51+
class BlockHeaderResponse(BaseLightPeerChainResponse):
52+
53+
def __init__(self, block_header: BlockHeader, error: Exception=None) -> None:
54+
super().__init__(error)
55+
self.block_header = block_header
56+
57+
58+
class BlockBodyResponse(BaseLightPeerChainResponse):
59+
60+
def __init__(self, block_body: BlockBody, error: Exception=None) -> None:
61+
super().__init__(error)
62+
self.block_body = block_body
63+
64+
65+
class ReceiptsResponse(BaseLightPeerChainResponse):
66+
67+
def __init__(self, receipts: List[Receipt], error: Exception=None) -> None:
68+
super().__init__(error)
69+
self.receipts = receipts
70+
71+
72+
class AccountResponse(BaseLightPeerChainResponse):
73+
74+
def __init__(self, account: Account, error: Exception=None) -> None:
75+
super().__init__(error)
76+
self.account = account
77+
78+
79+
class BytesResponse(BaseLightPeerChainResponse):
80+
81+
def __init__(self, bytez: bytes, error: Exception=None) -> None:
82+
super().__init__(error)
83+
self.bytez = bytez
84+
85+
86+
class GetBlockHeaderByHashRequest(BaseRequestResponseEvent[BlockHeaderResponse]):
87+
88+
def __init__(self, block_hash: Hash32) -> None:
89+
self.block_hash = block_hash
90+
91+
@staticmethod
92+
def expected_response_type() -> Type[BlockHeaderResponse]:
93+
return BlockHeaderResponse
94+
95+
96+
class GetBlockBodyByHashRequest(BaseRequestResponseEvent[BlockBodyResponse]):
97+
98+
def __init__(self, block_hash: Hash32) -> None:
99+
self.block_hash = block_hash
100+
101+
@staticmethod
102+
def expected_response_type() -> Type[BlockBodyResponse]:
103+
return BlockBodyResponse
104+
105+
106+
class GetReceiptsRequest(BaseRequestResponseEvent[ReceiptsResponse]):
107+
108+
def __init__(self, block_hash: Hash32) -> None:
109+
self.block_hash = block_hash
110+
111+
@staticmethod
112+
def expected_response_type() -> Type[ReceiptsResponse]:
113+
return ReceiptsResponse
114+
115+
116+
class GetAccountRequest(BaseRequestResponseEvent[AccountResponse]):
117+
118+
def __init__(self, block_hash: Hash32, address: Address) -> None:
119+
self.block_hash = block_hash
120+
self.address = address
121+
122+
@staticmethod
123+
def expected_response_type() -> Type[AccountResponse]:
124+
return AccountResponse
125+
126+
127+
class GetContractCodeRequest(BaseRequestResponseEvent[BytesResponse]):
128+
129+
def __init__(self, block_hash: Hash32, address: Address) -> None:
130+
self.block_hash = block_hash
131+
self.address = address
132+
133+
@staticmethod
134+
def expected_response_type() -> Type[BytesResponse]:
135+
return BytesResponse
136+
137+
138+
class LightPeerChainEventBusHandler(BaseService):
139+
"""
140+
The ``LightPeerChainEventBusHandler`` listens for certain events on the eventbus and
141+
delegates them to the ``LightPeerChain`` to get answers. It then propagates responses
142+
back to the caller.
143+
"""
144+
145+
def __init__(self,
146+
chain: BaseLightPeerChain,
147+
event_bus: Endpoint,
148+
token: CancelToken = None) -> None:
149+
super().__init__(token)
150+
self.chain = chain
151+
self.event_bus = event_bus
152+
153+
async def _run(self) -> None:
154+
self.logger.info("Running LightPeerChainEventBusHandler")
155+
156+
self.run_daemon_task(self.handle_get_blockheader_by_hash_requests())
157+
self.run_daemon_task(self.handle_get_blockbody_by_hash_requests())
158+
self.run_daemon_task(self.handle_get_receipts_by_hash_requests())
159+
self.run_daemon_task(self.handle_get_account_requests())
160+
self.run_daemon_task(self.handle_get_contract_code_requests())
161+
162+
async def handle_get_blockheader_by_hash_requests(self) -> None:
163+
async for event in self.event_bus.stream(GetBlockHeaderByHashRequest):
164+
165+
val, error = await await_and_wrap_errors(
166+
self.chain.coro_get_block_header_by_hash(event.block_hash)
167+
)
168+
169+
self.event_bus.broadcast(
170+
event.expected_response_type()(val, error),
171+
event.broadcast_config()
172+
)
173+
174+
async def handle_get_blockbody_by_hash_requests(self) -> None:
175+
async for event in self.event_bus.stream(GetBlockBodyByHashRequest):
176+
177+
val, error = await await_and_wrap_errors(
178+
self.chain.coro_get_block_body_by_hash(event.block_hash)
179+
)
180+
181+
self.event_bus.broadcast(
182+
event.expected_response_type()(val, error),
183+
event.broadcast_config()
184+
)
185+
186+
async def handle_get_receipts_by_hash_requests(self) -> None:
187+
async for event in self.event_bus.stream(GetReceiptsRequest):
188+
189+
val, error = await await_and_wrap_errors(self.chain.coro_get_receipts(event.block_hash))
190+
191+
self.event_bus.broadcast(
192+
event.expected_response_type()(val, error),
193+
event.broadcast_config()
194+
)
195+
196+
async def handle_get_account_requests(self) -> None:
197+
async for event in self.event_bus.stream(GetAccountRequest):
198+
199+
val, error = await await_and_wrap_errors(
200+
self.chain.coro_get_account(event.block_hash, event.address)
201+
)
202+
203+
self.event_bus.broadcast(
204+
event.expected_response_type()(val, error),
205+
event.broadcast_config()
206+
)
207+
208+
async def handle_get_contract_code_requests(self) -> None:
209+
210+
async for event in self.event_bus.stream(GetContractCodeRequest):
211+
212+
val, error = await await_and_wrap_errors(
213+
self.chain.coro_get_contract_code(event.block_hash, event.address)
214+
)
215+
216+
self.event_bus.broadcast(
217+
event.expected_response_type()(val, error),
218+
event.broadcast_config()
219+
)
220+
221+
222+
class EventBusLightPeerChain(BaseLightPeerChain):
223+
"""
224+
The ``EventBusLightPeerChain`` is an implementation of the ``BaseLightPeerChain`` that can
225+
be used from within any process.
226+
"""
227+
228+
def __init__(self, event_bus: Endpoint) -> None:
229+
self.event_bus = event_bus
230+
231+
async def coro_get_block_header_by_hash(self, block_hash: Hash32) -> BlockHeader:
232+
event = GetBlockHeaderByHashRequest(block_hash)
233+
return self._pass_or_raise(await self.event_bus.request(event)).block_header
234+
235+
async def coro_get_block_body_by_hash(self, block_hash: Hash32) -> BlockBody:
236+
event = GetBlockBodyByHashRequest(block_hash)
237+
return self._pass_or_raise(await self.event_bus.request(event)).block_body
238+
239+
async def coro_get_receipts(self, block_hash: Hash32) -> List[Receipt]:
240+
event = GetReceiptsRequest(block_hash)
241+
return self._pass_or_raise(await self.event_bus.request(event)).receipts
242+
243+
async def coro_get_account(self, block_hash: Hash32, address: Address) -> Account:
244+
event = GetAccountRequest(block_hash, address)
245+
return self._pass_or_raise(await self.event_bus.request(event)).account
246+
247+
async def coro_get_contract_code(self, block_hash: Hash32, address: Address) -> bytes:
248+
event = GetContractCodeRequest(block_hash, address)
249+
return self._pass_or_raise(await self.event_bus.request(event)).bytez
250+
251+
TResponse = TypeVar("TResponse", bound=BaseLightPeerChainResponse)
252+
253+
def _pass_or_raise(self, response: TResponse) -> TResponse:
254+
if response.error is not None:
255+
raise response.error
256+
257+
return response
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import asyncio
2+
from typing import (
3+
cast
4+
)
5+
6+
from eth.chains.base import (
7+
BaseChain
8+
)
9+
10+
from trinity.constants import (
11+
SYNC_LIGHT
12+
)
13+
from trinity.extensibility import (
14+
BaseEvent,
15+
BasePlugin,
16+
)
17+
from trinity.chains.light import (
18+
LightDispatchChain,
19+
)
20+
from trinity.extensibility.events import (
21+
ResourceAvailableEvent
22+
)
23+
from trinity.plugins.builtin.light_peer_chain_bridge import (
24+
LightPeerChainEventBusHandler
25+
)
26+
27+
28+
class LightPeerChainBridgePlugin(BasePlugin):
29+
"""
30+
The ``LightPeerChainBridgePlugin`` runs in the ``networking`` process and acts as a bridge
31+
between other processes and the ``LightPeerChain``.
32+
It runs only in ``light`` mode.
33+
Other plugins can instantiate the ``EventBusLightPeerChain`` from separate processes to
34+
interact with the ``LightPeerChain`` indirectly.
35+
"""
36+
37+
chain: BaseChain = None
38+
39+
@property
40+
def name(self) -> str:
41+
return "LightPeerChain Bridge"
42+
43+
def should_start(self) -> bool:
44+
return self.chain is not None and self.context.chain_config.sync_mode == SYNC_LIGHT
45+
46+
def handle_event(self, activation_event: BaseEvent) -> None:
47+
if isinstance(activation_event, ResourceAvailableEvent):
48+
if activation_event.resource_type is BaseChain:
49+
self.chain = activation_event.resource
50+
51+
def start(self) -> None:
52+
self.logger.info('LightPeerChain Bridge started')
53+
chain = cast(LightDispatchChain, self.chain)
54+
handler = LightPeerChainEventBusHandler(chain._peer_chain, self.context.event_bus)
55+
asyncio.ensure_future(handler.run())

0 commit comments

Comments
 (0)