57
57
logger = logging .getLogger ("libp2p.identity.identify-push-example" )
58
58
59
59
60
- def custom_identify_push_handler_for (host ):
60
+ def custom_identify_push_handler_for (host , use_varint_format : bool = True ):
61
61
"""
62
62
Create a custom handler for the identify/push protocol that logs and prints
63
63
the identity information received from the dialer.
64
+
65
+ Args:
66
+ host: The libp2p host
67
+ use_varint_format: If True, expect length-prefixed format; if False, expect
68
+ raw protobuf
69
+
64
70
"""
65
71
66
72
async def handle_identify_push (stream : INetStream ) -> None :
67
73
peer_id = stream .muxed_conn .peer_id
68
74
69
75
try :
70
- # Read the identify message from the stream
71
- data = await stream .read ()
76
+ if use_varint_format :
77
+ # Read length-prefixed identify message from the stream
78
+ from libp2p .utils .varint import decode_varint_from_bytes
79
+
80
+ # First read the varint length prefix
81
+ length_bytes = b""
82
+ while True :
83
+ b = await stream .read (1 )
84
+ if not b :
85
+ break
86
+ length_bytes += b
87
+ if b [0 ] & 0x80 == 0 :
88
+ break
89
+
90
+ if not length_bytes :
91
+ logger .warning ("No length prefix received from peer %s" , peer_id )
92
+ return
93
+
94
+ msg_length = decode_varint_from_bytes (length_bytes )
95
+
96
+ # Read the protobuf message
97
+ data = await stream .read (msg_length )
98
+ if len (data ) != msg_length :
99
+ logger .warning ("Incomplete message received from peer %s" , peer_id )
100
+ return
101
+ else :
102
+ # Read raw protobuf message from the stream
103
+ data = b""
104
+ while True :
105
+ chunk = await stream .read (4096 )
106
+ if not chunk :
107
+ break
108
+ data += chunk
109
+
72
110
identify_msg = Identify ()
73
111
identify_msg .ParseFromString (data )
74
112
@@ -129,19 +167,28 @@ async def handle_identify_push(stream: INetStream) -> None:
129
167
return handle_identify_push
130
168
131
169
132
- async def run_listener (port : int ) -> None :
170
+ async def run_listener (port : int , use_varint_format : bool = True ) -> None :
133
171
"""Run a host in listener mode."""
134
- print (f"\n ==== Starting Identify-Push Listener on port { port } ====\n " )
172
+ format_name = "length-prefixed" if use_varint_format else "raw protobuf"
173
+ print (
174
+ f"\n ==== Starting Identify-Push Listener on port { port } "
175
+ f"(using { format_name } format) ====\n "
176
+ )
135
177
136
178
# Create key pair for the listener
137
179
key_pair = create_new_key_pair ()
138
180
139
181
# Create the listener host
140
182
host = new_host (key_pair = key_pair )
141
183
142
- # Set up the identify and identify/push handlers
143
- host .set_stream_handler (ID_IDENTIFY , identify_handler_for (host ))
144
- host .set_stream_handler (ID_IDENTIFY_PUSH , custom_identify_push_handler_for (host ))
184
+ # Set up the identify and identify/push handlers with specified format
185
+ host .set_stream_handler (
186
+ ID_IDENTIFY , identify_handler_for (host , use_varint_format = use_varint_format )
187
+ )
188
+ host .set_stream_handler (
189
+ ID_IDENTIFY_PUSH ,
190
+ identify_push_handler_for (host , use_varint_format = use_varint_format ),
191
+ )
145
192
146
193
# Start listening
147
194
listen_addr = multiaddr .Multiaddr (f"/ip4/0.0.0.0/tcp/{ port } " )
@@ -165,19 +212,30 @@ async def run_listener(port: int) -> None:
165
212
await trio .sleep_forever ()
166
213
167
214
168
- async def run_dialer (port : int , destination : str ) -> None :
215
+ async def run_dialer (
216
+ port : int , destination : str , use_varint_format : bool = True
217
+ ) -> None :
169
218
"""Run a host in dialer mode that connects to a listener."""
170
- print (f"\n ==== Starting Identify-Push Dialer on port { port } ====\n " )
219
+ format_name = "length-prefixed" if use_varint_format else "raw protobuf"
220
+ print (
221
+ f"\n ==== Starting Identify-Push Dialer on port { port } "
222
+ f"(using { format_name } format) ====\n "
223
+ )
171
224
172
225
# Create key pair for the dialer
173
226
key_pair = create_new_key_pair ()
174
227
175
228
# Create the dialer host
176
229
host = new_host (key_pair = key_pair )
177
230
178
- # Set up the identify and identify/push handlers
179
- host .set_stream_handler (ID_IDENTIFY , identify_handler_for (host ))
180
- host .set_stream_handler (ID_IDENTIFY_PUSH , identify_push_handler_for (host ))
231
+ # Set up the identify and identify/push handlers with specified format
232
+ host .set_stream_handler (
233
+ ID_IDENTIFY , identify_handler_for (host , use_varint_format = use_varint_format )
234
+ )
235
+ host .set_stream_handler (
236
+ ID_IDENTIFY_PUSH ,
237
+ identify_push_handler_for (host , use_varint_format = use_varint_format ),
238
+ )
181
239
182
240
# Start listening on a different port
183
241
listen_addr = multiaddr .Multiaddr (f"/ip4/0.0.0.0/tcp/{ port } " )
@@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None:
206
264
207
265
try :
208
266
# Call push_identify_to_peer which returns a boolean
209
- success = await push_identify_to_peer (host , peer_info .peer_id )
267
+ success = await push_identify_to_peer (
268
+ host , peer_info .peer_id , use_varint_format = use_varint_format
269
+ )
210
270
211
271
if success :
212
272
logger .info ("Identify push completed successfully!" )
@@ -240,29 +300,40 @@ def main() -> None:
240
300
This program demonstrates the libp2p identify/push protocol.
241
301
Without arguments, it runs as a listener on random port.
242
302
With -d parameter, it runs as a dialer on random port.
243
- """
244
303
245
- example = (
246
- "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
247
- )
304
+ Use --raw-format to send raw protobuf messages (old format) instead of
305
+ length-prefixed protobuf messages (new format, default).
306
+ """
248
307
249
308
parser = argparse .ArgumentParser (description = description )
250
309
parser .add_argument ("-p" , "--port" , default = 0 , type = int , help = "source port number" )
251
310
parser .add_argument (
252
311
"-d" ,
253
312
"--destination" ,
254
313
type = str ,
255
- help = f"destination multiaddr string, e.g. { example } " ,
314
+ help = "destination multiaddr string" ,
315
+ )
316
+ parser .add_argument (
317
+ "--raw-format" ,
318
+ action = "store_true" ,
319
+ help = (
320
+ "use raw protobuf format (old format) instead of "
321
+ "length-prefixed (new format)"
322
+ ),
256
323
)
257
324
args = parser .parse_args ()
258
325
326
+ # Determine format: raw format if --raw-format is specified, otherwise
327
+ # length-prefixed
328
+ use_varint_format = not args .raw_format
329
+
259
330
try :
260
331
if args .destination :
261
332
# Run in dialer mode with random available port if not specified
262
- trio .run (run_dialer , args .port , args .destination )
333
+ trio .run (run_dialer , args .port , args .destination , use_varint_format )
263
334
else :
264
335
# Run in listener mode with random available port if not specified
265
- trio .run (run_listener , args .port )
336
+ trio .run (run_listener , args .port , use_varint_format )
266
337
except KeyboardInterrupt :
267
338
print ("\n Interrupted by user" )
268
339
logger .info ("Interrupted by user" )
0 commit comments