Skip to content

Commit d72bddf

Browse files
committed
Use a LevelDB instance as the node cache in StateDownloader
Closes: #1154
1 parent 13acad0 commit d72bddf

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

tests/trinity/core/p2p-proto/test_state_sync.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ def test_trie_sync(random, event_loop):
6060
async def _test_trie_sync():
6161
src_trie, contents = make_random_trie(random)
6262
dest_db = FakeAsyncMemoryDB()
63-
scheduler = HexaryTrieSync(src_trie.root_hash, dest_db, TraceLogger("test"))
63+
nodes_cache = MemoryDB()
64+
scheduler = HexaryTrieSync(src_trie.root_hash, dest_db, nodes_cache, TraceLogger("test"))
6465
requests = scheduler.next_batch()
6566
while len(requests) > 0:
6667
results = []
@@ -99,7 +100,8 @@ def make_random_state(n):
99100
async def test_state_sync():
100101
raw_db, state_root, contents = make_random_state(1000)
101102
dest_db = FakeAsyncMemoryDB()
102-
scheduler = StateSync(state_root, dest_db, TraceLogger('test'))
103+
nodes_cache = MemoryDB()
104+
scheduler = StateSync(state_root, dest_db, nodes_cache, TraceLogger('test'))
103105
requests = scheduler.next_batch(10)
104106
while requests:
105107
results = []

trinity/sync/full/hexary_trie.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@
44
Callable,
55
Dict,
66
List,
7-
Set,
87
Tuple,
98
)
109

1110
from eth_utils import (
1211
encode_hex,
1312
)
14-
from eth.utils.logging import TraceLogger
1513

1614
from eth_typing import (
1715
Hash32
1816
)
1917

18+
from eth.db.backends.base import BaseDB
19+
from eth.utils.logging import TraceLogger
20+
2021
from trie.constants import (
2122
NODE_TYPE_BLANK,
2223
NODE_TYPE_BRANCH,
@@ -111,16 +112,24 @@ def _get_children(node: Hash32, depth: int
111112

112113
class HexaryTrieSync:
113114

114-
def __init__(self, root_hash: Hash32, db: AsyncBaseDB, logger: TraceLogger) -> None:
115+
def __init__(self,
116+
root_hash: Hash32,
117+
db: AsyncBaseDB,
118+
nodes_cache: BaseDB,
119+
logger: TraceLogger) -> None:
120+
# Nodes that haven't been requested yet.
115121
self.queue: List[SyncRequest] = []
122+
# Nodes that have been requested from a peer, but not yet committed to the DB, either
123+
# because we haven't processed a reply containing them or because some of their children
124+
# haven't been retrieved/committed yet.
116125
self.requests: Dict[Hash32, SyncRequest] = {}
117126
self.db = db
118127
self.root_hash = root_hash
119128
self.logger = logger
120129
# A cache of node hashes we know to exist in our DB, used to avoid querying the DB
121130
# unnecessarily as that's the main bottleneck when dealing with a large DB like for
122131
# ethereum's mainnet/ropsten.
123-
self._existing_nodes: Set[Hash32] = set()
132+
self.nodes_cache = nodes_cache
124133
self.committed_nodes = 0
125134
if root_hash in self.db:
126135
self.logger.info("Root node (%s) already exists in DB, nothing to do", root_hash)
@@ -150,11 +159,11 @@ async def schedule(self, node_key: Hash32, parent: SyncRequest, depth: int,
150159
leaf_callback: Callable[[bytes, 'SyncRequest'], Awaitable[None]],
151160
is_raw: bool = False) -> None:
152161
"""Schedule a request for the node with the given key."""
153-
if node_key in self._existing_nodes:
162+
if node_key in self.nodes_cache:
154163
self.logger.trace("Node %s already exists in db", encode_hex(node_key))
155164
return
156165
if await self.db.coro_exists(node_key):
157-
self._existing_nodes.add(node_key)
166+
self.nodes_cache[node_key] = b''
158167
self.logger.trace("Node %s already exists in db", encode_hex(node_key))
159168
return
160169
self._schedule(node_key, parent, depth, leaf_callback, is_raw)
@@ -224,7 +233,7 @@ async def commit(self, request: SyncRequest) -> None:
224233
"""
225234
self.committed_nodes += 1
226235
await self.db.coro_set(request.node_key, request.data)
227-
self._existing_nodes.add(request.node_key)
236+
self.nodes_cache[request.node_key] = b''
228237
self.requests.pop(request.node_key)
229238
for ancestor in request.parents:
230239
ancestor.dependencies -= 1

trinity/sync/full/state.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import collections
33
import itertools
44
import logging
5+
from pathlib import Path
6+
import tempfile
57
import time
68
from typing import (
79
Any,
@@ -33,6 +35,7 @@
3335
BLANK_ROOT_HASH,
3436
EMPTY_SHA3,
3537
)
38+
from eth.db.backends.level import LevelDB
3639
from eth.rlp.accounts import Account
3740
from eth.utils.logging import TraceLogger
3841

@@ -80,7 +83,11 @@ def __init__(self,
8083
self.chaindb = chaindb
8184
self.peer_pool = peer_pool
8285
self.root_hash = root_hash
83-
self.scheduler = StateSync(root_hash, account_db, self.logger)
86+
# We use a LevelDB instance for the nodes cache because a full state download, if run
87+
# uninterrupted, will visit more than 180M nodes, making an in-memory cache infeasible.
88+
self._nodes_cache_dir = tempfile.TemporaryDirectory(prefix="pyevm-state-sync-cache")
89+
self.scheduler = StateSync(
90+
root_hash, account_db, LevelDB(cast(Path, self._nodes_cache_dir.name)), self.logger)
8491
self._handler = PeerRequestHandler(self.chaindb, self.logger, self.cancel_token)
8592
self.request_tracker = TrieNodeRequestTracker(self._reply_timeout, self.logger)
8693
self._peer_missing_nodes: Dict[ETHPeer, Set[Hash32]] = collections.defaultdict(set)
@@ -193,6 +200,7 @@ async def _handle_get_block_headers(self, peer: ETHPeer, request: HeaderRequest)
193200
peer.sub_proto.send_block_headers(headers)
194201

195202
async def _cleanup(self) -> None:
203+
self._nodes_cache_dir.cleanup()
196204
# We don't need to cancel() anything, but we yield control just so that the coroutines we
197205
# run in the background notice the cancel token has been triggered and return.
198206
# Also, don't use self.sleep() here as the cancel token will be triggered and that will

0 commit comments

Comments
 (0)