Commit cff0bfc

Merge pull request #846 from sumanjeet0012/bugfix/kbucket_split_fix
Fix: kbucket splitting in routing table.
2 parents 90f143c + a2ad10b commit cff0bfc

File tree

3 files changed: 180 additions & 8 deletions


libp2p/kad_dht/routing_table.py

Lines changed: 153 additions & 8 deletions
@@ -8,6 +8,7 @@
 import logging
 import time
 
+import multihash
 import trio
 
 from libp2p.abc import (
@@ -40,6 +41,22 @@
 STALE_PEER_THRESHOLD = 3600  # Time in seconds after which a peer is considered stale
 
 
+def peer_id_to_key(peer_id: ID) -> bytes:
+    """
+    Convert a peer ID to a 256-bit key for routing table operations.
+    This normalizes all peer IDs to exactly 256 bits by hashing them with SHA-256.
+
+    :param peer_id: The peer ID to convert
+    :return: 32-byte (256-bit) key for routing table operations
+    """
+    return multihash.digest(peer_id.to_bytes(), "sha2-256").digest
+
+
+def key_to_int(key: bytes) -> int:
+    """Convert a 256-bit key to an integer for range calculations."""
+    return int.from_bytes(key, byteorder="big")
+
+
 class KBucket:
     """
     A k-bucket implementation for the Kademlia DHT.
@@ -357,9 +374,24 @@ def key_in_range(self, key: bytes) -> bool:
             True if the key is in range, False otherwise
 
         """
-        key_int = int.from_bytes(key, byteorder="big")
+        key_int = key_to_int(key)
         return self.min_range <= key_int < self.max_range
 
+    def peer_id_in_range(self, peer_id: ID) -> bool:
+        """
+        Check if a peer ID is in the range of this bucket.
+
+        :param peer_id: The peer ID to check
+
+        Returns
+        -------
+        bool
+            True if the peer ID is in range, False otherwise
+
+        """
+        key = peer_id_to_key(peer_id)
+        return self.key_in_range(key)
+
     def split(self) -> tuple["KBucket", "KBucket"]:
         """
         Split the bucket into two buckets.
@@ -376,8 +408,9 @@ def split(self) -> tuple["KBucket", "KBucket"]:
 
         # Redistribute peers
         for peer_id, (peer_info, timestamp) in self.peers.items():
-            peer_key = int.from_bytes(peer_id.to_bytes(), byteorder="big")
-            if peer_key < midpoint:
+            peer_key = peer_id_to_key(peer_id)
+            peer_key_int = key_to_int(peer_key)
+            if peer_key_int < midpoint:
                 lower_bucket.peers[peer_id] = (peer_info, timestamp)
             else:
                 upper_bucket.peers[peer_id] = (peer_info, timestamp)
@@ -458,7 +491,38 @@ async def add_peer(self, peer_obj: PeerInfo | ID) -> bool:
             success = await bucket.add_peer(peer_info)
             if success:
                 logger.debug(f"Successfully added peer {peer_id} to routing table")
-                return success
+                return True
+
+            # If bucket is full and couldn't add peer, try splitting the bucket
+            # Only split if the bucket contains our Peer ID
+            if self._should_split_bucket(bucket):
+                logger.debug(
+                    f"Bucket is full, attempting to split bucket for peer {peer_id}"
+                )
+                split_success = self._split_bucket(bucket)
+                if split_success:
+                    # After splitting,
+                    # find the appropriate bucket for the peer and try to add it
+                    target_bucket = self.find_bucket(peer_info.peer_id)
+                    success = await target_bucket.add_peer(peer_info)
+                    if success:
+                        logger.debug(
+                            f"Successfully added peer {peer_id} after bucket split"
+                        )
+                        return True
+                    else:
+                        logger.debug(
+                            f"Failed to add peer {peer_id} even after bucket split"
+                        )
+                        return False
+                else:
+                    logger.debug(f"Failed to split bucket for peer {peer_id}")
+                    return False
+            else:
+                logger.debug(
+                    f"Bucket is full and cannot be split, peer {peer_id} not added"
+                )
+                return False
 
         except Exception as e:
             logger.debug(f"Error adding peer {peer_obj} to routing table: {e}")
@@ -480,17 +544,17 @@ def remove_peer(self, peer_id: ID) -> bool:
 
     def find_bucket(self, peer_id: ID) -> KBucket:
         """
-        Find the bucket that would contain the given peer ID or PeerInfo.
+        Find the bucket that would contain the given peer ID.
 
-        :param peer_obj: Either a peer ID or a PeerInfo object
+        :param peer_id: The peer ID to find a bucket for
 
         Returns
        -------
         KBucket: The bucket for this peer
 
         """
         for bucket in self.buckets:
-            if bucket.key_in_range(peer_id.to_bytes()):
+            if bucket.peer_id_in_range(peer_id):
                 return bucket
 
         return self.buckets[0]
@@ -513,7 +577,11 @@ def find_local_closest_peers(self, key: bytes, count: int = 20) -> list[ID]:
             all_peers.extend(bucket.peer_ids())
 
         # Sort by XOR distance to the key
-        all_peers.sort(key=lambda p: xor_distance(p.to_bytes(), key))
+        def distance_to_key(peer_id: ID) -> int:
+            peer_key = peer_id_to_key(peer_id)
+            return xor_distance(peer_key, key)
+
+        all_peers.sort(key=distance_to_key)
 
         return all_peers[:count]
 
@@ -591,10 +659,87 @@ def get_stale_peers(self, stale_threshold_seconds: int = 3600) -> list[ID]:
             stale_peers.extend(bucket.get_stale_peers(stale_threshold_seconds))
         return stale_peers
 
+    def get_peer_infos(self) -> list[PeerInfo]:
+        """
+        Get all PeerInfo objects in the routing table.
+
+        Returns
+        -------
+        List[PeerInfo]: List of all PeerInfo objects
+
+        """
+        peer_infos = []
+        for bucket in self.buckets:
+            peer_infos.extend(bucket.peer_infos())
+        return peer_infos
+
     def cleanup_routing_table(self) -> None:
         """
         Cleanup the routing table by removing all data.
         This is useful for resetting the routing table during tests or reinitialization.
         """
         self.buckets = [KBucket(self.host, BUCKET_SIZE)]
         logger.info("Routing table cleaned up, all data removed.")
+
+    def _should_split_bucket(self, bucket: KBucket) -> bool:
+        """
+        Check if a bucket should be split according to Kademlia rules.
+
+        :param bucket: The bucket to check
+        :return: True if the bucket should be split
+        """
+        # Check if we've exceeded maximum buckets
+        if len(self.buckets) >= MAXIMUM_BUCKETS:
+            logger.debug("Maximum number of buckets reached, cannot split")
+            return False
+
+        # Check if the bucket contains our local ID
+        local_key = peer_id_to_key(self.local_id)
+        local_key_int = key_to_int(local_key)
+        contains_local_id = bucket.min_range <= local_key_int < bucket.max_range
+
+        logger.debug(
+            f"Bucket range: {bucket.min_range} - {bucket.max_range}, "
+            f"local_key_int: {local_key_int}, contains_local: {contains_local_id}"
+        )
+
+        return contains_local_id
+
+    def _split_bucket(self, bucket: KBucket) -> bool:
+        """
+        Split a bucket into two buckets.
+
+        :param bucket: The bucket to split
+        :return: True if the bucket was successfully split
+        """
+        try:
+            # Find the bucket index
+            bucket_index = self.buckets.index(bucket)
+            logger.debug(f"Splitting bucket at index {bucket_index}")
+
+            # Split the bucket
+            lower_bucket, upper_bucket = bucket.split()
+
+            # Replace the original bucket with the two new buckets
+            self.buckets[bucket_index] = lower_bucket
+            self.buckets.insert(bucket_index + 1, upper_bucket)
+
+            logger.debug(
+                f"Bucket split successful. New bucket count: {len(self.buckets)}"
+            )
+            logger.debug(
+                f"Lower bucket range: "
+                f"{lower_bucket.min_range} - {lower_bucket.max_range}, "
+                f"peers: {lower_bucket.size()}"
+            )
+            logger.debug(
+                f"Upper bucket range: "
+                f"{upper_bucket.min_range} - {upper_bucket.max_range}, "
+                f"peers: {upper_bucket.size()}"
+            )
+
+            return True
+
+        except Exception as e:
+            logger.error(f"Error splitting bucket: {e}")
+            return False
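
The heart of the change is key normalization. Peer IDs are multihashes of varying length, so interpreting their raw bytes as integers and comparing them against the fixed 256-bit bucket ranges could misplace peers; key_in_range, split, and the XOR-distance sort now all operate on a uniform SHA-256 digest instead. A minimal standalone sketch of the same idea, using hashlib rather than the multihash package (the raw IDs below are made-up placeholders):

import hashlib

# Buckets partition the full 256-bit key space [0, 2**256).
MIN_RANGE, MAX_RANGE = 0, 2**256
MIDPOINT = (MIN_RANGE + MAX_RANGE) // 2


def normalize_to_key(raw_id: bytes) -> bytes:
    """Hash an arbitrary-length ID to exactly 32 bytes (256 bits)."""
    return hashlib.sha256(raw_id).digest()


def key_to_int(key: bytes) -> int:
    """Interpret a 256-bit key as a big-endian integer for range checks."""
    return int.from_bytes(key, byteorder="big")


for raw_id in (b"short-id", b"a-much-longer-peer-identifier"):
    key_int = key_to_int(normalize_to_key(raw_id))
    side = "lower" if key_int < MIDPOINT else "upper"
    print(f"{raw_id!r} -> 32-byte key, lands in the {side} half of the key space")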

newsfragments/846.bugfix.rst

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fix kbucket splitting in routing table when full. Routing table now maintains multiple kbuckets and properly distributes peers as specified by the Kademlia DHT protocol.
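
For context, the Kademlia rule applied here: a full bucket may only be split while its range still covers the local node's own key; the range is halved at its midpoint and peers are redistributed by integer comparison. A toy sketch of that range-halving step (a standalone illustration, not the library's API):

def split_range(min_range: int, max_range: int):
    """Halve a bucket's key range at its midpoint, Kademlia-style."""
    midpoint = (min_range + max_range) // 2
    return (min_range, midpoint), (midpoint, max_range)


lower, upper = split_range(0, 2**256)
local_key_int = 3 * 2**254  # example key sitting in the upper half

# Only the bucket whose range contains the local key stays splittable.
for name, (lo, hi) in (("lower", lower), ("upper", upper)):
    print(f"{name} bucket splittable: {lo <= local_key_int < hi}")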

tests/core/kad_dht/test_unit_routing_table.py

Lines changed: 26 additions & 0 deletions
@@ -226,6 +226,32 @@ async def test_ping_peer_scenarios(self, mock_host, sample_peer_info):
 class TestRoutingTable:
     """Test suite for RoutingTable class."""
 
+    @pytest.mark.trio
+    async def test_kbucket_split_behavior(self, mock_host, local_peer_id):
+        """
+        Test that adding more than BUCKET_SIZE peers to the routing table
+        triggers kbucket splitting and all peers are added.
+        """
+        routing_table = RoutingTable(local_peer_id, mock_host)
+
+        num_peers = BUCKET_SIZE + 5
+        peer_ids = []
+        for i in range(num_peers):
+            key_pair = create_new_key_pair()
+            peer_id = ID.from_pubkey(key_pair.public_key)
+            peer_info = PeerInfo(peer_id, [Multiaddr(f"/ip4/127.0.0.1/tcp/{9000 + i}")])
+            peer_ids.append(peer_id)
+            added = await routing_table.add_peer(peer_info)
+            assert added, f"Peer {peer_id} should be added"
+
+        assert len(routing_table.buckets) > 1, "KBucket splitting did not occur"
+        for pid in peer_ids:
+            assert routing_table.peer_in_table(pid), f"Peer {pid} not found after split"
+        all_peer_ids = routing_table.get_peer_ids()
+        assert set(peer_ids).issubset(set(all_peer_ids)), (
+            "Not all peers present after split"
+        )
+
     @pytest.fixture
     def mock_host(self):
         """Create a mock host for testing."""
