Skip to content

Commit e48b279

Browse files
rayrapetyanpipermerriam
authored andcommitted
Implement eth_syncing endpoint (#1421)
* Implement eth_syncing endpoint * Implement eth_syncing endpoint * Remove slash based line continuation.
1 parent 90423f1 commit e48b279

File tree

7 files changed

+174
-16
lines changed

7 files changed

+174
-16
lines changed

tests/trinity/core/json-rpc/test_ipc.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222
NetworkIdRequest,
2323
NetworkIdResponse,
2424
)
25+
from trinity.sync.common.events import (
26+
SyncingRequest,
27+
SyncingResponse,
28+
)
29+
from trinity.sync.common.types import (
30+
SyncProgress
31+
)
2532

2633
from trinity.utils.version import construct_trinity_client_identifier
2734

@@ -447,7 +454,54 @@ async def mock_event_bus_interaction(bus):
447454
],
448455
)
449456
async def test_peer_pool_over_ipc(
450-
monkeypatch,
457+
jsonrpc_ipc_pipe_path,
458+
request_msg,
459+
event_bus_setup_fn,
460+
event_bus,
461+
expected,
462+
event_loop,
463+
ipc_server):
464+
465+
asyncio.ensure_future(event_bus_setup_fn(event_bus))
466+
467+
result = await get_ipc_response(
468+
jsonrpc_ipc_pipe_path,
469+
request_msg,
470+
event_loop
471+
)
472+
assert result == expected
473+
474+
475+
def mock_syncing(is_syncing, progress=None):
476+
async def mock_event_bus_interaction(bus):
477+
async for req in bus.stream(SyncingRequest):
478+
bus.broadcast(SyncingResponse(is_syncing, progress), req.broadcast_config())
479+
break
480+
481+
return mock_event_bus_interaction
482+
483+
484+
@pytest.mark.asyncio
485+
@pytest.mark.parametrize(
486+
'request_msg, event_bus_setup_fn, expected',
487+
(
488+
(
489+
build_request('eth_syncing'),
490+
mock_syncing(False),
491+
{'result': False, 'id': 3, 'jsonrpc': '2.0'},
492+
),
493+
(
494+
build_request('eth_syncing'),
495+
mock_syncing(True, SyncProgress(0, 1, 2)),
496+
{'result': {'startingBlock': 0, 'currentBlock': 1, 'highestBlock': 2}, 'id': 3,
497+
'jsonrpc': '2.0'},
498+
),
499+
),
500+
ids=[
501+
'eth_syncing_F', 'eth_syncing_T',
502+
],
503+
)
504+
async def test_eth_over_ipc(
451505
jsonrpc_ipc_pipe_path,
452506
request_msg,
453507
event_bus_setup_fn,

trinity/protocol/common/peer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class BaseChainPeer(BasePeer):
4848

4949
head_td: int = None
5050
head_hash: Hash32 = None
51+
head_number: BlockNumber = None
5152

5253
@property
5354
@abstractmethod

trinity/protocol/les/peer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444

4545

4646
class LESPeer(BaseChainPeer):
47-
head_number: BlockNumber = None
48-
4947
max_headers_fetch = MAX_HEADERS_FETCH
5048

5149
_supported_sub_protocols = [LESProtocol, LESProtocolV2]

trinity/rpc/modules/eth.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
List,
99
Union,
1010
)
11+
from mypy_extensions import (
12+
TypedDict,
13+
)
1114

1215
from eth_typing import (
1316
Address,
@@ -51,6 +54,9 @@
5154
from trinity.rpc.modules import (
5255
RPCModule,
5356
)
57+
from trinity.sync.common.events import (
58+
SyncingRequest,
59+
)
5460
from trinity.utils.validation import (
5561
validate_transaction_call_dict,
5662
validate_transaction_gas_estimation_dict,
@@ -264,5 +270,17 @@ async def mining(self) -> bool:
264270
async def protocolVersion(self) -> str:
265271
return "63"
266272

267-
async def syncing(self) -> bool:
268-
raise NotImplementedError()
273+
class SyncProgress(TypedDict):
274+
startingBlock: BlockNumber
275+
currentBlock: BlockNumber
276+
highestBlock: BlockNumber
277+
278+
async def syncing(self) -> Union[bool, SyncProgress]:
279+
res = await self._event_bus.request(SyncingRequest())
280+
if res.is_syncing:
281+
return {
282+
"startingBlock": res.progress.starting_block,
283+
"currentBlock": res.progress.current_block,
284+
"highestBlock": res.progress.highest_block
285+
}
286+
return False

trinity/sync/common/chain.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
from typing import (
55
AsyncIterator,
66
Iterator,
7+
Optional,
78
Tuple,
89
Type,
910
)
1011

11-
from cancel_token import CancelToken, OperationCancelled
12+
from cancel_token import (
13+
CancelToken,
14+
OperationCancelled,
15+
)
1216

1317
from eth.constants import GENESIS_BLOCK_NUMBER
1418
from eth.chains import AsyncChain
@@ -23,17 +27,43 @@
2327
encode_hex,
2428
ValidationError,
2529
)
26-
from eth.rlp.headers import BlockHeader
30+
from eth.rlp.headers import (
31+
BlockHeader,
32+
)
33+
34+
from p2p.constants import (
35+
MAX_REORG_DEPTH,
36+
SEAL_CHECK_RANDOM_SAMPLE_RATE,
37+
)
38+
from p2p.p2p_proto import (
39+
DisconnectReason,
40+
)
41+
from p2p.service import (
42+
BaseService,
43+
)
2744

28-
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
29-
from p2p.p2p_proto import DisconnectReason
30-
from p2p.service import BaseService
45+
from trinity.db.header import (
46+
AsyncHeaderDB,
47+
)
48+
from trinity.protocol.common.monitors import (
49+
BaseChainTipMonitor,
50+
)
51+
from trinity.protocol.common.peer import (
52+
BaseChainPeer,
53+
BaseChainPeerPool,
54+
)
55+
from trinity.protocol.eth.peer import (
56+
ETHPeer,
57+
)
58+
from trinity.sync.common.events import (
59+
SyncingRequest,
60+
SyncingResponse,
61+
)
62+
from trinity.utils.datastructures import (
63+
TaskQueue,
64+
)
3165

32-
from trinity.db.header import AsyncHeaderDB
33-
from trinity.protocol.common.monitors import BaseChainTipMonitor
34-
from trinity.protocol.common.peer import BaseChainPeer, BaseChainPeerPool
35-
from trinity.protocol.eth.peer import ETHPeer
36-
from trinity.utils.datastructures import TaskQueue
66+
from .types import SyncProgress
3767

3868

3969
class BaseHeaderChainSyncer(BaseService):
@@ -81,6 +111,8 @@ def tip_monitor_class(self) -> Type[BaseChainTipMonitor]:
81111

82112
async def _run(self) -> None:
83113
self.run_daemon(self._tip_monitor)
114+
if self.peer_pool.event_bus is not None:
115+
self.run_daemon_task(self.handle_sync_status_requests())
84116
try:
85117
async for highest_td_peer in self._tip_monitor.wait_tip_info():
86118
self.run_task(self.sync(highest_td_peer))
@@ -134,6 +166,16 @@ async def sync(self, peer: BaseChainPeer) -> None:
134166
new_headers = tuple(h for h in header_batch if h not in self.header_queue)
135167
await self.wait(self.header_queue.add(new_headers))
136168

169+
def get_sync_status(self) -> Tuple[bool, Optional[SyncProgress]]:
170+
if not self._syncing:
171+
return False, None
172+
return True, self._peer_header_syncer.sync_progress
173+
174+
async def handle_sync_status_requests(self) -> None:
175+
async for req in self.peer_pool.event_bus.stream(SyncingRequest):
176+
self.peer_pool.event_bus.broadcast(SyncingResponse(*self.get_sync_status()),
177+
req.broadcast_config())
178+
137179

138180
class PeerHeaderSyncer(BaseService):
139181
"""
@@ -152,6 +194,7 @@ def __init__(self,
152194
super().__init__(token)
153195
self.chain = chain
154196
self.db = db
197+
self.sync_progress: SyncProgress = None
155198
self._peer = peer
156199
self._target_header_hash = peer.head_hash
157200

@@ -183,7 +226,7 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
183226
self.logger.debug(
184227
"%s announced Head TD %d, which is higher than ours (%d), starting sync",
185228
peer, peer.head_td, head_td)
186-
229+
self.sync_progress = SyncProgress(head.block_number, head.block_number, peer.head_number)
187230
self.logger.info("Starting sync with %s", peer)
188231
last_received_header: BlockHeader = None
189232
# When we start the sync with a peer, we always request up to MAX_REORG_DEPTH extra
@@ -306,6 +349,9 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
306349

307350
yield headers
308351
last_received_header = headers[-1]
352+
self.sync_progress = self.sync_progress.update_current_block(
353+
last_received_header.block_number,
354+
)
309355
start_at = last_received_header.block_number + 1
310356

311357
async def _request_headers(

trinity/sync/common/events.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from typing import (
2+
Optional,
3+
Type,
4+
)
5+
6+
from lahja import (
7+
BaseEvent,
8+
BaseRequestResponseEvent,
9+
)
10+
11+
from trinity.sync.common.types import (
12+
SyncProgress
13+
)
14+
15+
16+
class SyncingResponse(BaseEvent):
17+
def __init__(self, is_syncing: bool, progress: Optional[SyncProgress]) -> None:
18+
self.is_syncing: bool = is_syncing
19+
self.progress: Optional[SyncProgress] = progress
20+
21+
22+
class SyncingRequest(BaseRequestResponseEvent[SyncingResponse]):
23+
@staticmethod
24+
def expected_response_type() -> Type[SyncingResponse]:
25+
return SyncingResponse

trinity/sync/common/types.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from typing import (
2+
NamedTuple,
3+
)
4+
5+
from eth_typing import (
6+
BlockNumber,
7+
)
8+
9+
10+
class SyncProgress(NamedTuple):
11+
starting_block: BlockNumber
12+
current_block: BlockNumber
13+
highest_block: BlockNumber
14+
15+
def update_current_block(self, new_current_block: BlockNumber) -> 'SyncProgress':
16+
return SyncProgress(self.starting_block, new_current_block, self.highest_block)

0 commit comments

Comments
 (0)