
Commit cc68f63

Merge branch 'main' into main
2 parents 77f4764 + 95e1f62

File tree

9 files changed: +421 −36 lines changed


README.md

Lines changed: 2 additions & 2 deletions
@@ -12,13 +12,13 @@
 [![Build Status](https://img.shields.io/github/actions/workflow/status/libp2p/py-libp2p/tox.yml?branch=main&label=build%20status)](https://github.com/libp2p/py-libp2p/actions/workflows/tox.yml)
 [![Docs build](https://readthedocs.org/projects/py-libp2p/badge/?version=latest)](http://py-libp2p.readthedocs.io/en/latest/?badge=latest)

-> ⚠️ **Warning:** py-libp2p is an experimental and work-in-progress repo under development. We do not yet recommend using py-libp2p in production environments.
+> py-libp2p has moved beyond its experimental roots and is steadily progressing toward production readiness. The core features are stable, and we’re focused on refining performance, expanding protocol support, and ensuring smooth interop with other libp2p implementations. We welcome contributions and real-world usage feedback to help us reach full production maturity.

 Read more in the [documentation on ReadTheDocs](https://py-libp2p.readthedocs.io/). [View the release notes](https://py-libp2p.readthedocs.io/en/latest/release_notes.html).

 ## Maintainers

-Currently maintained by [@pacrob](https://github.com/pacrob), [@seetadev](https://github.com/seetadev) and [@dhuseby](https://github.com/dhuseby), looking for assistance!
+Currently maintained by [@pacrob](https://github.com/pacrob), [@seetadev](https://github.com/seetadev) and [@dhuseby](https://github.com/dhuseby). Please reach out to us for collaboration or active feedback. If you have questions, feel free to open a new [discussion](https://github.com/libp2p/py-libp2p/discussions). We are also available on the libp2p Discord — join us at #py-libp2p [sub-channel](https://discord.gg/d92MEugb).

 ## Feature Breakdown

libp2p/kad_dht/routing_table.py

Lines changed: 153 additions & 8 deletions
@@ -8,6 +8,7 @@
 import logging
 import time

+import multihash
 import trio

 from libp2p.abc import (
@@ -40,6 +41,22 @@
 STALE_PEER_THRESHOLD = 3600  # Time in seconds after which a peer is considered stale


+def peer_id_to_key(peer_id: ID) -> bytes:
+    """
+    Convert a peer ID to a 256-bit key for routing table operations.
+    This normalizes all peer IDs to exactly 256 bits by hashing them with SHA-256.
+
+    :param peer_id: The peer ID to convert
+    :return: 32-byte (256-bit) key for routing table operations
+    """
+    return multihash.digest(peer_id.to_bytes(), "sha2-256").digest
+
+
+def key_to_int(key: bytes) -> int:
+    """Convert a 256-bit key to an integer for range calculations."""
+    return int.from_bytes(key, byteorder="big")
+
+
 class KBucket:
     """
     A k-bucket implementation for the Kademlia DHT.
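The two helpers above are the core of this change: peer IDs are variable-length multihashes, so normalizing them to a fixed 256-bit key keeps every bucket-range and distance comparison in the same key space. A rough standalone sketch of the idea (using hashlib as a stand-in for the multihash package the commit imports; the placeholder byte strings are made up, not real peer IDs):

# Illustration only; hashlib stands in for the multihash package used above.
import hashlib


def sketch_peer_id_to_key(peer_id_bytes: bytes) -> bytes:
    """Normalize an arbitrary-length peer ID to a fixed 32-byte (256-bit) key."""
    return hashlib.sha256(peer_id_bytes).digest()


def sketch_xor_distance(key_a: bytes, key_b: bytes) -> int:
    """Kademlia XOR metric over two equal-length keys."""
    return int.from_bytes(key_a, "big") ^ int.from_bytes(key_b, "big")


# Raw peer IDs can differ in length (identity vs. sha2-256 multihashes), so
# comparing int.from_bytes(peer_id.to_bytes(), ...) values mixes key spaces.
# After normalization both operands are exactly 32 bytes:
key_a = sketch_peer_id_to_key(b"peer-id-bytes-A")  # placeholder bytes
key_b = sketch_peer_id_to_key(b"peer-id-bytes-B")
assert len(key_a) == len(key_b) == 32
print(sketch_xor_distance(key_a, key_b))  # now comparable across all peers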
@@ -357,9 +374,24 @@ def key_in_range(self, key: bytes) -> bool:
             True if the key is in range, False otherwise

         """
-        key_int = int.from_bytes(key, byteorder="big")
+        key_int = key_to_int(key)
         return self.min_range <= key_int < self.max_range

+    def peer_id_in_range(self, peer_id: ID) -> bool:
+        """
+        Check if a peer ID is in the range of this bucket.
+
+        params: peer_id: The peer ID to check
+
+        Returns
+        -------
+        bool
+            True if the peer ID is in range, False otherwise
+
+        """
+        key = peer_id_to_key(peer_id)
+        return self.key_in_range(key)
+
     def split(self) -> tuple["KBucket", "KBucket"]:
         """
         Split the bucket into two buckets.
@@ -376,8 +408,9 @@ def split(self) -> tuple["KBucket", "KBucket"]:

         # Redistribute peers
         for peer_id, (peer_info, timestamp) in self.peers.items():
-            peer_key = int.from_bytes(peer_id.to_bytes(), byteorder="big")
-            if peer_key < midpoint:
+            peer_key = peer_id_to_key(peer_id)
+            peer_key_int = key_to_int(peer_key)
+            if peer_key_int < midpoint:
                 lower_bucket.peers[peer_id] = (peer_info, timestamp)
             else:
                 upper_bucket.peers[peer_id] = (peer_info, timestamp)
@@ -458,7 +491,38 @@ async def add_peer(self, peer_obj: PeerInfo | ID) -> bool:
             success = await bucket.add_peer(peer_info)
             if success:
                 logger.debug(f"Successfully added peer {peer_id} to routing table")
-                return success
+                return True
+
+            # If bucket is full and couldn't add peer, try splitting the bucket
+            # Only split if the bucket contains our Peer ID
+            if self._should_split_bucket(bucket):
+                logger.debug(
+                    f"Bucket is full, attempting to split bucket for peer {peer_id}"
+                )
+                split_success = self._split_bucket(bucket)
+                if split_success:
+                    # After splitting,
+                    # find the appropriate bucket for the peer and try to add it
+                    target_bucket = self.find_bucket(peer_info.peer_id)
+                    success = await target_bucket.add_peer(peer_info)
+                    if success:
+                        logger.debug(
+                            f"Successfully added peer {peer_id} after bucket split"
+                        )
+                        return True
+                    else:
+                        logger.debug(
+                            f"Failed to add peer {peer_id} even after bucket split"
+                        )
+                        return False
+                else:
+                    logger.debug(f"Failed to split bucket for peer {peer_id}")
+                    return False
+            else:
+                logger.debug(
+                    f"Bucket is full and cannot be split, peer {peer_id} not added"
+                )
+                return False

         except Exception as e:
             logger.debug(f"Error adding peer {peer_obj} to routing table: {e}")
@@ -480,17 +544,17 @@ def remove_peer(self, peer_id: ID) -> bool:

     def find_bucket(self, peer_id: ID) -> KBucket:
         """
-        Find the bucket that would contain the given peer ID or PeerInfo.
+        Find the bucket that would contain the given peer ID.

-        :param peer_obj: Either a peer ID or a PeerInfo object
+        :param peer_id: The peer ID to find a bucket for

         Returns
         -------
         KBucket: The bucket for this peer

         """
         for bucket in self.buckets:
-            if bucket.key_in_range(peer_id.to_bytes()):
+            if bucket.peer_id_in_range(peer_id):
                 return bucket

         return self.buckets[0]
@@ -513,7 +577,11 @@ def find_local_closest_peers(self, key: bytes, count: int = 20) -> list[ID]:
             all_peers.extend(bucket.peer_ids())

         # Sort by XOR distance to the key
-        all_peers.sort(key=lambda p: xor_distance(p.to_bytes(), key))
+        def distance_to_key(peer_id: ID) -> int:
+            peer_key = peer_id_to_key(peer_id)
+            return xor_distance(peer_key, key)
+
+        all_peers.sort(key=distance_to_key)

         return all_peers[:count]

@@ -591,10 +659,87 @@ def get_stale_peers(self, stale_threshold_seconds: int = 3600) -> list[ID]:
             stale_peers.extend(bucket.get_stale_peers(stale_threshold_seconds))
         return stale_peers

+    def get_peer_infos(self) -> list[PeerInfo]:
+        """
+        Get all PeerInfo objects in the routing table.
+
+        Returns
+        -------
+        List[PeerInfo]: List of all PeerInfo objects
+
+        """
+        peer_infos = []
+        for bucket in self.buckets:
+            peer_infos.extend(bucket.peer_infos())
+        return peer_infos
+
     def cleanup_routing_table(self) -> None:
         """
         Cleanup the routing table by removing all data.
         This is useful for resetting the routing table during tests or reinitialization.
         """
         self.buckets = [KBucket(self.host, BUCKET_SIZE)]
         logger.info("Routing table cleaned up, all data removed.")
+
+    def _should_split_bucket(self, bucket: KBucket) -> bool:
+        """
+        Check if a bucket should be split according to Kademlia rules.
+
+        :param bucket: The bucket to check
+        :return: True if the bucket should be split
+        """
+        # Check if we've exceeded maximum buckets
+        if len(self.buckets) >= MAXIMUM_BUCKETS:
+            logger.debug("Maximum number of buckets reached, cannot split")
+            return False
+
+        # Check if the bucket contains our local ID
+        local_key = peer_id_to_key(self.local_id)
+        local_key_int = key_to_int(local_key)
+        contains_local_id = bucket.min_range <= local_key_int < bucket.max_range
+
+        logger.debug(
+            f"Bucket range: {bucket.min_range} - {bucket.max_range}, "
+            f"local_key_int: {local_key_int}, contains_local: {contains_local_id}"
+        )
+
+        return contains_local_id
+
+    def _split_bucket(self, bucket: KBucket) -> bool:
+        """
+        Split a bucket into two buckets.
+
+        :param bucket: The bucket to split
+        :return: True if the bucket was successfully split
+        """
+        try:
+            # Find the bucket index
+            bucket_index = self.buckets.index(bucket)
+            logger.debug(f"Splitting bucket at index {bucket_index}")
+
+            # Split the bucket
+            lower_bucket, upper_bucket = bucket.split()
+
+            # Replace the original bucket with the two new buckets
+            self.buckets[bucket_index] = lower_bucket
+            self.buckets.insert(bucket_index + 1, upper_bucket)
+
+            logger.debug(
+                f"Bucket split successful. New bucket count: {len(self.buckets)}"
+            )
+            logger.debug(
+                f"Lower bucket range: "
+                f"{lower_bucket.min_range} - {lower_bucket.max_range}, "
+                f"peers: {lower_bucket.size()}"
+            )
+            logger.debug(
+                f"Upper bucket range: "
+                f"{upper_bucket.min_range} - {upper_bucket.max_range}, "
+                f"peers: {upper_bucket.size()}"
+            )
+
+            return True
+
+        except Exception as e:
+            logger.error(f"Error splitting bucket: {e}")
+            return False
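Taken together, add_peer, _should_split_bucket, and _split_bucket implement the standard Kademlia rule: a full bucket is split only while the table is below MAXIMUM_BUCKETS and only if the bucket's range covers the local node's own key; otherwise the new peer is simply not added. A condensed, self-contained sketch of that decision, using simplified stand-in types rather than the repo's classes and assumed constant values:

# Simplified sketch of the split-on-full rule; not the repo's actual classes.
from dataclasses import dataclass, field

BUCKET_SIZE = 20        # assumed value for illustration
MAXIMUM_BUCKETS = 256   # assumed value for illustration


@dataclass
class SketchBucket:
    min_range: int
    max_range: int
    peers: list[int] = field(default_factory=list)  # peers stored as int keys

    def is_full(self) -> bool:
        return len(self.peers) >= BUCKET_SIZE


def sketch_add_peer(buckets: list[SketchBucket], local_key: int, peer_key: int) -> bool:
    bucket = next(b for b in buckets if b.min_range <= peer_key < b.max_range)
    if not bucket.is_full():
        bucket.peers.append(peer_key)
        return True
    # Split only the bucket whose range covers our own key, and only while the
    # table can still grow.
    if not (bucket.min_range <= local_key < bucket.max_range):
        return False
    if len(buckets) >= MAXIMUM_BUCKETS:
        return False
    mid = (bucket.min_range + bucket.max_range) // 2
    lower = SketchBucket(bucket.min_range, mid, [p for p in bucket.peers if p < mid])
    upper = SketchBucket(mid, bucket.max_range, [p for p in bucket.peers if p >= mid])
    i = buckets.index(bucket)
    buckets[i:i + 1] = [lower, upper]
    # Mirror the single split-and-retry done by add_peer above.
    target = lower if peer_key < mid else upper
    if target.is_full():
        return False
    target.peers.append(peer_key)
    return True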

libp2p/network/connection/swarm_connection.py

Lines changed: 17 additions & 1 deletion
@@ -23,7 +23,8 @@


 """
-Reference: https://github.com/libp2p/go-libp2p-swarm/blob/04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
+Reference: https://github.com/libp2p/go-libp2p-swarm/blob/
+04c86bbdafd390651cb2ee14e334f7caeedad722/swarm_conn.go
 """


@@ -43,6 +44,21 @@ def __init__(
         self.streams = set()
         self.event_closed = trio.Event()
         self.event_started = trio.Event()
+        # Provide back-references/hooks expected by NetStream
+        try:
+            setattr(self.muxed_conn, "swarm", self.swarm)
+
+            # NetStream expects an awaitable remove_stream hook
+            async def _remove_stream_hook(stream: NetStream) -> None:
+                self.remove_stream(stream)
+
+            setattr(self.muxed_conn, "remove_stream", _remove_stream_hook)
+        except Exception as e:
+            logging.warning(
+                f"Failed to set optional conveniences on muxed_conn "
+                f"for peer {muxed_conn.peer_id}: {e}"
+            )
+        # optional conveniences
         if hasattr(muxed_conn, "on_close"):
             logging.debug(f"Setting on_close for peer {muxed_conn.peer_id}")
             setattr(muxed_conn, "on_close", self._on_muxed_conn_closed)
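The hook wiring above lets a stream ask its parent connection to forget it once it closes, without the stream needing a direct reference to the swarm. Roughly how a stream's close path might call the hook (hypothetical class and method names; the actual NetStream code may differ):

# Hypothetical sketch of a stream using the remove_stream hook set above.
class SketchStream:
    def __init__(self, muxed_conn) -> None:
        self.muxed_conn = muxed_conn

    async def close(self) -> None:
        # ... close the underlying muxed stream here ...
        remove = getattr(self.muxed_conn, "remove_stream", None)
        if remove is not None:
            await remove(self)  # the hook installed by SwarmConn is awaitable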

libp2p/network/swarm.py

Lines changed: 25 additions & 3 deletions
@@ -1,3 +1,7 @@
+from collections.abc import (
+    Awaitable,
+    Callable,
+)
 import logging

 from multiaddr import (

@@ -326,8 +330,16 @@ async def close(self) -> None:

         # Close all listeners
         if hasattr(self, "listeners"):
-            for listener in self.listeners.values():
+            for maddr_str, listener in self.listeners.items():
                 await listener.close()
+                # Notify about listener closure
+                try:
+                    multiaddr = Multiaddr(maddr_str)
+                    await self.notify_listen_close(multiaddr)
+                except Exception as e:
+                    logger.warning(
+                        f"Failed to notify listen_close for {maddr_str}: {e}"
+                    )
             self.listeners.clear()

         # Close the transport if it exists and has a close method

@@ -411,7 +423,17 @@ async def notify_listen(self, multiaddr: Multiaddr) -> None:
                 nursery.start_soon(notifee.listen, self, multiaddr)

     async def notify_closed_stream(self, stream: INetStream) -> None:
-        raise NotImplementedError
+        async with trio.open_nursery() as nursery:
+            for notifee in self.notifees:
+                nursery.start_soon(notifee.closed_stream, self, stream)

     async def notify_listen_close(self, multiaddr: Multiaddr) -> None:
-        raise NotImplementedError
+        async with trio.open_nursery() as nursery:
+            for notifee in self.notifees:
+                nursery.start_soon(notifee.listen_close, self, multiaddr)
+
+    # Generic notifier used by NetStream._notify_closed
+    async def notify_all(self, notifier: Callable[[INotifee], Awaitable[None]]) -> None:
+        async with trio.open_nursery() as nursery:
+            for notifee in self.notifees:
+                nursery.start_soon(notifier, notifee)
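With notify_closed_stream, notify_listen_close, and notify_all now fanning callbacks out over every registered notifee in a nursery, a consumer only has to implement the corresponding INotifee methods. A minimal, self-contained sketch of that fan-out pattern (hypothetical names, not the repo's code):

# Minimal fan-out sketch mirroring the notify_* methods above; names are made up.
import trio


class SketchNotifee:
    def __init__(self) -> None:
        self.closed_streams: list[object] = []

    async def closed_stream(self, network: object, stream: object) -> None:
        self.closed_streams.append(stream)


async def fan_out(notifees, make_call) -> None:
    # Same shape as Swarm.notify_all: each callback runs as its own nursery task.
    async with trio.open_nursery() as nursery:
        for notifee in notifees:
            nursery.start_soon(make_call, notifee)


async def demo() -> None:
    notifee = SketchNotifee()

    async def call(n: SketchNotifee) -> None:
        await n.closed_stream(None, "stream-placeholder")

    await fan_out([notifee], call)
    assert notifee.closed_streams == ["stream-placeholder"]


# trio.run(demo)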

newsfragments/826.feature.rst

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+Implement closed_stream notification in MyNotifee
+
+- Add notify_closed_stream method to swarm notification system for proper stream lifecycle management
+- Integrate remove_stream hook in SwarmConn to enable stream closure notifications
+- Add comprehensive tests for closed_stream functionality in test_notify.py
+- Enable stream lifecycle integration for proper cleanup and resource management

newsfragments/846.bugfix.rst

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fix kbucket splitting in routing table when full. Routing table now maintains multiple kbuckets and properly distributes peers as specified by the Kademlia DHT protocol.

pyproject.toml

Lines changed: 4 additions & 2 deletions
@@ -10,8 +10,10 @@ readme = "README.md"
 requires-python = ">=3.10, <4.0"
 license = { text = "MIT AND Apache-2.0" }
 keywords = ["libp2p", "p2p"]
-authors = [
-    { name = "The Ethereum Foundation", email = "[email protected]" },
+maintainers = [
+    { name = "pacrob", email = "[email protected]" },
+    { name = "Manu Sheel Gupta", email = "[email protected]" },
+    { name = "Dave Grantham", email = "[email protected]" },
 ]
 dependencies = [
     "base58>=1.0.3",

tests/core/kad_dht/test_unit_routing_table.py

Lines changed: 26 additions & 0 deletions
@@ -226,6 +226,32 @@ async def test_ping_peer_scenarios(self, mock_host, sample_peer_info):
 class TestRoutingTable:
     """Test suite for RoutingTable class."""

+    @pytest.mark.trio
+    async def test_kbucket_split_behavior(self, mock_host, local_peer_id):
+        """
+        Test that adding more than BUCKET_SIZE peers to the routing table
+        triggers kbucket splitting and all peers are added.
+        """
+        routing_table = RoutingTable(local_peer_id, mock_host)
+
+        num_peers = BUCKET_SIZE + 5
+        peer_ids = []
+        for i in range(num_peers):
+            key_pair = create_new_key_pair()
+            peer_id = ID.from_pubkey(key_pair.public_key)
+            peer_info = PeerInfo(peer_id, [Multiaddr(f"/ip4/127.0.0.1/tcp/{9000 + i}")])
+            peer_ids.append(peer_id)
+            added = await routing_table.add_peer(peer_info)
+            assert added, f"Peer {peer_id} should be added"
+
+        assert len(routing_table.buckets) > 1, "KBucket splitting did not occur"
+        for pid in peer_ids:
+            assert routing_table.peer_in_table(pid), f"Peer {pid} not found after split"
+        all_peer_ids = routing_table.get_peer_ids()
+        assert set(peer_ids).issubset(set(all_peer_ids)), (
+            "Not all peers present after split"
+        )
+
     @pytest.fixture
     def mock_host(self):
         """Create a mock host for testing."""
