5757logger = logging .getLogger ("libp2p.identity.identify-push-example" )
5858
5959
60- def custom_identify_push_handler_for (host ):
60+ def custom_identify_push_handler_for (host , use_varint_format : bool = True ):
6161 """
6262 Create a custom handler for the identify/push protocol that logs and prints
6363 the identity information received from the dialer.
64+
65+ Args:
66+ host: The libp2p host
67+ use_varint_format: If True, expect length-prefixed format; if False, expect
68+ raw protobuf
69+
6470 """
6571
6672 async def handle_identify_push (stream : INetStream ) -> None :
6773 peer_id = stream .muxed_conn .peer_id
6874
6975 try :
70- # Read the identify message from the stream
71- data = await stream .read ()
76+ if use_varint_format :
77+ # Read length-prefixed identify message from the stream
78+ from libp2p .utils .varint import decode_varint_from_bytes
79+
80+ # First read the varint length prefix
81+ length_bytes = b""
82+ while True :
83+ b = await stream .read (1 )
84+ if not b :
85+ break
86+ length_bytes += b
87+ if b [0 ] & 0x80 == 0 :
88+ break
89+
90+ if not length_bytes :
91+ logger .warning ("No length prefix received from peer %s" , peer_id )
92+ return
93+
94+ msg_length = decode_varint_from_bytes (length_bytes )
95+
96+ # Read the protobuf message
97+ data = await stream .read (msg_length )
98+ if len (data ) != msg_length :
99+ logger .warning ("Incomplete message received from peer %s" , peer_id )
100+ return
101+ else :
102+ # Read raw protobuf message from the stream
103+ data = b""
104+ while True :
105+ chunk = await stream .read (4096 )
106+ if not chunk :
107+ break
108+ data += chunk
109+
72110 identify_msg = Identify ()
73111 identify_msg .ParseFromString (data )
74112
@@ -129,19 +167,28 @@ async def handle_identify_push(stream: INetStream) -> None:
129167 return handle_identify_push
130168
131169
132- async def run_listener (port : int ) -> None :
170+ async def run_listener (port : int , use_varint_format : bool = True ) -> None :
133171 """Run a host in listener mode."""
134- print (f"\n ==== Starting Identify-Push Listener on port { port } ====\n " )
172+ format_name = "length-prefixed" if use_varint_format else "raw protobuf"
173+ print (
174+ f"\n ==== Starting Identify-Push Listener on port { port } "
175+ f"(using { format_name } format) ====\n "
176+ )
135177
136178 # Create key pair for the listener
137179 key_pair = create_new_key_pair ()
138180
139181 # Create the listener host
140182 host = new_host (key_pair = key_pair )
141183
142- # Set up the identify and identify/push handlers
143- host .set_stream_handler (ID_IDENTIFY , identify_handler_for (host ))
144- host .set_stream_handler (ID_IDENTIFY_PUSH , custom_identify_push_handler_for (host ))
184+ # Set up the identify and identify/push handlers with specified format
185+ host .set_stream_handler (
186+ ID_IDENTIFY , identify_handler_for (host , use_varint_format = use_varint_format )
187+ )
188+ host .set_stream_handler (
189+ ID_IDENTIFY_PUSH ,
190+ identify_push_handler_for (host , use_varint_format = use_varint_format ),
191+ )
145192
146193 # Start listening
147194 listen_addr = multiaddr .Multiaddr (f"/ip4/0.0.0.0/tcp/{ port } " )
@@ -165,19 +212,30 @@ async def run_listener(port: int) -> None:
165212 await trio .sleep_forever ()
166213
167214
168- async def run_dialer (port : int , destination : str ) -> None :
215+ async def run_dialer (
216+ port : int , destination : str , use_varint_format : bool = True
217+ ) -> None :
169218 """Run a host in dialer mode that connects to a listener."""
170- print (f"\n ==== Starting Identify-Push Dialer on port { port } ====\n " )
219+ format_name = "length-prefixed" if use_varint_format else "raw protobuf"
220+ print (
221+ f"\n ==== Starting Identify-Push Dialer on port { port } "
222+ f"(using { format_name } format) ====\n "
223+ )
171224
172225 # Create key pair for the dialer
173226 key_pair = create_new_key_pair ()
174227
175228 # Create the dialer host
176229 host = new_host (key_pair = key_pair )
177230
178- # Set up the identify and identify/push handlers
179- host .set_stream_handler (ID_IDENTIFY , identify_handler_for (host ))
180- host .set_stream_handler (ID_IDENTIFY_PUSH , identify_push_handler_for (host ))
231+ # Set up the identify and identify/push handlers with specified format
232+ host .set_stream_handler (
233+ ID_IDENTIFY , identify_handler_for (host , use_varint_format = use_varint_format )
234+ )
235+ host .set_stream_handler (
236+ ID_IDENTIFY_PUSH ,
237+ identify_push_handler_for (host , use_varint_format = use_varint_format ),
238+ )
181239
182240 # Start listening on a different port
183241 listen_addr = multiaddr .Multiaddr (f"/ip4/0.0.0.0/tcp/{ port } " )
@@ -206,7 +264,9 @@ async def run_dialer(port: int, destination: str) -> None:
206264
207265 try :
208266 # Call push_identify_to_peer which returns a boolean
209- success = await push_identify_to_peer (host , peer_info .peer_id )
267+ success = await push_identify_to_peer (
268+ host , peer_info .peer_id , use_varint_format = use_varint_format
269+ )
210270
211271 if success :
212272 logger .info ("Identify push completed successfully!" )
@@ -240,29 +300,40 @@ def main() -> None:
240300 This program demonstrates the libp2p identify/push protocol.
241301 Without arguments, it runs as a listener on random port.
242302 With -d parameter, it runs as a dialer on random port.
243- """
244303
245- example = (
246- "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
247- )
304+ Use --raw-format to send raw protobuf messages (old format) instead of
305+ length-prefixed protobuf messages (new format, default).
306+ """
248307
249308 parser = argparse .ArgumentParser (description = description )
250309 parser .add_argument ("-p" , "--port" , default = 0 , type = int , help = "source port number" )
251310 parser .add_argument (
252311 "-d" ,
253312 "--destination" ,
254313 type = str ,
255- help = f"destination multiaddr string, e.g. { example } " ,
314+ help = "destination multiaddr string" ,
315+ )
316+ parser .add_argument (
317+ "--raw-format" ,
318+ action = "store_true" ,
319+ help = (
320+ "use raw protobuf format (old format) instead of "
321+ "length-prefixed (new format)"
322+ ),
256323 )
257324 args = parser .parse_args ()
258325
326+ # Determine format: raw format if --raw-format is specified, otherwise
327+ # length-prefixed
328+ use_varint_format = not args .raw_format
329+
259330 try :
260331 if args .destination :
261332 # Run in dialer mode with random available port if not specified
262- trio .run (run_dialer , args .port , args .destination )
333+ trio .run (run_dialer , args .port , args .destination , use_varint_format )
263334 else :
264335 # Run in listener mode with random available port if not specified
265- trio .run (run_listener , args .port )
336+ trio .run (run_listener , args .port , use_varint_format )
266337 except KeyboardInterrupt :
267338 print ("\n Interrupted by user" )
268339 logger .info ("Interrupted by user" )
0 commit comments