
Commit 8629750

Merge pull request #1392 from carver/header-skeleton-sync
Header skeleton sync
2 parents: b3362ed + 9935848

File tree

13 files changed: +1142 −234 lines

eth/chains/base.py

Lines changed: 10 additions & 1 deletion

@@ -349,7 +349,16 @@ def validate_chain(
                         child, child.parent_hash, parent.hash))
             should_check_seal = index in indices_to_check_seal
             vm_class = cls.get_vm_class_for_block_number(child.block_number)
-            vm_class.validate_header(child, parent, check_seal=should_check_seal)
+            try:
+                vm_class.validate_header(child, parent, check_seal=should_check_seal)
+            except ValidationError as exc:
+                raise ValidationError(
+                    "%s is not a valid child of %s: %s" % (
+                        child,
+                        parent,
+                        exc,
+                    )
+                ) from exc


 class Chain(BaseChain):
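The change above wraps the low-level header validation failure with the parent/child pair that failed, so chain validation errors identify the offending link. A minimal self-contained sketch of that exception-chaining pattern (ValidationError here is a stand-in class for this sketch, not py-evm's own; validate_header is a stub that always fails for the demo):

    class ValidationError(Exception):
        # Stand-in for py-evm's ValidationError, just for this sketch.
        pass

    def validate_header(child, parent):
        # Stand-in for vm_class.validate_header; always fails for the demo.
        raise ValidationError("gas limit out of bounds")

    def validate_link(child, parent):
        try:
            validate_header(child, parent)
        except ValidationError as exc:
            # Re-raise with the failing pair in the message; `from exc` keeps
            # the original error reachable as __cause__ in tracebacks.
            raise ValidationError(
                "%s is not a valid child of %s: %s" % (child, parent, exc)
            ) from exc

    validate_link("header-1001", "header-1000")
    # ValidationError: header-1001 is not a valid child of header-1000: gas limit out of bounds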

tests/trinity/core/integration_test_helpers.py

Lines changed: 1 addition & 0 deletions

@@ -147,6 +147,7 @@ def load_mining_chain(db):

 class DBFixture(Enum):
     twenty_pow_headers = '20pow_headers.ldb'
+    thousand_pow_headers = '1000pow_headers.ldb'


 def load_fixture_db(db_fixture, db_class=LevelDB):

tests/trinity/core/p2p-proto/test_sync.py

Lines changed: 56 additions & 4 deletions

@@ -74,6 +74,46 @@ def finalizer():
     assert head.state_root in chaindb_fresh.db


+@pytest.mark.asyncio
+async def test_skeleton_syncer(request, event_loop, chaindb_fresh, chaindb_1000):
+    client_peer, server_peer = await get_directly_linked_peers(
+        request, event_loop,
+        alice_headerdb=FakeAsyncHeaderDB(chaindb_fresh.db),
+        bob_headerdb=FakeAsyncHeaderDB(chaindb_1000.db))
+    client_peer_pool = MockPeerPoolWithConnectedPeers([client_peer])
+    client = FastChainSyncer(ByzantiumTestChain(chaindb_fresh.db), chaindb_fresh, client_peer_pool)
+    server_peer_pool = MockPeerPoolWithConnectedPeers([server_peer])
+    server = RegularChainSyncer(
+        ByzantiumTestChain(chaindb_1000.db),
+        chaindb_1000,
+        server_peer_pool,
+    )
+    asyncio.ensure_future(server.run())
+    server_request_handler = ETHRequestServer(FakeAsyncChainDB(chaindb_1000.db), server_peer_pool)
+    asyncio.ensure_future(server_request_handler.run())
+    client_peer.logger.info("%s is serving 1000 blocks", client_peer)
+    server_peer.logger.info("%s is syncing up 1000 blocks", server_peer)
+
+    def finalizer():
+        event_loop.run_until_complete(server.cancel())
+        # Yield control so that server.run() returns, otherwise asyncio will complain.
+        event_loop.run_until_complete(asyncio.sleep(0.1))
+    request.addfinalizer(finalizer)
+
+    # FastChainSyncer.run() will return as soon as it's caught up with the peer.
+    await asyncio.wait_for(client.run(), timeout=20)
+
+    head = chaindb_fresh.get_canonical_head()
+    assert head == chaindb_1000.get_canonical_head()
+
+    # Now download the state for the chain's head.
+    state_downloader = StateDownloader(
+        chaindb_fresh, chaindb_fresh.db, head.state_root, client_peer_pool)
+    await asyncio.wait_for(state_downloader.run(), timeout=20)
+
+    assert head.state_root in chaindb_fresh.db
+
+
 @pytest.mark.asyncio
 async def test_regular_syncer(request, event_loop, chaindb_fresh, chaindb_20):
     client_peer, server_peer = await get_directly_linked_peers(
@@ -108,9 +148,9 @@ def finalizer():

     asyncio.ensure_future(client.run())

-    await wait_for_head(client.db, server.db.get_canonical_head())
-    head = client.db.get_canonical_head()
-    assert head.state_root in client.db.db
+    await wait_for_head(chaindb_fresh, chaindb_20.get_canonical_head())
+    head = chaindb_fresh.get_canonical_head()
+    assert head.state_root in chaindb_fresh.db


 @pytest.mark.asyncio
@@ -148,14 +188,26 @@ def finalizer():

     asyncio.ensure_future(client.run())

-    await wait_for_head(client.db, server.db.get_canonical_head())
+    await wait_for_head(chaindb_fresh, chaindb_20.get_canonical_head())


 @pytest.fixture
 def leveldb_20():
     yield from load_fixture_db(DBFixture.twenty_pow_headers)


+@pytest.fixture
+def leveldb_1000():
+    yield from load_fixture_db(DBFixture.thousand_pow_headers)
+
+
+@pytest.fixture
+def chaindb_1000(leveldb_1000):
+    chain = load_mining_chain(FakeAsyncAtomicDB(leveldb_1000))
+    assert chain.chaindb.get_canonical_head().block_number == 1000
+    return chain.chaindb
+
+
 @pytest.fixture
 def chaindb_20(leveldb_20):
     chain = load_mining_chain(FakeAsyncAtomicDB(leveldb_20))
Binary file not shown.
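The finalizer in test_skeleton_syncer cancels the server and then sleeps briefly so the event loop can run server.run() to completion before teardown ends. A toy stand-in for that cancel-then-yield pattern (ToyService is hypothetical, not a Trinity class):

    import asyncio

    class ToyService:
        """Hypothetical stand-in for a service with run()/cancel()."""
        def __init__(self):
            self._cancelled = asyncio.Event()

        async def run(self):
            await self._cancelled.wait()  # "serve" until cancelled

        async def cancel(self):
            self._cancelled.set()

    async def main():
        service = ToyService()
        task = asyncio.ensure_future(service.run())
        await service.cancel()
        # Yield control once so run() can actually finish; without this the
        # loop may close with the task still pending and asyncio will complain.
        await asyncio.sleep(0.1)
        assert task.done()

    asyncio.run(main())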

trinity/protocol/eth/sync.py

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+from trinity.protocol.eth.monitors import ETHChainTipMonitor
+from trinity.protocol.eth.peer import ETHPeer
+from trinity.sync.common.headers import BaseHeaderChainSyncer
+
+
+class ETHHeaderChainSyncer(BaseHeaderChainSyncer[ETHPeer]):
+    tip_monitor_class = ETHChainTipMonitor

trinity/protocol/les/sync.py

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+from trinity.protocol.les.monitors import LightChainTipMonitor
+from trinity.protocol.les.peer import LESPeer
+from trinity.sync.common.headers import BaseHeaderChainSyncer
+
+
+class LightHeaderChainSyncer(BaseHeaderChainSyncer[LESPeer]):
+    tip_monitor_class = LightChainTipMonitor
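Both new modules are deliberately thin: the shared machinery lives in BaseHeaderChainSyncer (imported from trinity.sync.common.headers, whose diff is not shown in this excerpt), and each protocol supplies only its peer type and tip monitor. A self-contained sketch of that generic-base pattern, using stand-in classes and assuming the base declares tip_monitor_class as an abstract property, as the code removed from trinity/sync/common/chain.py below did:

    from abc import ABC, abstractmethod
    from typing import Generic, Type, TypeVar

    class BasePeer: ...
    class ETHPeer(BasePeer): ...

    class BaseChainTipMonitor: ...
    class ETHChainTipMonitor(BaseChainTipMonitor): ...

    TChainPeer = TypeVar('TChainPeer', bound=BasePeer)

    class BaseHeaderChainSyncer(ABC, Generic[TChainPeer]):
        @property
        @abstractmethod
        def tip_monitor_class(self) -> Type[BaseChainTipMonitor]:
            ...

        def make_tip_monitor(self) -> BaseChainTipMonitor:
            # The base class can instantiate the monitor without knowing
            # which concrete class the subclass picked.
            return self.tip_monitor_class()

    class ETHHeaderChainSyncer(BaseHeaderChainSyncer[ETHPeer]):
        # A plain class attribute satisfies the abstract property,
        # making the subclass instantiable.
        tip_monitor_class = ETHChainTipMonitor

    assert isinstance(ETHHeaderChainSyncer().make_tip_monitor(), ETHChainTipMonitor)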

trinity/sync/common/chain.py

Lines changed: 3 additions & 122 deletions

@@ -1,12 +1,6 @@
-from abc import abstractmethod
-from contextlib import contextmanager
-from operator import attrgetter
 from typing import (
     AsyncIterator,
-    Iterator,
-    Optional,
     Tuple,
-    Type,
 )

 from cancel_token import (
@@ -43,126 +37,13 @@

 from trinity.chains.base import BaseAsyncChain
 from trinity.db.header import AsyncHeaderDB
-from trinity.protocol.common.monitors import BaseChainTipMonitor
-from trinity.protocol.common.peer import BaseChainPeer, BaseChainPeerPool
-from trinity.protocol.eth.peer import ETHPeer
-from trinity.sync.common.events import SyncingRequest, SyncingResponse
-from trinity.utils.datastructures import TaskQueue
+from trinity.protocol.common.peer import (
+    BaseChainPeer,
+)

 from .types import SyncProgress


-class BaseHeaderChainSyncer(BaseService):
-    """
-    Sync with the Ethereum network by fetching/storing block headers.
-
-    Here, the run() method will execute the sync loop until our local head is the same as the one
-    with the highest TD announced by any of our peers.
-    """
-    # We'll only sync if we are connected to at least min_peers_to_sync.
-    min_peers_to_sync = 1
-    # the latest header hash of the peer on the current sync
-    header_queue: TaskQueue[BlockHeader]
-
-    def __init__(self,
-                 chain: BaseAsyncChain,
-                 db: AsyncHeaderDB,
-                 peer_pool: BaseChainPeerPool,
-                 token: CancelToken = None) -> None:
-        super().__init__(token)
-        self.chain = chain
-        self.db = db
-        self.peer_pool = peer_pool
-        self._peer_header_syncer: 'PeerHeaderSyncer' = None
-        self._last_target_header_hash: Hash32 = None
-        self._tip_monitor = self.tip_monitor_class(peer_pool, token=self.cancel_token)
-
-        # pending queue size should be big enough to avoid starving the processing consumers, but
-        # small enough to avoid wasteful over-requests before post-processing can happen
-        max_pending_headers = ETHPeer.max_headers_fetch * 8
-        self.header_queue = TaskQueue(max_pending_headers, attrgetter('block_number'))
-
-    def get_target_header_hash(self) -> Hash32:
-        if self._peer_header_syncer is None and self._last_target_header_hash is None:
-            raise ValidationError("Cannot check the target hash before a sync has run")
-        elif self._peer_header_syncer is not None:
-            return self._peer_header_syncer.get_target_header_hash()
-        else:
-            return self._last_target_header_hash
-
-    @property
-    @abstractmethod
-    def tip_monitor_class(self) -> Type[BaseChainTipMonitor]:
-        pass
-
-    async def _run(self) -> None:
-        self.run_daemon(self._tip_monitor)
-        if self.peer_pool.event_bus is not None:
-            self.run_daemon_task(self.handle_sync_status_requests())
-        try:
-            async for highest_td_peer in self._tip_monitor.wait_tip_info():
-                self.run_task(self.sync(highest_td_peer))
-        except OperationCancelled:
-            # In the case of a fast sync, we return once the sync is completed, and our
-            # caller must then run the StateDownloader.
-            return
-        else:
-            self.logger.debug("chain tip monitor stopped returning tip info to %s", self)
-
-    @property
-    def _syncing(self) -> bool:
-        return self._peer_header_syncer is not None
-
-    @contextmanager
-    def _get_peer_header_syncer(self, peer: BaseChainPeer) -> Iterator['PeerHeaderSyncer']:
-        if self._syncing:
-            raise ValidationError("Cannot sync headers from two peers at the same time")
-
-        self._peer_header_syncer = PeerHeaderSyncer(
-            self.chain,
-            self.db,
-            peer,
-            self.cancel_token,
-        )
-        self.run_child_service(self._peer_header_syncer)
-        try:
-            yield self._peer_header_syncer
-        except OperationCancelled:
-            pass
-        else:
-            self._peer_header_syncer.cancel_nowait()
-        finally:
-            self.logger.info("Header Sync with %s ended", peer)
-            self._last_target_header_hash = self._peer_header_syncer.get_target_header_hash()
-            self._peer_header_syncer = None
-
-    async def sync(self, peer: BaseChainPeer) -> None:
-        if self._syncing:
-            self.logger.debug(
-                "Got a NewBlock or a new peer, but already syncing so doing nothing")
-            return
-        elif len(self.peer_pool) < self.min_peers_to_sync:
-            self.logger.info(
-                "Connected to less peers (%d) than the minimum (%d) required to sync, "
-                "doing nothing", len(self.peer_pool), self.min_peers_to_sync)
-            return
-
-        with self._get_peer_header_syncer(peer) as syncer:
-            async for header_batch in syncer.next_header_batch():
-                new_headers = tuple(h for h in header_batch if h not in self.header_queue)
-                await self.wait(self.header_queue.add(new_headers))
-
-    def get_sync_status(self) -> Tuple[bool, Optional[SyncProgress]]:
-        if not self._syncing:
-            return False, None
-        return True, self._peer_header_syncer.sync_progress
-
-    async def handle_sync_status_requests(self) -> None:
-        async for req in self.peer_pool.event_bus.stream(SyncingRequest):
-            self.peer_pool.event_bus.broadcast(SyncingResponse(*self.get_sync_status()),
-                                               req.broadcast_config())
-
-
 class PeerHeaderSyncer(BaseService):
     """
     Sync as many headers as possible with a given peer.
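One detail worth noting in the removed code: _get_peer_header_syncer is a contextmanager that enforces a single active per-peer syncer and records the last target hash on the way out, whether the sync finished or was cancelled. A stripped-down, self-contained version of that shape (all names below are stand-ins, not Trinity classes):

    from contextlib import contextmanager

    class SyncSlot:
        """Allows one active worker at a time; remembers its target on exit."""
        def __init__(self):
            self._worker = None
            self.last_target = None

        @contextmanager
        def worker(self, new_worker):
            if self._worker is not None:
                raise RuntimeError("Cannot sync headers from two peers at the same time")
            self._worker = new_worker
            try:
                yield self._worker
            finally:
                # Runs on success and on cancellation alike.
                self.last_target = self._worker.target
                self._worker = None

    class Worker:
        target = '0xdeadbeef'

    slot = SyncSlot()
    with slot.worker(Worker()) as w:
        pass  # drive the worker here
    assert slot.last_target == '0xdeadbeef'
    assert slot._worker is None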

trinity/sync/common/constants.py

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+# If a peer returns 0 results, wait this many seconds before asking it for anything else
+EMPTY_PEER_RESPONSE_PENALTY = 15.0
+
+# Picked a reorg number that is covered by a single skeleton header request,
+# which covers about 6 days at 15s blocks
+MAX_SKELETON_REORG_DEPTH = 35000
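The "about 6 days" figure in the comment checks out: 35,000 blocks at roughly one block per 15 seconds is 525,000 seconds, or about 6.1 days. As a quick sanity check:

    MAX_SKELETON_REORG_DEPTH = 35000
    SECONDS_PER_BLOCK = 15          # rough mainnet PoW block time
    SECONDS_PER_DAY = 24 * 60 * 60  # 86400

    days = MAX_SKELETON_REORG_DEPTH * SECONDS_PER_BLOCK / SECONDS_PER_DAY
    print(days)  # 6.076..., i.e. "about 6 days"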
