Skip to content

Commit 15958c3

Browse files
authored
Merge pull request ethereum#594 from mhchia/feature/validator-in-beacon-node-plugin
Gather validator with beacon node plugin
2 parents 9c225e1 + af9cf66 commit 15958c3

File tree

11 files changed

+651
-171
lines changed

11 files changed

+651
-171
lines changed

eth2/beacon/chains/testnet/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
from eth2.beacon.state_machines.forks.xiao_long_bao import (
88
XiaoLongBaoStateMachine,
99
)
10-
from eth2.beacon.state_machines.forks.serenity.configs import (
11-
SERENITY_CONFIG,
12-
)
1310
from .constants import (
1411
TESTNET_CHAIN_ID,
1512
)
@@ -26,10 +23,11 @@
2623
Type,
2724
)
2825

26+
state_machine_class = XiaoLongBaoStateMachine
2927

3028
TESTNET_SM_CONFIGURATION = (
3129
# FIXME: Shouldn't access GENESIS_SLOT from a particular state machine configs.
32-
(SERENITY_CONFIG.GENESIS_SLOT, XiaoLongBaoStateMachine),
30+
(state_machine_class.config.GENESIS_SLOT, state_machine_class),
3331
) # type: Tuple[Tuple[Slot, Type[BaseBeaconStateMachine]], ...]
3432

3533

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
#!/usr/bin/env python
2+
3+
import asyncio
4+
from collections import (
5+
defaultdict,
6+
)
7+
import logging
8+
import signal
9+
import sys
10+
import time
11+
from typing import (
12+
ClassVar,
13+
Dict,
14+
List,
15+
MutableSet,
16+
NamedTuple,
17+
Optional,
18+
Tuple,
19+
)
20+
21+
from pathlib import Path
22+
23+
from eth_keys.datatypes import (
24+
PrivateKey,
25+
)
26+
27+
from eth_utils import (
28+
remove_0x_prefix,
29+
)
30+
31+
32+
async def run(cmd):
33+
proc = await asyncio.create_subprocess_shell(
34+
cmd,
35+
stdout=asyncio.subprocess.PIPE,
36+
stderr=asyncio.subprocess.PIPE,
37+
)
38+
return proc
39+
40+
41+
class Log(NamedTuple):
42+
name: str
43+
pattern: str
44+
# TODO: probably we can add dependent relationship between logs?
45+
timeout: int
46+
47+
48+
class EventTimeOutError(Exception):
49+
pass
50+
51+
52+
SERVER_RUNNING = Log(name="server running", pattern="Running server", timeout=60)
53+
START_SYNCING = Log(name="start syncing", pattern="their head slot", timeout=200)
54+
55+
56+
class Node:
57+
name: str
58+
node_privkey: str
59+
port: int
60+
bootstrap_nodes: Tuple["Node", ...]
61+
62+
start_time: float
63+
proc: asyncio.subprocess.Process
64+
# TODO: use CancelToken instead
65+
tasks: List[asyncio.Task]
66+
logs_expected: Dict[str, MutableSet[Log]]
67+
has_log_happened: Dict[Log, bool]
68+
69+
dir_root: ClassVar[Path] = Path("/tmp/aaaa")
70+
running_nodes: ClassVar[List] = []
71+
logger: ClassVar[logging.Logger] = logging.getLogger(
72+
"eth2.beacon.scripts.run_beacon_nodes.Node"
73+
)
74+
75+
def __init__(
76+
self,
77+
name: str,
78+
node_privkey: str,
79+
port: int,
80+
bootstrap_nodes: Optional[Tuple["Node", ...]] = None) -> None:
81+
self.name = name
82+
self.node_privkey = PrivateKey(bytes.fromhex(node_privkey))
83+
self.port = port
84+
if bootstrap_nodes is None:
85+
bootstrap_nodes = []
86+
self.bootstrap_nodes = bootstrap_nodes
87+
88+
self.tasks = []
89+
self.start_time = time.monotonic()
90+
self.logs_expected = {}
91+
self.logs_expected["stdout"] = set()
92+
self.logs_expected["stderr"] = set()
93+
self.add_log("stderr", SERVER_RUNNING)
94+
self.has_log_happened = defaultdict(lambda: False)
95+
96+
def __repr__(self) -> str:
97+
return f"<Node {self.logging_name} {self.proc}>"
98+
99+
@property
100+
def logging_name(self) -> str:
101+
return f"{self.name}@{remove_0x_prefix(self.node_id)[:6]}"
102+
103+
@property
104+
def root_dir(self) -> Path:
105+
return self.dir_root / self.name
106+
107+
@property
108+
def node_id(self) -> str:
109+
return self.node_privkey.public_key.to_hex()
110+
111+
@property
112+
def enode_id(self) -> str:
113+
return f"enode://{remove_0x_prefix(self.node_id)}@127.0.0.1:{self.port}"
114+
115+
@property
116+
def cmd(self) -> str:
117+
_cmds = [
118+
"trinity-beacon",
119+
f"--port={self.port}",
120+
f"--trinity-root-dir={self.root_dir}",
121+
f" --beacon-nodekey={remove_0x_prefix(self.node_privkey.to_hex())}",
122+
"-l debug",
123+
]
124+
if len(self.bootstrap_nodes) != 0:
125+
bootstrap_nodes_str = ",".join([node.enode_id for node in self.bootstrap_nodes])
126+
_cmds.append(f"--bootstrap_nodes={bootstrap_nodes_str}")
127+
_cmd = " ".join(_cmds)
128+
return _cmd
129+
130+
def stop(self) -> None:
131+
for task in self.tasks:
132+
task.cancel()
133+
self.proc.terminate()
134+
135+
@classmethod
136+
def stop_all_nodes(cls) -> None:
137+
for node in cls.running_nodes:
138+
print(f"Stopping node={node}")
139+
node.stop()
140+
141+
def add_log(self, from_stream: str, log: Log) -> None:
142+
if from_stream not in ("stdout", "stderr"):
143+
return
144+
self.logs_expected[from_stream].add(log)
145+
146+
async def run(self) -> None:
147+
print(f"Spinning up {self.name}")
148+
self.proc = await run(self.cmd)
149+
self.running_nodes.append(self)
150+
self.tasks.append(asyncio.ensure_future(self._print_logs('stdout', self.proc.stdout)))
151+
self.tasks.append(asyncio.ensure_future(self._print_logs('stderr', self.proc.stderr)))
152+
try:
153+
await self._log_monitor()
154+
except EventTimeOutError as e:
155+
self.logger.debug(e)
156+
# FIXME: nasty
157+
self.stop_all_nodes()
158+
sys.exit(2)
159+
160+
async def _log_monitor(self) -> None:
161+
while True:
162+
for from_stream, logs in self.logs_expected.items():
163+
for log in logs:
164+
current_time = time.monotonic()
165+
ellapsed_time = current_time - self.start_time
166+
if not self.has_log_happened[log] and (ellapsed_time > log.timeout):
167+
raise EventTimeOutError(
168+
f"{self.logging_name}: log {log.name!r} is time out, "
169+
f"which should have occurred in {from_stream}."
170+
)
171+
await asyncio.sleep(0.1)
172+
173+
async def _print_logs(self, from_stream: str, stream_reader: asyncio.StreamReader) -> None:
174+
async for line_bytes in stream_reader:
175+
line = line_bytes.decode('utf-8').replace('\n', '')
176+
# TODO: Preprocessing
177+
self._record_happenning_logs(from_stream, line)
178+
print(f"{self.logging_name}.{from_stream}\t: {line}")
179+
180+
def _record_happenning_logs(self, from_stream: str, line: str) -> None:
181+
for log in self.logs_expected[from_stream]:
182+
if log.pattern in line:
183+
self.logger.debug(f"log \"log.name\" occurred in {from_stream}")
184+
self.has_log_happened[log] = True
185+
186+
187+
async def main():
188+
num_validators = 5
189+
time_bob_wait_for_alice = 15
190+
191+
proc = await run(
192+
f"rm -rf {Node.dir_root}"
193+
)
194+
await proc.wait()
195+
proc = await run(
196+
f"mkdir -p {Node.dir_root}"
197+
)
198+
await proc.wait()
199+
200+
proc = await run(
201+
f"trinity-beacon testnet --num={num_validators} --network-dir={Node.dir_root}"
202+
)
203+
await proc.wait()
204+
205+
def sigint_handler(sig, frame):
206+
Node.stop_all_nodes()
207+
sys.exit(123)
208+
209+
signal.signal(signal.SIGINT, sigint_handler)
210+
211+
node_alice = Node(
212+
name="alice",
213+
node_privkey="6b94ffa2d9b8ee85afb9d7153c463ea22789d3bbc5d961cc4f63a41676883c19",
214+
port=30304,
215+
bootstrap_nodes=[],
216+
)
217+
asyncio.ensure_future(node_alice.run())
218+
219+
print(f"Sleeping {time_bob_wait_for_alice} seconds to wait until Alice is initialized")
220+
await asyncio.sleep(time_bob_wait_for_alice)
221+
222+
node_bob = Node(
223+
name="bob",
224+
node_privkey="f5ad1c57b5a489fc8f21ad0e5a19c1f1a60b8ab357a2100ff7e75f3fa8a4fd2e",
225+
port=30305,
226+
bootstrap_nodes=[node_alice],
227+
)
228+
asyncio.ensure_future(node_bob.run())
229+
230+
await asyncio.sleep(1000000)
231+
232+
233+
asyncio.get_event_loop().run_until_complete(main())

tests/core/p2p-proto/bcc/test_full_sync.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
)
1717

1818

19+
class NoopBlockImporter:
20+
"""
21+
Do nothing, to override the block validation in ``SyncBlockImporter``.
22+
"""
23+
def import_block(self, block):
24+
return None, tuple(), tuple()
25+
26+
1927
async def get_sync_setup(request, event_loop, alice_chain_db, bob_chain_db):
2028
alice, alice_peer_pool, bob, bob_peer_pool = await get_directly_linked_peers_in_peer_pools(
2129
request,
@@ -25,7 +33,7 @@ async def get_sync_setup(request, event_loop, alice_chain_db, bob_chain_db):
2533
)
2634

2735
bob_request_server = BCCRequestServer(bob.context.chain_db, bob_peer_pool)
28-
alice_syncer = BeaconChainSyncer(alice_chain_db, alice_peer_pool)
36+
alice_syncer = BeaconChainSyncer(alice_chain_db, alice_peer_pool, NoopBlockImporter())
2937

3038
asyncio.ensure_future(bob_request_server.run())
3139
asyncio.ensure_future(alice_syncer.run())

tests/plugins/eth2/beacon/helpers.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
import importlib
2+
import time
23

34
from py_ecc import bls
45

5-
from trinity.config import (
6-
BeaconChainConfig,
7-
BeaconGenesisData,
8-
)
9-
106
from eth2._utils.hash import (
117
hash_eth2,
128
)
@@ -41,13 +37,5 @@
4137
config=XIAO_LONG_BAO_CONFIG,
4238
keymap=keymap,
4339
genesis_block_class=SerenityBeaconBlock,
44-
genesis_time=0,
45-
)
46-
genesis_data = BeaconGenesisData(
47-
genesis_time=0,
48-
genesis_slot=XIAO_LONG_BAO_CONFIG.GENESIS_SLOT,
49-
keymap=keymap,
50-
num_validators=NUM_VALIDATORS,
40+
genesis_time=int(time.time()),
5141
)
52-
beacon_chain_config = BeaconChainConfig(chain_name='TestTestTest', genesis_data=genesis_data)
53-
chain_class = beacon_chain_config.beacon_chain_class

0 commit comments

Comments
 (0)