Skip to content

Commit a4d9d24

Browse files
authored
Move HexaryTrieSync from py-trie here, and make it async (#1124)
1 parent 2fac7ef commit a4d9d24

File tree

9 files changed

+371
-42
lines changed

9 files changed

+371
-42
lines changed

tests/trinity/core/integration_test_helpers.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
from eth.chains.base import (
77
MiningChain,
88
)
9+
from eth.db.backends.level import LevelDB
10+
from eth.db.backends.memory import MemoryDB
911

12+
from trinity.db.base import AsyncBaseDB
1013
from trinity.db.chain import AsyncChainDB
1114
from trinity.db.header import AsyncHeaderDB
1215

@@ -31,6 +34,16 @@ async def passthrough_method(self, *args, **kwargs):
3134
return passthrough_method
3235

3336

37+
class FakeAsyncMemoryDB(MemoryDB, AsyncBaseDB):
38+
coro_set = async_passthrough('set')
39+
coro_exists = async_passthrough('exists')
40+
41+
42+
class FakeAsyncLevelDB(LevelDB, AsyncBaseDB):
43+
coro_set = async_passthrough('set')
44+
coro_exists = async_passthrough('exists')
45+
46+
3447
class FakeAsyncHeaderDB(AsyncHeaderDB):
3548
coro_get_canonical_block_hash = async_passthrough('get_canonical_block_hash')
3649
coro_get_canonical_block_header_by_number = async_passthrough('get_canonical_block_header_by_number') # noqa: E501

tests/trinity/core/p2p-proto/test_state_sync.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,75 @@
22
import random
33
import time
44

5+
import pytest
6+
7+
from hypothesis import (
8+
given,
9+
settings,
10+
strategies,
11+
example,
12+
)
13+
from hypothesis.types import RandomWithSeed
14+
15+
from trie import HexaryTrie
16+
517
from eth.db.backends.memory import MemoryDB
618
from eth.db.account import AccountDB
719
from eth.utils.logging import TraceLogger
820

21+
from trinity.sync.full.hexary_trie import HexaryTrieSync
922
from trinity.sync.full.state import StateSync, TrieNodeRequestTracker
1023

24+
from tests.trinity.core.integration_test_helpers import FakeAsyncMemoryDB
25+
26+
27+
# produces a branch node with an extention node who's encoding is less than 32
28+
# bytes in length so it is inlined.
29+
EXAMPLE_37968 = 37968
30+
31+
# produces an top level extension node who's encoding is less than 32 bytes in
32+
# length so it gets inlined.
33+
EXAMPLE_809368 = 809368
34+
35+
36+
def make_random_trie(random):
37+
trie = HexaryTrie({})
38+
contents = {}
39+
for _ in range(1000):
40+
key_length = random.randint(2, 32)
41+
key = bytes([random.randint(0, 255) for _ in range(key_length)])
42+
value_length = random.randint(2, 64)
43+
value = bytes([random.randint(0, 255) for _ in range(value_length)])
44+
trie[key] = value
45+
contents[key] = value
46+
return trie, contents
47+
48+
49+
@given(random=strategies.randoms())
50+
@settings(max_examples=10)
51+
@example(random=RandomWithSeed(EXAMPLE_37968))
52+
@example(random=RandomWithSeed(EXAMPLE_809368))
53+
def test_trie_sync(random, event_loop):
54+
55+
# Apparently hypothesis tests cannot be used in conjunction with pytest-asyncio yet, so do it
56+
# like this for now. https://github.com/HypothesisWorks/hypothesis/pull/1343
57+
async def _test_trie_sync():
58+
src_trie, contents = make_random_trie(random)
59+
dest_db = FakeAsyncMemoryDB()
60+
scheduler = HexaryTrieSync(src_trie.root_hash, dest_db, TraceLogger("test"))
61+
requests = scheduler.next_batch()
62+
while len(requests) > 0:
63+
results = []
64+
for request in requests:
65+
results.append([request.node_key, src_trie.db[request.node_key]])
66+
await scheduler.process(results)
67+
requests = scheduler.next_batch(10)
68+
dest_trie = HexaryTrie(dest_db, src_trie.root_hash)
69+
for key, value in contents.items():
70+
assert dest_trie[key] == value
71+
72+
event_loop.run_until_complete(_test_trie_sync())
73+
1174

1275
def make_random_state(n):
1376
raw_db = MemoryDB()
@@ -29,16 +92,17 @@ def make_random_state(n):
2992
return raw_db, account_db.state_root, contents
3093

3194

32-
def test_state_sync():
95+
@pytest.mark.asyncio
96+
async def test_state_sync():
3397
raw_db, state_root, contents = make_random_state(1000)
34-
dest_db = MemoryDB()
35-
scheduler = StateSync(state_root, dest_db)
98+
dest_db = FakeAsyncMemoryDB()
99+
scheduler = StateSync(state_root, dest_db, TraceLogger('test'))
36100
requests = scheduler.next_batch(10)
37101
while requests:
38102
results = []
39103
for request in requests:
40104
results.append([request.node_key, raw_db[request.node_key]])
41-
scheduler.process(results)
105+
await scheduler.process(results)
42106
requests = scheduler.next_batch(10)
43107

44108
result_account_db = AccountDB(dest_db, state_root)

tests/trinity/core/p2p-proto/test_sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from eth_utils import decode_hex
77

88
from eth import constants
9-
from eth.db.backends.memory import MemoryDB
109
from eth.vm.forks.frontier import FrontierVM, _PoWMiningVM
1110

1211

@@ -20,6 +19,7 @@
2019
FakeAsyncChain,
2120
FakeAsyncChainDB,
2221
FakeAsyncHeaderDB,
22+
FakeAsyncMemoryDB,
2323
)
2424
from tests.trinity.core.peer_helpers import (
2525
get_directly_linked_peers,
@@ -137,7 +137,7 @@ def finalizer():
137137

138138
@pytest.fixture
139139
def chaindb_20():
140-
chain = PoWMiningChain.from_genesis(MemoryDB(), GENESIS_PARAMS, GENESIS_STATE)
140+
chain = PoWMiningChain.from_genesis(FakeAsyncMemoryDB(), GENESIS_PARAMS, GENESIS_STATE)
141141
for i in range(20):
142142
tx = chain.create_unsigned_transaction(
143143
nonce=i,
@@ -154,7 +154,7 @@ def chaindb_20():
154154

155155
@pytest.fixture
156156
def chaindb_fresh():
157-
chain = PoWMiningChain.from_genesis(MemoryDB(), GENESIS_PARAMS, GENESIS_STATE)
157+
chain = PoWMiningChain.from_genesis(FakeAsyncMemoryDB(), GENESIS_PARAMS, GENESIS_STATE)
158158
assert chain.chaindb.get_canonical_head().block_number == 0
159159
return chain.chaindb
160160

trinity/db/base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
BaseProxy,
55
)
66

7+
from eth.db.backends.base import BaseDB
8+
9+
from trinity.utils.mp import async_method
10+
711

812
class DBProxy(BaseProxy):
913
_exposed_ = (
@@ -15,7 +19,11 @@ class DBProxy(BaseProxy):
1519
'exists',
1620
'get',
1721
'set',
22+
'coro_set',
23+
'coro_exists',
1824
)
25+
coro_set = async_method('set')
26+
coro_exists = async_method('exists')
1927

2028
def get(self, key: bytes) -> bytes:
2129
return self._callmethod('get', (key,))
@@ -40,3 +48,12 @@ def exists(self, key: bytes) -> bool:
4048

4149
def __contains__(self, key: bytes) -> bool:
4250
return self._callmethod('__contains__', (key,))
51+
52+
53+
class AsyncBaseDB(BaseDB):
54+
55+
async def coro_set(self, key: bytes, value: bytes) -> None:
56+
raise NotImplementedError()
57+
58+
async def coro_exists(self, key: bytes) -> bool:
59+
raise NotImplementedError()

trinity/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,10 @@ class AlreadyWaiting(BaseTrinityError):
3030
peer when there is already an active wait for that message type.
3131
"""
3232
pass
33+
34+
35+
class SyncRequestAlreadyProcessed(BaseTrinityError):
36+
"""
37+
Raised when a trie SyncRequest has already been processed.
38+
"""
39+
pass

trinity/server.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from cancel_token import CancelToken, OperationCancelled
1717

1818
from eth.chains import AsyncChain
19-
from eth.db.backends.base import BaseDB
2019

2120
from p2p.auth import (
2221
decode_authentication,
@@ -53,6 +52,7 @@
5352
)
5453
from p2p.service import BaseService
5554

55+
from trinity.db.base import AsyncBaseDB
5656
from trinity.db.chain import AsyncChainDB
5757
from trinity.db.header import BaseAsyncHeaderDB
5858
from trinity.protocol.eth.peer import ETHPeer
@@ -76,7 +76,7 @@ def __init__(self,
7676
chain: AsyncChain,
7777
chaindb: AsyncChainDB,
7878
headerdb: BaseAsyncHeaderDB,
79-
base_db: BaseDB,
79+
base_db: AsyncBaseDB,
8080
network_id: int,
8181
max_peers: int = DEFAULT_MAX_PEERS,
8282
peer_class: Type[BasePeer] = ETHPeer,
@@ -295,16 +295,15 @@ def _test() -> None:
295295
from pathlib import Path
296296
import signal
297297

298-
from eth.db.backends.level import LevelDB
299298
from eth.chains.ropsten import RopstenChain, ROPSTEN_GENESIS_HEADER
300299

301300
from p2p import ecies
302301
from p2p.constants import ROPSTEN_BOOTNODES
303302

304303
from trinity.utils.chains import load_nodekey
305304

306-
from tests.p2p.integration_test_helpers import FakeAsyncHeaderDB
307-
from tests.p2p.integration_test_helpers import FakeAsyncChainDB, FakeAsyncRopstenChain
305+
from tests.p2p.integration_test_helpers import (
306+
FakeAsyncLevelDB, FakeAsyncHeaderDB, FakeAsyncChainDB, FakeAsyncRopstenChain)
308307

309308
parser = argparse.ArgumentParser()
310309
parser.add_argument('-db', type=str, required=True)
@@ -322,7 +321,7 @@ def _test() -> None:
322321
logging.getLogger('p2p.server.Server').setLevel(log_level)
323322

324323
loop = asyncio.get_event_loop()
325-
db = LevelDB(args.db)
324+
db = FakeAsyncLevelDB(args.db)
326325
headerdb = FakeAsyncHeaderDB(db)
327326
chaindb = FakeAsyncChainDB(db)
328327
chaindb.persist_header(ROPSTEN_GENESIS_HEADER)

0 commit comments

Comments
 (0)