Skip to content

Commit 6fe28dc

Browse files
authored
Merge branch 'main' into py-multiaddr
2 parents 003e7bf + 62ea3bb commit 6fe28dc

File tree

24 files changed

+2294
-63
lines changed

24 files changed

+2294
-63
lines changed

examples/identify/identify.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
from libp2p import (
99
new_host,
1010
)
11-
from libp2p.identity.identify.identify import ID as IDENTIFY_PROTOCOL_ID
12-
from libp2p.identity.identify.pb.identify_pb2 import (
13-
Identify,
11+
from libp2p.identity.identify.identify import (
12+
ID as IDENTIFY_PROTOCOL_ID,
13+
identify_handler_for,
14+
parse_identify_response,
1415
)
1516
from libp2p.peer.peerinfo import (
1617
info_from_p2p_addr,
@@ -50,19 +51,32 @@ def print_identify_response(identify_response):
5051
)
5152

5253

53-
async def run(port: int, destination: str) -> None:
54+
async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
5455
localhost_ip = "0.0.0.0"
5556

5657
if not destination:
5758
# Create first host (listener)
5859
listen_addr = multiaddr.Multiaddr(f"/ip4/{localhost_ip}/tcp/{port}")
5960
host_a = new_host()
6061

62+
# Set up identify handler with specified format
63+
identify_handler = identify_handler_for(
64+
host_a, use_varint_format=use_varint_format
65+
)
66+
host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
67+
6168
async with host_a.run(listen_addrs=[listen_addr]):
69+
# Get the actual address and replace 0.0.0.0 with 127.0.0.1 for client
70+
# connections
71+
server_addr = str(host_a.get_addrs()[0])
72+
client_addr = server_addr.replace("/ip4/0.0.0.0/", "/ip4/127.0.0.1/")
73+
74+
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
6275
print(
63-
"First host listening. Run this from another console:\n\n"
76+
f"First host listening (using {format_name} format). "
77+
f"Run this from another console:\n\n"
6478
f"identify-demo "
65-
f"-d {host_a.get_addrs()[0]}\n"
79+
f"-d {client_addr}\n"
6680
)
6781
print("Waiting for incoming identify request...")
6882
await trio.sleep_forever()
@@ -84,11 +98,18 @@ async def run(port: int, destination: str) -> None:
8498

8599
try:
86100
print("Starting identify protocol...")
87-
response = await stream.read()
101+
102+
# Read the complete response (could be either format)
103+
# Read a larger chunk to get all the data before stream closes
104+
response = await stream.read(8192) # Read enough data in one go
105+
88106
await stream.close()
89-
identify_msg = Identify()
90-
identify_msg.ParseFromString(response)
107+
108+
# Parse the response using the robust protocol-level function
109+
# This handles both old and new formats automatically
110+
identify_msg = parse_identify_response(response)
91111
print_identify_response(identify_msg)
112+
92113
except Exception as e:
93114
print(f"Identify protocol error: {e}")
94115

@@ -98,9 +119,12 @@ async def run(port: int, destination: str) -> None:
98119
def main() -> None:
99120
description = """
100121
This program demonstrates the libp2p identify protocol.
101-
First run identify-demo -p <PORT>' to start a listener.
122+
First run 'identify-demo -p <PORT> [--raw-format]' to start a listener.
102123
Then run 'identify-demo <ANOTHER_PORT> -d <DESTINATION>'
103124
where <DESTINATION> is the multiaddress shown by the listener.
125+
126+
Use --raw-format to send raw protobuf messages (old format) instead of
127+
length-prefixed protobuf messages (new format, default).
104128
"""
105129

106130
example_maddr = (
@@ -115,10 +139,22 @@ def main() -> None:
115139
type=str,
116140
help=f"destination multiaddr string, e.g. {example_maddr}",
117141
)
142+
parser.add_argument(
143+
"--raw-format",
144+
action="store_true",
145+
help=(
146+
"use raw protobuf format (old format) instead of "
147+
"length-prefixed (new format)"
148+
),
149+
)
118150
args = parser.parse_args()
119151

152+
# Determine format: raw format if --raw-format is specified, otherwise
153+
# length-prefixed
154+
use_varint_format = not args.raw_format
155+
120156
try:
121-
trio.run(run, *(args.port, args.destination))
157+
trio.run(run, *(args.port, args.destination, use_varint_format))
122158
except KeyboardInterrupt:
123159
pass
124160

examples/identify_push/identify_push_listener_dialer.py

Lines changed: 92 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,56 @@
5757
logger = logging.getLogger("libp2p.identity.identify-push-example")
5858

5959

60-
def custom_identify_push_handler_for(host):
60+
def custom_identify_push_handler_for(host, use_varint_format: bool = True):
6161
"""
6262
Create a custom handler for the identify/push protocol that logs and prints
6363
the identity information received from the dialer.
64+
65+
Args:
66+
host: The libp2p host
67+
use_varint_format: If True, expect length-prefixed format; if False, expect
68+
raw protobuf
69+
6470
"""
6571

6672
async def handle_identify_push(stream: INetStream) -> None:
6773
peer_id = stream.muxed_conn.peer_id
6874

6975
try:
70-
# Read the identify message from the stream
71-
data = await stream.read()
76+
if use_varint_format:
77+
# Read length-prefixed identify message from the stream
78+
from libp2p.utils.varint import decode_varint_from_bytes
79+
80+
# First read the varint length prefix
81+
length_bytes = b""
82+
while True:
83+
b = await stream.read(1)
84+
if not b:
85+
break
86+
length_bytes += b
87+
if b[0] & 0x80 == 0:
88+
break
89+
90+
if not length_bytes:
91+
logger.warning("No length prefix received from peer %s", peer_id)
92+
return
93+
94+
msg_length = decode_varint_from_bytes(length_bytes)
95+
96+
# Read the protobuf message
97+
data = await stream.read(msg_length)
98+
if len(data) != msg_length:
99+
logger.warning("Incomplete message received from peer %s", peer_id)
100+
return
101+
else:
102+
# Read raw protobuf message from the stream
103+
data = b""
104+
while True:
105+
chunk = await stream.read(4096)
106+
if not chunk:
107+
break
108+
data += chunk
109+
72110
identify_msg = Identify()
73111
identify_msg.ParseFromString(data)
74112

@@ -129,19 +167,28 @@ async def handle_identify_push(stream: INetStream) -> None:
129167
return handle_identify_push
130168

131169

132-
async def run_listener(port: int) -> None:
170+
async def run_listener(port: int, use_varint_format: bool = True) -> None:
133171
"""Run a host in listener mode."""
134-
print(f"\n==== Starting Identify-Push Listener on port {port} ====\n")
172+
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
173+
print(
174+
f"\n==== Starting Identify-Push Listener on port {port} "
175+
f"(using {format_name} format) ====\n"
176+
)
135177

136178
# Create key pair for the listener
137179
key_pair = create_new_key_pair()
138180

139181
# Create the listener host
140182
host = new_host(key_pair=key_pair)
141183

142-
# Set up the identify and identify/push handlers
143-
host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host))
144-
host.set_stream_handler(ID_IDENTIFY_PUSH, custom_identify_push_handler_for(host))
184+
# Set up the identify and identify/push handlers with specified format
185+
host.set_stream_handler(
186+
ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format)
187+
)
188+
host.set_stream_handler(
189+
ID_IDENTIFY_PUSH,
190+
identify_push_handler_for(host, use_varint_format=use_varint_format),
191+
)
145192

146193
# Start listening
147194
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
@@ -165,19 +212,30 @@ async def run_listener(port: int) -> None:
165212
await trio.sleep_forever()
166213

167214

168-
async def run_dialer(port: int, destination: str) -> None:
215+
async def run_dialer(
216+
port: int, destination: str, use_varint_format: bool = True
217+
) -> None:
169218
"""Run a host in dialer mode that connects to a listener."""
170-
print(f"\n==== Starting Identify-Push Dialer on port {port} ====\n")
219+
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
220+
print(
221+
f"\n==== Starting Identify-Push Dialer on port {port} "
222+
f"(using {format_name} format) ====\n"
223+
)
171224

172225
# Create key pair for the dialer
173226
key_pair = create_new_key_pair()
174227

175228
# Create the dialer host
176229
host = new_host(key_pair=key_pair)
177230

178-
# Set up the identify and identify/push handlers
179-
host.set_stream_handler(ID_IDENTIFY, identify_handler_for(host))
180-
host.set_stream_handler(ID_IDENTIFY_PUSH, identify_push_handler_for(host))
231+
# Set up the identify and identify/push handlers with specified format
232+
host.set_stream_handler(
233+
ID_IDENTIFY, identify_handler_for(host, use_varint_format=use_varint_format)
234+
)
235+
host.set_stream_handler(
236+
ID_IDENTIFY_PUSH,
237+
identify_push_handler_for(host, use_varint_format=use_varint_format),
238+
)
181239

182240
# Start listening on a different port
183241
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
@@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None:
206264

207265
try:
208266
# Call push_identify_to_peer which returns a boolean
209-
success = await push_identify_to_peer(host, peer_info.peer_id)
267+
success = await push_identify_to_peer(
268+
host, peer_info.peer_id, use_varint_format=use_varint_format
269+
)
210270

211271
if success:
212272
logger.info("Identify push completed successfully!")
@@ -240,29 +300,40 @@ def main() -> None:
240300
This program demonstrates the libp2p identify/push protocol.
241301
Without arguments, it runs as a listener on random port.
242302
With -d parameter, it runs as a dialer on random port.
243-
"""
244303
245-
example = (
246-
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
247-
)
304+
Use --raw-format to send raw protobuf messages (old format) instead of
305+
length-prefixed protobuf messages (new format, default).
306+
"""
248307

249308
parser = argparse.ArgumentParser(description=description)
250309
parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
251310
parser.add_argument(
252311
"-d",
253312
"--destination",
254313
type=str,
255-
help=f"destination multiaddr string, e.g. {example}",
314+
help="destination multiaddr string",
315+
)
316+
parser.add_argument(
317+
"--raw-format",
318+
action="store_true",
319+
help=(
320+
"use raw protobuf format (old format) instead of "
321+
"length-prefixed (new format)"
322+
),
256323
)
257324
args = parser.parse_args()
258325

326+
# Determine format: raw format if --raw-format is specified, otherwise
327+
# length-prefixed
328+
use_varint_format = not args.raw_format
329+
259330
try:
260331
if args.destination:
261332
# Run in dialer mode with random available port if not specified
262-
trio.run(run_dialer, args.port, args.destination)
333+
trio.run(run_dialer, args.port, args.destination, use_varint_format)
263334
else:
264335
# Run in listener mode with random available port if not specified
265-
trio.run(run_listener, args.port)
336+
trio.run(run_listener, args.port, use_varint_format)
266337
except KeyboardInterrupt:
267338
print("\nInterrupted by user")
268339
logger.info("Interrupted by user")

libp2p/host/defaults.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,8 @@
2626

2727
def get_default_protocols(host: IHost) -> "OrderedDict[TProtocol, StreamHandlerFn]":
2828
return OrderedDict(
29-
((IdentifyID, identify_handler_for(host)), (PingID, handle_ping))
29+
(
30+
(IdentifyID, identify_handler_for(host, use_varint_format=True)),
31+
(PingID, handle_ping),
32+
)
3033
)

libp2p/identity/identify/identify.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
StreamClosed,
1717
)
1818
from libp2p.utils import (
19+
decode_varint_with_size,
1920
get_agent_version,
21+
varint,
2022
)
2123

2224
from .pb.identify_pb2 import (
@@ -72,7 +74,47 @@ def _mk_identify_protobuf(
7274
)
7375

7476

75-
def identify_handler_for(host: IHost) -> StreamHandlerFn:
77+
def parse_identify_response(response: bytes) -> Identify:
78+
"""
79+
Parse identify response that could be either:
80+
- Old format: raw protobuf
81+
- New format: length-prefixed protobuf
82+
83+
This function provides backward and forward compatibility.
84+
"""
85+
# Try new format first: length-prefixed protobuf
86+
if len(response) >= 1:
87+
length, varint_size = decode_varint_with_size(response)
88+
if varint_size > 0 and length > 0 and varint_size + length <= len(response):
89+
protobuf_data = response[varint_size : varint_size + length]
90+
try:
91+
identify_response = Identify()
92+
identify_response.ParseFromString(protobuf_data)
93+
# Sanity check: must have agent_version (protocol_version is optional)
94+
if identify_response.agent_version:
95+
logger.debug(
96+
"Parsed length-prefixed identify response (new format)"
97+
)
98+
return identify_response
99+
except Exception:
100+
pass # Fall through to old format
101+
102+
# Fall back to old format: raw protobuf
103+
try:
104+
identify_response = Identify()
105+
identify_response.ParseFromString(response)
106+
logger.debug("Parsed raw protobuf identify response (old format)")
107+
return identify_response
108+
except Exception as e:
109+
logger.error(f"Failed to parse identify response: {e}")
110+
logger.error(f"Response length: {len(response)}")
111+
logger.error(f"Response hex: {response.hex()}")
112+
raise
113+
114+
115+
def identify_handler_for(
116+
host: IHost, use_varint_format: bool = False
117+
) -> StreamHandlerFn:
76118
async def handle_identify(stream: INetStream) -> None:
77119
# get observed address from ``stream``
78120
peer_id = (
@@ -100,7 +142,21 @@ async def handle_identify(stream: INetStream) -> None:
100142
response = protobuf.SerializeToString()
101143

102144
try:
103-
await stream.write(response)
145+
if use_varint_format:
146+
# Send length-prefixed protobuf message (new format)
147+
await stream.write(varint.encode_uvarint(len(response)))
148+
await stream.write(response)
149+
logger.debug(
150+
"Sent new format (length-prefixed) identify response to %s",
151+
peer_id,
152+
)
153+
else:
154+
# Send raw protobuf message (old format for backward compatibility)
155+
await stream.write(response)
156+
logger.debug(
157+
"Sent old format (raw protobuf) identify response to %s",
158+
peer_id,
159+
)
104160
except StreamClosed:
105161
logger.debug("Fail to respond to %s request: stream closed", ID)
106162
else:

0 commit comments

Comments
 (0)