Skip to content

Commit 5c6779f

Browse files
authored
Merge pull request libp2p#1095 from sumanjeet0012/feat/kademlia_validator
Implement custom validators and quorum based value retrieval
2 parents f505fcb + 880b79a commit 5c6779f

File tree

6 files changed

+341
-51
lines changed

6 files changed

+341
-51
lines changed

examples/kademlia/kademlia.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
A basic example of using the Kademlia DHT implementation, with all setup logic inlined.
55
This example demonstrates both value storage/retrieval and content server
66
advertisement/discovery.
7+
It also shows how to use custom validators for namespaced keys.
78
"""
89

910
import argparse
@@ -31,6 +32,7 @@
3132
DHTMode,
3233
KadDHT,
3334
)
35+
from libp2p.records.validator import Validator
3436
from libp2p.tools.async_service import (
3537
background_trio_service,
3638
)
@@ -39,6 +41,46 @@
3941
)
4042
from libp2p.utils.paths import get_script_dir, join_paths
4143

44+
45+
# Custom validator for the "example" namespace
46+
class ExampleValidator(Validator):
    """
    A simple validator for the 'example' namespace.

    This validator accepts any non-empty value and always selects the first
    value when comparing multiple values.
    """

    def validate(self, key: str, value: bytes) -> None:
        """
        Validate a key-value pair.

        The only rule enforced in this example is that the value must not be
        empty. In a real application, you might also check:
        - Value format/schema
        - Signatures
        - Size limits
        - etc.

        Args:
            key: The namespaced key the value is stored under (unused here).
            value: The raw value to validate.

        Raises:
            ValueError: If ``value`` is empty.

        """
        # You can add custom validation logic here
        if not value:
            raise ValueError("Value cannot be empty")

    def select(self, key: str, values: list[bytes]) -> int:
        """
        Select the best value from a list of values.

        Returns the index of the selected value.
        In this example, we simply return the first value (index 0).

        In a real application, you might:
        - Compare timestamps
        - Check version numbers
        - Verify signatures and pick the most recent valid one

        Args:
            key: The namespaced key the values belong to (unused here).
            values: Candidate values to choose from.

        Raises:
            ValueError: If ``values`` is empty, since there is nothing to select.

        """
        # Index 0 is only meaningful when at least one candidate exists; fail
        # loudly here instead of handing the caller an out-of-range index.
        if not values:
            raise ValueError("Cannot select from an empty list of values")
        return 0
82+
83+
4284
# Configure logging
4385
logging.basicConfig(
4486
level=logging.INFO,
@@ -175,6 +217,12 @@ async def run_node(
175217

176218
await connect_to_bootstrap_nodes(host, bootstrap_nodes)
177219
dht = KadDHT(host, dht_mode)
220+
221+
# Register a custom validator for the "example" namespace
222+
# This allows us to store values with keys like "/example/my-key"
223+
dht.register_validator("example", ExampleValidator())
224+
logger.info("Registered custom 'example' namespace validator")
225+
178226
# take all peer ids from the host and add them to the dht
179227
for peer_id in host.get_peerstore().peer_ids():
180228
await dht.routing_table.add_peer(peer_id)
@@ -188,12 +236,15 @@ async def run_node(
188236
async with background_trio_service(dht):
189237
logger.info(f"DHT service started in {dht_mode.value} mode")
190238

191-
# Example 1: Simple Key-Value Storage
192-
# Just use a string key directly - DHT API accepts strings!
193-
key = "my-example-key"
239+
# Example 1: Simple Key-Value Storage with namespaced key
240+
# Keys MUST be namespaced (e.g., /namespace/key) for validation
241+
# The namespace must have a registered validator
242+
key = "/example/my-example-key"
194243
value = b"Hello from py-libp2p!"
195244

196245
# Example 2: Content Provider Advertisement
246+
# Provider keys use a different storage mechanism (provider store)
247+
# that doesn't go through the value validation path
197248
content_id = "my-content-identifier"
198249

199250
if dht_mode == DHTMode.SERVER:

libp2p/kad_dht/kad_dht.py

Lines changed: 178 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
)
5151
from .pb.kademlia_pb2 import (
5252
Message,
53+
Record,
5354
)
5455
from .peer_routing import (
5556
PeerRouting,
@@ -77,6 +78,32 @@ class DHTMode(Enum):
7778
SERVER = "SERVER"
7879

7980

81+
# Timestamp validation constants
MAX_TIMESTAMP_AGE = 24 * 60 * 60  # 24 hours in seconds
MAX_TIMESTAMP_FUTURE = 5 * 60  # 5 minutes in the future in seconds


def is_valid_timestamp(ts: float) -> bool:
    """
    Validate if a timestamp is within acceptable bounds.

    A timestamp is accepted when it is at most MAX_TIMESTAMP_FUTURE seconds
    ahead of the current time and at most MAX_TIMESTAMP_AGE seconds behind it.

    Args:
        ts: The timestamp to validate (Unix timestamp in seconds)

    Returns:
        bool: True if timestamp is valid (not too old and not too far in future).
        NaN is never valid.

    """
    current_time = time.time()
    # Both bounds are written as positive "within range" comparisons on
    # purpose: every comparison involving NaN is False, so a NaN timestamp is
    # rejected here instead of slipping past a pair of "out of range" checks.
    not_too_far_ahead = ts <= current_time + MAX_TIMESTAMP_FUTURE
    not_too_old = current_time - ts <= MAX_TIMESTAMP_AGE
    return not_too_far_ahead and not_too_old
105+
106+
80107
class KadDHT(Service):
81108
"""
82109
Kademlia DHT implementation for libp2p.
@@ -714,6 +741,13 @@ async def handle_stream(self, stream: INetStream) -> None:
714741
"Missing key or value in PUT_VALUE message"
715742
)
716743

744+
# Always validate the key-value pair before storing
745+
# Reject keys without registered namespace validators
746+
key_str = key.decode("utf-8")
747+
if self.validator is None:
748+
raise ValueError("Validator is required for DHT operations")
749+
self.validator.validate(key_str, value)
750+
717751
self.value_store.put(key, value)
718752
logger.debug(f"Stored value {value.hex()} for key {key.hex()}")
719753
success = True
@@ -771,24 +805,28 @@ async def put_value(self, key: str, value: bytes) -> None:
771805
key: String key (will be converted to bytes for storage)
772806
value: Binary value to store
773807
808+
Raises:
809+
InvalidRecordType: If no validator is registered for the key's namespace
810+
ValueError: If trying to replace a newer value with an older one
811+
774812
"""
775813
logger.debug(f"Storing value for key {key}")
776814

777-
# Validate if key starts with "/" (namespaced keys like /pk/...)
778-
if self.validator is not None and key.startswith("/"):
779-
self.validator.validate(key, value)
780-
781-
key_bytes = key.encode("utf-8")
782-
old_value_record = self.value_store.get(key_bytes)
783-
if old_value_record is not None and old_value_record.value != value:
784-
index = self.validator.select(key, [value, old_value_record.value])
785-
if index != 0:
786-
raise ValueError(
787-
"Refusing to replace newer value with the older one"
788-
)
815+
# Always validate the key-value pair using the namespaced validator
816+
# This will raise InvalidRecordType if:
817+
# - The key is not namespaced (doesn't start with / or has no second /)
818+
# - No validator is registered for the key's namespace
819+
# Following Go libp2p behavior where only namespaced keys are allowed
820+
if self.validator is None:
821+
raise ValueError("Validator is required for DHT operations")
822+
self.validator.validate(key, value)
789823

790-
# Convert string key to bytes for storage
791824
key_bytes = key.encode("utf-8")
825+
old_value_record = self.value_store.get(key_bytes)
826+
if old_value_record is not None and old_value_record.value != value:
827+
index = self.validator.select(key, [value, old_value_record.value])
828+
if index != 0:
829+
raise ValueError("Refusing to replace newer value with the older one")
792830

793831
# 1. Store locally first
794832
self.value_store.put(key_bytes, value)
@@ -834,15 +872,21 @@ async def store_one(idx: int, peer: ID) -> None:
834872

835873
logger.info(f"Successfully stored value at {stored_count} peers")
836874

837-
async def get_value(self, key: str) -> bytes | None:
875+
async def get_value(self, key: str, quorum: int = 0) -> bytes | None:
838876
"""
839877
Retrieve a value from the DHT.
840878
841879
Args:
842880
key: String key (will be converted to bytes for lookup)
881+
quorum: Minimum number of valid peer responses required for confidence.
882+
If quorum > 0 and not met, the function still returns the best value
883+
found (if any) but logs a warning. Set to 0 to disable quorum checking.
843884
844885
Returns:
845-
The value if found, None otherwise
886+
The value if found (best value even if quorum not met), None otherwise.
887+
Note: When quorum is not met, a warning is logged but the best available
888+
value is still returned. This allows graceful degradation when the network
889+
has insufficient peers.
846890
847891
"""
848892
logger.debug(f"Getting value for key: {key}")
@@ -864,32 +908,110 @@ async def get_value(self, key: str) -> bytes | None:
864908
]
865909
logger.debug(f"Searching {len(closest_peers)} peers for value")
866910

911+
# Collect valid records from peers as (peer, Record) pairs
912+
valid_records: list[tuple[ID, Record]] = []
913+
867914
# 3. Query ALPHA peers at a time in parallel
915+
# Use list to track mutable state (pyrefly requirement)
916+
total_responses_list: list[int] = [0]
868917
for i in range(0, len(closest_peers), ALPHA):
869918
batch = closest_peers[i : i + ALPHA]
870-
found_value = None
871919

872920
async def query_one(peer: ID) -> None:
873-
nonlocal found_value
874921
try:
875922
with trio.move_on_after(QUERY_TIMEOUT):
876-
value = await self.value_store._get_from_peer(peer, key_bytes)
877-
if value is not None and found_value is None:
878-
found_value = value
879-
logger.debug(f"Found value at peer {peer}")
923+
# Fetch the record directly to get timeReceived
924+
rec = await self.value_store._get_from_peer(
925+
peer, key_bytes, return_record=True
926+
)
927+
if rec is not None:
928+
total_responses_list[0] += 1
929+
# Validate the record's value
930+
try:
931+
if self.validator is None:
932+
raise ValueError("Validator is required")
933+
if not isinstance(rec, Record):
934+
raise TypeError("Expected Record type")
935+
self.validator.validate(key, rec.value)
936+
valid_records.append((peer, rec))
937+
logger.debug(f"Found valid record at peer {peer}")
938+
except Exception as e:
939+
logger.debug(
940+
f"Received invalid record from {peer}, "
941+
f"discarding: {e}"
942+
)
880943
except Exception as e:
881944
logger.debug(f"Error querying peer {peer}: {e}")
882945

883946
async with trio.open_nursery() as nursery:
884947
for peer in batch:
885948
nursery.start_soon(query_one, peer)
886949

887-
if found_value is not None:
888-
self.value_store.put(key_bytes, found_value)
889-
logger.info("Successfully retrieved value from network")
890-
return found_value
950+
# If quorum is set and we have enough valid records, we can stop early
951+
# Note: quorum counts valid records, not all responses.
952+
if quorum and len(valid_records) >= quorum:
953+
logger.debug(f"Quorum reached ({len(valid_records)} valid records)")
954+
break
955+
956+
# 4. Select the best record if any valid records were found
957+
if valid_records:
958+
# Check if quorum was met
959+
if quorum > 0 and len(valid_records) < quorum:
960+
logger.warning(
961+
f"Quorum not met: found {len(valid_records)} valid records, "
962+
f"required {quorum}. Returning best value found."
963+
)
964+
965+
# Select the best record using the validator
966+
# Note: Following Go libp2p's approach, we use validator.select() to choose
967+
# the best value, not timestamps. The timeReceived field is for local
968+
# bookkeeping only, not for distributed consensus on the "best" record.
969+
if self.validator is None:
970+
raise ValueError("Validator is required for record selection")
971+
972+
values = [rec.value for _p, rec in valid_records]
973+
best_idx = self.validator.select(key, values)
974+
logger.debug(
975+
f"Selected best value at index {best_idx} using validator.select()"
976+
)
977+
978+
best_peer, best_rec = valid_records[best_idx]
979+
best_value = best_rec.value
980+
981+
# Propagate the best record to peers that have different values
982+
# This ensures network consistency, following Go libp2p's approach
983+
outdated_peers: list[ID] = []
984+
for peer, rec in valid_records:
985+
# Propagate if the peer has a different value than the best
986+
if rec.value != best_value:
987+
outdated_peers.append(peer)
891988

892-
# 4. Not found
989+
if outdated_peers:
990+
logger.debug(
991+
f"Propagating best value to {len(outdated_peers)}"
992+
" peers with outdated values"
993+
)
994+
995+
async def propagate(peer: ID) -> None:
996+
try:
997+
with trio.move_on_after(QUERY_TIMEOUT):
998+
await self.value_store._store_at_peer(
999+
peer, key_bytes, best_value
1000+
)
1001+
logger.debug(f"Propagated updated record to peer {peer}")
1002+
except Exception as e:
1003+
logger.debug(f"Failed to propagate to peer {peer}: {e}")
1004+
1005+
async with trio.open_nursery() as nursery:
1006+
for p in outdated_peers:
1007+
nursery.start_soon(propagate, p)
1008+
1009+
# Store the best value locally
1010+
self.value_store.put(key_bytes, best_value)
1011+
logger.info("Successfully retrieved value from network")
1012+
return best_value
1013+
1014+
# 5. Not found
8931015
logger.warning(f"Value not found for key {key}")
8941016
return None
8951017

@@ -949,6 +1071,36 @@ def get_value_store_size(self) -> int:
9491071
"""
9501072
return self.value_store.size()
9511073

1074+
def register_validator(self, namespace: str, validator: Validator) -> None:
    """
    Attach a custom validator to a key namespace.

    Once registered, values stored under keys in that namespace
    (e.g., /myapp/key) are checked by the given validator.

    Args:
        namespace: The namespace string (e.g., "myapp" for keys like /myapp/key)
        validator: A Validator instance with validate() and select() methods

    Example:
        class MyValidator(Validator):
            def validate(self, key: str, value: bytes) -> None:
                # Custom validation logic
                pass

            def select(self, key: str, values: list[bytes]) -> int:
                return 0  # Return index of best value

        dht.register_validator("myapp", MyValidator())
        await dht.put_value("/myapp/my-key", b"my-value")

    """
    # An existing validator is extended in place with the new namespace.
    if self.validator is not None:
        self.validator.add_validator(namespace, validator)
        return

    # No validator configured yet: start one that knows only this namespace.
    self.validator = NamespacedValidator({namespace: validator})
1103+
9521104
def is_random_walk_enabled(self) -> bool:
9531105
"""
9541106
Check if random walk peer discovery is enabled.

0 commit comments

Comments
 (0)