Skip to content

Commit e655371

Browse files
committed
Move discovery into an isolated plugin
Resolves #1518
1 parent 49b6d08 commit e655371

File tree

11 files changed

+299
-62
lines changed

11 files changed

+299
-62
lines changed

p2p/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,8 @@
7676
# The amount of time that the BasePeerPool will wait for a peer to boot before
7777
# aborting the connection attempt.
7878
DEFAULT_PEER_BOOT_TIMEOUT = 20
79+
80+
# Interval at which peer pool is checked for potential new candidates
81+
DISOVERY_INTERVAL = 2
82+
# Timeout used when fetching peer candidates from discovery
83+
REQUEST_PEER_CANDIDATE_TIMEOUT = 0.5

p2p/discovery.py

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232

3333
import cytoolz
3434

35+
from lahja import (
36+
Endpoint,
37+
)
38+
3539
import rlp
3640

3741
from eth_typing import Hash32
@@ -56,10 +60,11 @@
5660

5761
from cancel_token import CancelToken, OperationCancelled
5862

63+
from p2p.events import PeerCandidatesRequest, RandomBootnodeRequest
5964
from p2p.exceptions import AlreadyWaitingDiscoveryResponse, NoEligibleNodes, UnableToGetDiscV5Ticket
65+
from p2p.kademlia import to_uris
6066
from p2p import kademlia
6167
from p2p import protocol
62-
from p2p.peer import BasePeerPool
6368
from p2p.service import BaseService
6469

6570
if TYPE_CHECKING:
@@ -951,21 +956,48 @@ class DiscoveryService(BaseService):
951956
_last_lookup: float = 0
952957
_lookup_interval: int = 30
953958

954-
def __init__(self, proto: DiscoveryProtocol, peer_pool: BasePeerPool,
955-
port: int, token: CancelToken = None) -> None:
959+
def __init__(self,
960+
proto: DiscoveryProtocol,
961+
port: int,
962+
event_bus: Endpoint,
963+
token: CancelToken = None) -> None:
956964
super().__init__(token)
957965
self.proto = proto
958-
self.peer_pool = peer_pool
959966
self.port = port
967+
self._event_bus = event_bus
960968
self._lookup_running = asyncio.Lock()
961969

970+
self.run_daemon_task(self.handle_get_peer_candidates_requests())
971+
self.run_daemon_task(self.handle_get_random_bootnode_requests())
972+
973+
async def handle_get_peer_candidates_requests(self) -> None:
974+
async for event in self._event_bus.stream(PeerCandidatesRequest):
975+
976+
self.run_task(self.maybe_lookup_random_node())
977+
978+
nodes = tuple(to_uris(self.proto.get_nodes_to_connect(event.max_candidates)))
979+
980+
self.logger.debug2("Broadcasting peer candidates (%s)", nodes)
981+
self._event_bus.broadcast(
982+
event.expected_response_type()(nodes),
983+
event.broadcast_config()
984+
)
985+
986+
async def handle_get_random_bootnode_requests(self) -> None:
987+
async for event in self._event_bus.stream(RandomBootnodeRequest):
988+
989+
nodes = tuple(to_uris(self.proto.get_random_bootnode()))
990+
991+
self.logger.debug2("Broadcasting random boot nodes (%s)", nodes)
992+
self._event_bus.broadcast(
993+
event.expected_response_type()(nodes),
994+
event.broadcast_config()
995+
)
996+
962997
async def _run(self) -> None:
963998
await self._start_udp_listener()
964-
connect_loop_sleep = 2
965999
self.run_task(self.proto.bootstrap())
966-
while self.is_operational:
967-
await self.maybe_connect_to_more_peers()
968-
await self.sleep(connect_loop_sleep)
1000+
await self.cancel_token.wait()
9691001

9701002
async def _start_udp_listener(self) -> None:
9711003
loop = asyncio.get_event_loop()
@@ -975,22 +1007,6 @@ async def _start_udp_listener(self) -> None:
9751007
local_addr=('0.0.0.0', self.port),
9761008
family=socket.AF_INET)
9771009

978-
async def maybe_connect_to_more_peers(self) -> None:
979-
"""Connect to more peers if we're not yet maxed out to max_peers"""
980-
if self.peer_pool.is_full:
981-
self.logger.debug("Already connected to %s peers; sleeping", len(self.peer_pool))
982-
return
983-
984-
self.run_task(self.maybe_lookup_random_node())
985-
986-
await self.peer_pool.connect_to_nodes(
987-
self.proto.get_nodes_to_connect(self.peer_pool.max_peers))
988-
989-
# In some cases (e.g ROPSTEN or private testnets), the discovery table might be full of
990-
# bad peers so if we can't connect to any peers we try a random bootstrap node as well.
991-
if not len(self.peer_pool):
992-
await self.peer_pool.connect_to_nodes(self.proto.get_random_bootnode())
993-
9941010
async def maybe_lookup_random_node(self) -> None:
9951011
if self._last_lookup + self._lookup_interval > time.time():
9961012
return

p2p/events.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from typing import (
2+
Tuple,
23
Type,
34
)
45

@@ -8,6 +9,36 @@
89
)
910

1011

12+
class BaseDiscoveryServiceResponse(BaseEvent):
13+
14+
def __init__(self, error: Exception) -> None:
15+
self.error = error
16+
17+
18+
class PeerCandidatesResponse(BaseDiscoveryServiceResponse):
19+
20+
def __init__(self, candidates: Tuple[str, ...], error: Exception=None) -> None:
21+
super().__init__(error)
22+
self.candidates = candidates
23+
24+
25+
class PeerCandidatesRequest(BaseRequestResponseEvent[PeerCandidatesResponse]):
26+
27+
def __init__(self, max_candidates: int) -> None:
28+
self.max_candidates = max_candidates
29+
30+
@staticmethod
31+
def expected_response_type() -> Type[PeerCandidatesResponse]:
32+
return PeerCandidatesResponse
33+
34+
35+
class RandomBootnodeRequest(BaseRequestResponseEvent[PeerCandidatesResponse]):
36+
37+
@staticmethod
38+
def expected_response_type() -> Type[PeerCandidatesResponse]:
39+
return PeerCandidatesResponse
40+
41+
1142
class PeerCountResponse(BaseEvent):
1243

1344
def __init__(self, peer_count: int) -> None:

p2p/kademlia.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ def from_uri(cls, uri: str) -> 'Node':
105105
pubkey = keys.PublicKey(decode_hex(parsed.username))
106106
return cls(pubkey, Address(parsed.hostname, parsed.port))
107107

108+
def uri(self) -> str:
109+
return f'enode://{self.pubkey.to_hex()}@{self.address.ip}:{self.address.tcp_port}'
110+
108111
def __str__(self) -> str:
109112
return '<Node(%s@%s)>' % (self.pubkey.to_hex()[:6], self.address.ip)
110113

@@ -133,6 +136,19 @@ def __ne__(self, other: object) -> bool:
133136
def __hash__(self) -> int:
134137
return hash(self.pubkey)
135138

139+
# TODO: check if we can make the nodes pickable and get rid of these
140+
# https://github.com/ethereum/py-evm/issues/1578
141+
142+
143+
def to_uris(nodes: Iterable[Node]) -> Iterator[str]:
144+
for node in nodes:
145+
yield node.uri()
146+
147+
148+
def from_uris(uris: Iterable[str]) -> Iterator[Node]:
149+
for uri in uris:
150+
yield Node.from_uri(uri)
151+
136152

137153
@total_ordering
138154
class KBucket(Sized):

p2p/peer.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@
4848

4949
from p2p import auth
5050
from p2p import protocol
51-
from p2p.kademlia import Node
51+
from p2p.kademlia import (
52+
from_uris,
53+
Node,
54+
)
5255
from p2p.exceptions import (
5356
BadAckMessage,
5457
DecryptionError,
@@ -81,13 +84,17 @@
8184
CONN_IDLE_TIMEOUT,
8285
DEFAULT_MAX_PEERS,
8386
DEFAULT_PEER_BOOT_TIMEOUT,
87+
DISOVERY_INTERVAL,
8488
HEADER_LEN,
8589
MAC_LEN,
90+
REQUEST_PEER_CANDIDATE_TIMEOUT,
8691
)
8792

8893
from .events import (
94+
PeerCandidatesRequest,
8995
PeerCountRequest,
9096
PeerCountResponse,
97+
RandomBootnodeRequest,
9198
)
9299

93100

@@ -799,6 +806,43 @@ async def handle_peer_count_requests(self) -> None:
799806
# `event.broadcast_config()` API.
800807
self.event_bus.broadcast(PeerCountResponse(len(self)), req.broadcast_config())
801808

809+
async def maybe_connect_more_peers(self) -> None:
810+
while self.is_operational:
811+
await self.sleep(DISOVERY_INTERVAL)
812+
813+
available_peer_slots = self.max_peers - len(self)
814+
if available_peer_slots > 0:
815+
try:
816+
response = await self.wait(
817+
# TODO: This should use a BroadcastConfig to send the request to discovery
818+
# only as soon as we have cut a new Lahja release.
819+
self.event_bus.request(PeerCandidatesRequest(available_peer_slots)),
820+
timeout=REQUEST_PEER_CANDIDATE_TIMEOUT
821+
)
822+
except TimeoutError:
823+
self.logger.warning("Discovery did not answer PeerCandidateRequest in time")
824+
continue
825+
826+
# In some cases (e.g ROPSTEN or private testnets), the discovery table might be
827+
# full of bad peers so if we can't connect to any peers we try a random bootstrap
828+
# node as well.
829+
if not len(self):
830+
try:
831+
response = await self.wait(
832+
# TODO: This should use a BroadcastConfig to send the request to
833+
# discovery only as soon as we have cut a new Lahja release.
834+
self.event_bus.request(RandomBootnodeRequest()),
835+
timeout=REQUEST_PEER_CANDIDATE_TIMEOUT
836+
)
837+
except TimeoutError:
838+
self.logger.warning(
839+
"Discovery did not answer RandomBootnodeRequest in time"
840+
)
841+
continue
842+
843+
self.logger.debug2("Received candidates to connect to (%s)", response.candidates)
844+
await self.connect_to_nodes(from_uris(response.candidates))
845+
802846
def __len__(self) -> int:
803847
return len(self.connected_nodes)
804848

@@ -880,6 +924,7 @@ async def _run(self) -> None:
880924
# so in order to ensure we cancel all peers when we terminate.
881925
if self.event_bus is not None:
882926
self.run_daemon_task(self.handle_peer_count_requests())
927+
self.run_daemon_task(self.maybe_connect_more_peers())
883928
self.run_daemon_task(self._periodically_report_stats())
884929
await self.cancel_token.wait()
885930

trinity/nodes/base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ def __init__(self, event_bus: Endpoint, trinity_config: TrinityConfig) -> None:
5252

5353
self._jsonrpc_ipc_path: Path = trinity_config.jsonrpc_ipc_path
5454
self._network_id = trinity_config.network_id
55-
self._use_discv5 = trinity_config.use_discv5
5655

5756
self.event_bus = event_bus
5857

trinity/nodes/full.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ def get_p2p_server(self) -> FullServer:
4444
max_peers=self._max_peers,
4545
bootstrap_nodes=self._bootstrap_nodes,
4646
preferred_nodes=self._preferred_nodes,
47-
use_discv5=self._use_discv5,
4847
token=self.cancel_token,
4948
event_bus=self.event_bus,
5049
)

trinity/nodes/light.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ def get_p2p_server(self) -> LightServer:
8080
max_peers=self._max_peers,
8181
bootstrap_nodes=self._bootstrap_nodes,
8282
preferred_nodes=self._preferred_nodes,
83-
use_discv5=self._use_discv5,
8483
token=self.cancel_token,
8584
event_bus=self.event_bus,
8685
)

0 commit comments

Comments
 (0)