Skip to content

Commit 5c6e42e

Browse files
Merge pull request #319 from andreasgriffin/collect-partial-results
Fix p2p discovery
2 parents 461bb24 + c3ff3e4 commit 5c6e42e

File tree

3 files changed

+30
-35
lines changed

3 files changed

+30
-35
lines changed

bitcoin_safe/client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,10 @@ def from_cbf(
208208
if initial_peer:
209209
peers.add(initial_peer)
210210

211-
discovered_peers = PeerDiscovery(
212-
network=bdkwallet.network(), loop_in_thread=loop_in_thread
213-
).get_bitcoin_peers(required_services=CBF_REQUIRED_SERVICE_FLAGS, lower_bound=200)
211+
p2p_discovery = PeerDiscovery(network=bdkwallet.network(), loop_in_thread=loop_in_thread)
212+
discovered_peers = loop_in_thread.run_foreground(
213+
p2p_discovery.get_bitcoin_peers(required_services=CBF_REQUIRED_SERVICE_FLAGS, lower_bound=200)
214+
)
214215
peers = peers.union(discovered_peers)
215216

216217
client = CbfSync(

bitcoin_safe/p2p/p2p_listener.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def on_tx(self, tx: bdk.Transaction):
102102
self.signal_tx.emit(tx)
103103
return
104104

105-
def random_select_peer(
105+
async def random_select_peer(
106106
self,
107107
weight_getaddr: float = 0.7,
108108
weight_dns: float = 0.3,
@@ -128,7 +128,7 @@ def random_select_peer(
128128

129129
# Fast paths -------------------------------------------------
130130
if not self.discovered_peers:
131-
peer = self.peer_discovery.get_bitcoin_peer() # may be None
131+
peer = await self.peer_discovery.get_bitcoin_peer() # may be None
132132
logger.debug(f"Picked {peer=} from DNS seed")
133133
return peer
134134
if weight_dns == 0:
@@ -147,7 +147,7 @@ def random_select_peer(
147147
logger.debug(f"Picked {peer=} from discovered_peers")
148148
return peer
149149

150-
peer = self.peer_discovery.get_bitcoin_peer() # may be None
150+
peer = await self.peer_discovery.get_bitcoin_peer() # may be None
151151
logger.debug(f"Picked {peer=} from DNS seed")
152152
return peer
153153

@@ -175,7 +175,7 @@ async def _start(
175175
# 1. Select the next peer (and avoid repeating the last one immediately)
176176
# ------------------------------------------------------------------
177177
if peer is None:
178-
peer = self.random_select_peer()
178+
peer = await self.random_select_peer()
179179
if peer is None:
180180
# no peers at all? wait then retry
181181
await asyncio.sleep(retry_delay)

bitcoin_safe/p2p/peer_discovery.py

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,11 @@
3232
import logging
3333
import random
3434
import socket
35-
from collections.abc import Sequence
3635
from ipaddress import ip_address
3736
from typing import Any
3837

3938
import bdkpython as bdk
4039
from bitcoin_safe_lib.async_tools.loop_in_thread import LoopInThread
41-
from bitcoin_safe_lib.util import time_logger
4240

4341
from .p2p_client import Peer
4442

@@ -171,35 +169,36 @@ async def _resolve_dns_seed(
171169

172170
return peers
173171

174-
async def _get_bitcoin_peers_async(
172+
async def get_bitcoin_peers(
175173
self,
176174
lower_bound: int | None,
177175
required_services: int | None,
178176
timeout: int = 5,
179177
):
180-
"""Get bitcoin peers async."""
181178
dns_seeds = DNS_SEEDS[self.network]["hosts"].copy()
182179
random.shuffle(dns_seeds)
183180

184181
effective_required_services = (
185182
DEFAULT_REQUIRED_SERVICE_FLAGS if required_services is None else required_services
186183
)
187184

185+
partial_results: list[list[Peer]] = []
186+
188187
async def resolve(seed_host: str) -> list[Peer]:
189-
"""Resolve."""
190188
seed_info = Peer.parse(seed_host, self.network)
191189
candidate_seed = self._seed_with_service_bits(
192190
seed_info.host,
193191
effective_required_services if effective_required_services else None,
194192
)
195-
return await self._resolve_dns_seed(candidate_seed, seed_info.port)
193+
peers = await self._resolve_dns_seed(candidate_seed, seed_info.port)
194+
partial_results.append(peers)
195+
return peers
196196

197-
def enough(results: Sequence[list[Peer]]) -> bool:
198-
"""Enough."""
197+
def enough(results):
199198
if lower_bound is None:
200199
return False
201-
unique_peers = {peer for batch in results for peer in batch}
202-
return len(unique_peers) >= lower_bound
200+
unique = {peer for batch in results for peer in batch}
201+
return len(unique) >= lower_bound
203202

204203
try:
205204
batches = await asyncio.wait_for(
@@ -211,30 +210,25 @@ def enough(results: Sequence[list[Peer]]) -> bool:
211210
)
212211
except asyncio.TimeoutError:
213212
logger.warning(f"Peer discovery timed out after {timeout} seconds")
214-
return set()
213+
return {peer for batch in partial_results for peer in batch}
215214

216215
return {peer for batch in batches for peer in batch}
217216

218-
@time_logger
219-
def get_bitcoin_peers(
220-
self,
221-
lower_bound: int | None = None,
222-
required_services: int | None = DEFAULT_REQUIRED_SERVICE_FLAGS,
223-
) -> set[Peer]:
224-
"""Get bitcoin peers."""
225-
return self._loop_in_thread.run_foreground(
226-
self._get_bitcoin_peers_async(
227-
lower_bound=lower_bound,
228-
required_services=required_services,
229-
)
230-
)
231-
232-
def get_bitcoin_peer(self, required_services: int | None = DEFAULT_REQUIRED_SERVICE_FLAGS) -> None | Peer:
217+
async def get_bitcoin_peer(
218+
self, required_services: int | None = DEFAULT_REQUIRED_SERVICE_FLAGS
219+
) -> None | Peer:
233220
"""Get bitcoin peer."""
234-
peers = self.get_bitcoin_peers(lower_bound=1, required_services=required_services)
221+
222+
# the limit may not be 1 , otherwise 127.0.0.1 will always be returned
223+
peers = await self.get_bitcoin_peers(
224+
lower_bound=10,
225+
required_services=required_services,
226+
)
235227
if not peers:
236228
return None
237-
return list(peers)[0]
229+
peer_list = list(peers)
230+
random.shuffle(peer_list)
231+
return peer_list[0]
238232

239233
def stop(self) -> None:
240234
"""Stop the internally managed loop, if we created it."""

0 commit comments

Comments
 (0)