Skip to content

Commit 1006923

Browse files
committed
Make JSON-RPC Server an isolated plugin
1 parent c66650a commit 1006923

File tree

29 files changed

+384
-191
lines changed

29 files changed

+384
-191
lines changed

eth/chains/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,3 +904,15 @@ async def coro_validate_receipt(self,
904904
receipt: Receipt,
905905
at_header: BlockHeader) -> None:
906906
raise NotImplementedError()
907+
908+
async def coro_get_block_by_hash(self,
909+
block_hash: Hash32) -> BaseBlock:
910+
raise NotImplementedError()
911+
912+
async def coro_get_block_by_header(self,
913+
header: BlockHeader) -> BaseBlock:
914+
raise NotImplementedError()
915+
916+
async def coro_get_canonical_block_by_number(self,
917+
block_number: BlockNumber) -> BaseBlock:
918+
raise NotImplementedError()

eth/tools/fixtures/helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ def genesis_params_from_fixture(fixture):
146146
}
147147

148148

149-
def new_chain_from_fixture(fixture):
149+
def new_chain_from_fixture(fixture, chain_cls=MainnetChain):
150150
base_db = AtomicDB()
151151

152152
vm_config = chain_vm_configuration(fixture)
153153

154-
ChainFromFixture = MainnetChain.configure(
154+
ChainFromFixture = chain_cls.configure(
155155
'ChainFromFixture',
156156
vm_configuration=vm_config,
157157
)

p2p/events.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from typing import (
2+
Type,
3+
)
4+
5+
from lahja import (
6+
BaseEvent,
7+
BaseRequestResponseEvent,
8+
)
9+
10+
11+
class PeerCountResponse(BaseEvent):
12+
13+
def __init__(self, peer_count: int) -> None:
14+
self.peer_count = peer_count
15+
16+
17+
class PeerCountRequest(BaseRequestResponseEvent[PeerCountResponse]):
18+
19+
@staticmethod
20+
def expected_response_type() -> Type[PeerCountResponse]:
21+
return PeerCountResponse

p2p/peer.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@
4848

4949
from cancel_token import CancelToken, OperationCancelled
5050

51+
from lahja import (
52+
Endpoint,
53+
)
54+
5155
from eth.constants import GENESIS_BLOCK_NUMBER
5256
from eth.rlp.headers import BlockHeader
5357
from eth.vm.base import BaseVM
@@ -95,6 +99,10 @@
9599
MAC_LEN,
96100
)
97101

102+
from .events import (
103+
PeerCountRequest,
104+
PeerCountResponse,
105+
)
98106

99107
if TYPE_CHECKING:
100108
from trinity.db.header import BaseAsyncHeaderDB # noqa: F401
@@ -173,7 +181,7 @@ async def _run(self) -> None:
173181
# exit via exception rather than cleanly shutting down. By using
174182
# `run_task`, this service finishes exiting prior to the
175183
# cancellation.
176-
self.run_task(self.peer.disconnect(DisconnectReason.useless_peer))
184+
self.run_daemon_task(self.peer.disconnect(DisconnectReason.useless_peer))
177185

178186
async def ensure_same_side_on_dao_fork(self) -> None:
179187
"""Ensure we're on the same side of the DAO fork as the given peer.
@@ -841,6 +849,7 @@ def __init__(self,
841849
context: BasePeerContext,
842850
max_peers: int = DEFAULT_MAX_PEERS,
843851
token: CancelToken = None,
852+
event_bus: Endpoint = None
844853
) -> None:
845854
super().__init__(token)
846855

@@ -850,6 +859,17 @@ def __init__(self,
850859

851860
self.connected_nodes: Dict[Node, BasePeer] = {}
852861
self._subscribers: List[PeerSubscriber] = []
862+
self.event_bus = event_bus
863+
if self.event_bus is not None:
864+
self.run_task(self.handle_peer_count_requests())
865+
866+
async def handle_peer_count_requests(self) -> None:
867+
async for req in self.event_bus.stream(PeerCountRequest):
868+
# We are listening for all `PeerCountRequest` events but we ensure to
869+
# only send a `PeerCountResponse` to the callsite that made the request.
870+
# We do that by retrieving a `BroadcastConfig` from the request via the
871+
# `event.broadcast_config()` API.
872+
self.event_bus.broadcast(PeerCountResponse(len(self)), req.broadcast_config())
853873

854874
def __len__(self) -> int:
855875
return len(self.connected_nodes)

tests/conftest.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ def funded_address_initial_balance():
7878
return to_wei(1000, 'ether')
7979

8080

81-
@pytest.fixture
82-
def chain_with_block_validation(base_db, genesis_state):
81+
def _chain_with_block_validation(base_db, genesis_state, chain_cls=Chain):
8382
"""
8483
Return a Chain object containing just the genesis block.
8584
@@ -107,7 +106,8 @@ def chain_with_block_validation(base_db, genesis_state):
107106
"transaction_root": decode_hex("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"), # noqa: E501
108107
"uncles_hash": decode_hex("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347") # noqa: E501
109108
}
110-
klass = Chain.configure(
109+
110+
klass = chain_cls.configure(
111111
__name__='TestChain',
112112
vm_configuration=(
113113
(constants.GENESIS_BLOCK_NUMBER, SpuriousDragonVM),
@@ -118,6 +118,11 @@ def chain_with_block_validation(base_db, genesis_state):
118118
return chain
119119

120120

121+
@pytest.fixture
122+
def chain_with_block_validation(base_db, genesis_state):
123+
return _chain_with_block_validation(base_db, genesis_state)
124+
125+
121126
def import_block_without_validation(chain, block):
122127
return super(type(chain), chain).import_block(block, perform_validation=False)
123128

tests/trinity/conftest.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,17 @@
55
import tempfile
66
import uuid
77

8-
from p2p.tools.paragon import (
9-
ParagonContext,
10-
ParagonPeerPool,
8+
from lahja import (
9+
EventBus,
1110
)
1211

12+
from eth.chains import (
13+
Chain,
14+
)
15+
16+
from trinity.chains.coro import (
17+
AsyncChainMixin,
18+
)
1319
from trinity.rpc.main import (
1420
RPCServer,
1521
)
@@ -22,6 +28,13 @@
2228
from trinity.utils.filesystem import (
2329
is_under_path,
2430
)
31+
from tests.conftest import (
32+
_chain_with_block_validation,
33+
)
34+
35+
36+
class TestAsyncChain(Chain, AsyncChainMixin):
37+
pass
2538

2639

2740
def pytest_addoption(parser):
@@ -51,26 +64,43 @@ def event_loop():
5164
loop.close()
5265

5366

67+
@pytest.fixture(scope='module')
68+
def event_bus(event_loop):
69+
bus = EventBus()
70+
endpoint = bus.create_endpoint('test')
71+
bus.start(event_loop)
72+
endpoint.connect(event_loop)
73+
try:
74+
yield endpoint
75+
finally:
76+
endpoint.stop()
77+
bus.stop()
78+
79+
5480
@pytest.fixture(scope='session')
5581
def jsonrpc_ipc_pipe_path():
5682
with tempfile.TemporaryDirectory() as temp_dir:
5783
yield Path(temp_dir) / '{0}.ipc'.format(uuid.uuid4())
5884

5985

86+
@pytest.fixture
87+
def chain_with_block_validation(base_db, genesis_state):
88+
return _chain_with_block_validation(base_db, genesis_state, TestAsyncChain)
89+
90+
6091
@pytest.mark.asyncio
6192
@pytest.fixture
6293
async def ipc_server(
6394
monkeypatch,
95+
event_bus,
6496
jsonrpc_ipc_pipe_path,
6597
event_loop,
6698
chain_with_block_validation):
6799
'''
68100
This fixture runs a single RPC server over IPC over
69101
the course of all tests. It yields the IPC server only for monkeypatching purposes
70102
'''
71-
peer_pool = ParagonPeerPool(b'unicornsrainbows' * 2, ParagonContext())
72-
73-
rpc = RPCServer(chain_with_block_validation, peer_pool)
103+
rpc = RPCServer(chain_with_block_validation, event_bus)
74104
ipc_server = IPCServer(rpc, jsonrpc_ipc_pipe_path, loop=event_loop)
75105

76106
asyncio.ensure_future(ipc_server.run(), loop=event_loop)

tests/trinity/core/json-rpc/test_ipc.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
to_hex,
1515
)
1616

17+
from p2p.events import (
18+
PeerCountRequest,
19+
PeerCountResponse,
20+
)
21+
1722
from trinity.utils.version import construct_trinity_client_identifier
1823

1924

@@ -35,15 +40,6 @@ def build_request(method, params=[]):
3540
return json.dumps(request).encode()
3641

3742

38-
class MockPeerPool:
39-
40-
def __init__(self, peer_count=0):
41-
self.peer_count = peer_count
42-
43-
def __len__(self):
44-
return self.peer_count
45-
46-
4743
def id_from_rpc_request(param):
4844
if isinstance(param, bytes):
4945
request = json.loads(param.decode())
@@ -68,6 +64,7 @@ async def get_ipc_response(
6864
jsonrpc_ipc_pipe_path,
6965
request_msg,
7066
event_loop):
67+
7168
assert wait_for(jsonrpc_ipc_pipe_path), "IPC server did not successfully start with IPC file"
7269

7370
reader, writer = await asyncio.open_unix_connection(str(jsonrpc_ipc_pipe_path), loop=event_loop)
@@ -399,18 +396,27 @@ async def test_eth_call_with_contract_on_ipc(
399396
assert result == expected
400397

401398

399+
def mock_peer_count(count):
400+
async def mock_event_bus_interaction(bus):
401+
async for req in bus.stream(PeerCountRequest):
402+
bus.broadcast(PeerCountResponse(count), req.broadcast_config())
403+
break
404+
405+
return mock_event_bus_interaction
406+
407+
402408
@pytest.mark.asyncio
403409
@pytest.mark.parametrize(
404-
'request_msg, mock_peer_pool, expected',
410+
'request_msg, event_bus_setup_fn, expected',
405411
(
406412
(
407413
build_request('net_peerCount'),
408-
MockPeerPool(peer_count=1),
414+
mock_peer_count(1),
409415
{'result': '0x1', 'id': 3, 'jsonrpc': '2.0'},
410416
),
411417
(
412418
build_request('net_peerCount'),
413-
MockPeerPool(peer_count=0),
419+
mock_peer_count(0),
414420
{'result': '0x0', 'id': 3, 'jsonrpc': '2.0'},
415421
),
416422
),
@@ -422,10 +428,17 @@ async def test_peer_pool_over_ipc(
422428
monkeypatch,
423429
jsonrpc_ipc_pipe_path,
424430
request_msg,
425-
mock_peer_pool,
431+
event_bus_setup_fn,
432+
event_bus,
426433
expected,
427434
event_loop,
428435
ipc_server):
429-
monkeypatch.setattr(ipc_server.rpc.modules['net'], '_peer_pool', mock_peer_pool)
430-
result = await get_ipc_response(jsonrpc_ipc_pipe_path, request_msg, event_loop)
436+
437+
asyncio.ensure_future(event_bus_setup_fn(event_bus))
438+
439+
result = await get_ipc_response(
440+
jsonrpc_ipc_pipe_path,
441+
request_msg,
442+
event_loop
443+
)
431444
assert result == expected

tests/trinity/json-fixtures-over-rpc/test_rpc_fixtures.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
should_run_slow_tests,
2626
)
2727

28+
from trinity.chains.mainnet import (
29+
MainnetFullChain
30+
)
2831
from trinity.rpc import RPCServer
2932
from trinity.rpc.format import (
3033
empty_to_0x,
@@ -383,7 +386,7 @@ def chain(chain_without_block_validation):
383386

384387
@pytest.mark.asyncio
385388
async def test_rpc_against_fixtures(chain, ipc_server, chain_fixture, fixture_data):
386-
rpc = RPCServer(None)
389+
rpc = RPCServer(MainnetFullChain(None))
387390

388391
setup_result, setup_error = await call_rpc(rpc, 'evm_resetToGenesisFixture', [chain_fixture])
389392
assert setup_error is None and setup_result is True, "cannot load chain for %r" % fixture_data

trinity/chains/coro.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from trinity.utils.async_dispatch import (
2+
async_method,
3+
)
4+
5+
6+
class AsyncChainMixin:
7+
8+
coro_get_canonical_block_by_number = async_method('get_canonical_block_by_number')
9+
coro_get_block_by_hash = async_method('get_block_by_hash')
10+
coro_get_block_by_header = async_method('get_block_by_header')

0 commit comments

Comments
 (0)