1
1
import argparse
2
2
import base64
3
3
import logging
4
+ import sys
4
5
5
6
import multiaddr
6
7
import trio
@@ -72,14 +73,52 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
72
73
client_addr = server_addr .replace ("/ip4/0.0.0.0/" , "/ip4/127.0.0.1/" )
73
74
74
75
format_name = "length-prefixed" if use_varint_format else "raw protobuf"
76
+ format_flag = "--raw-format" if not use_varint_format else ""
75
77
print (
76
78
f"First host listening (using { format_name } format). "
77
79
f"Run this from another console:\n \n "
78
- f"identify-demo "
79
- f"-d { client_addr } \n "
80
+ f"identify-demo { format_flag } -d { client_addr } \n "
80
81
)
81
82
print ("Waiting for incoming identify request..." )
82
- await trio .sleep_forever ()
83
+
84
+ # Add a custom handler to show connection events
85
+ async def custom_identify_handler (stream ):
86
+ peer_id = stream .muxed_conn .peer_id
87
+ print (f"\n 🔗 Received identify request from peer: { peer_id } " )
88
+
89
+ # Show remote address in multiaddr format
90
+ try :
91
+ from libp2p .identity .identify .identify import (
92
+ _remote_address_to_multiaddr ,
93
+ )
94
+
95
+ remote_address = stream .get_remote_address ()
96
+ if remote_address :
97
+ observed_multiaddr = _remote_address_to_multiaddr (
98
+ remote_address
99
+ )
100
+ # Add the peer ID to create a complete multiaddr
101
+ complete_multiaddr = f"{ observed_multiaddr } /p2p/{ peer_id } "
102
+ print (f" Remote address: { complete_multiaddr } " )
103
+ else :
104
+ print (f" Remote address: { remote_address } " )
105
+ except Exception :
106
+ print (f" Remote address: { stream .get_remote_address ()} " )
107
+
108
+ # Call the original handler
109
+ await identify_handler (stream )
110
+
111
+ print (f"✅ Successfully processed identify request from { peer_id } " )
112
+
113
+ # Replace the handler with our custom one
114
+ host_a .set_stream_handler (IDENTIFY_PROTOCOL_ID , custom_identify_handler )
115
+
116
+ try :
117
+ await trio .sleep_forever ()
118
+ except KeyboardInterrupt :
119
+ print ("\n 🛑 Shutting down listener..." )
120
+ logger .info ("Listener interrupted by user" )
121
+ return
83
122
84
123
else :
85
124
# Create second host (dialer)
@@ -93,25 +132,74 @@ async def run(port: int, destination: str, use_varint_format: bool = True) -> No
93
132
info = info_from_p2p_addr (maddr )
94
133
print (f"Second host connecting to peer: { info .peer_id } " )
95
134
96
- await host_b .connect (info )
135
+ try :
136
+ await host_b .connect (info )
137
+ except Exception as e :
138
+ error_msg = str (e )
139
+ if "unable to connect" in error_msg or "SwarmException" in error_msg :
140
+ print (f"\n ❌ Cannot connect to peer: { info .peer_id } " )
141
+ print (f" Address: { destination } " )
142
+ print (f" Error: { error_msg } " )
143
+ print (
144
+ "\n 💡 Make sure the peer is running and the address is correct."
145
+ )
146
+ return
147
+ else :
148
+ # Re-raise other exceptions
149
+ raise
150
+
97
151
stream = await host_b .new_stream (info .peer_id , (IDENTIFY_PROTOCOL_ID ,))
98
152
99
153
try :
100
154
print ("Starting identify protocol..." )
101
155
102
- # Read the complete response (could be either format)
103
- # Read a larger chunk to get all the data before stream closes
104
- response = await stream .read (8192 ) # Read enough data in one go
156
+ # Read the response using the utility function
157
+ from libp2p .utils .varint import read_length_prefixed_protobuf
158
+
159
+ response = await read_length_prefixed_protobuf (
160
+ stream , use_varint_format
161
+ )
162
+ full_response = response
105
163
106
164
await stream .close ()
107
165
108
166
# Parse the response using the robust protocol-level function
109
167
# This handles both old and new formats automatically
110
- identify_msg = parse_identify_response (response )
168
+ identify_msg = parse_identify_response (full_response )
111
169
print_identify_response (identify_msg )
112
170
113
171
except Exception as e :
114
- print (f"Identify protocol error: { e } " )
172
+ error_msg = str (e )
173
+ print (f"Identify protocol error: { error_msg } " )
174
+
175
+ # Check for specific format mismatch errors
176
+ if "Error parsing message" in error_msg or "DecodeError" in error_msg :
177
+ print ("\n " + "=" * 60 )
178
+ print ("FORMAT MISMATCH DETECTED!" )
179
+ print ("=" * 60 )
180
+ if use_varint_format :
181
+ print (
182
+ "You are using length-prefixed format (default) but the "
183
+ "listener"
184
+ )
185
+ print ("is using raw protobuf format." )
186
+ print (
187
+ "\n To fix this, run the dialer with the --raw-format flag:"
188
+ )
189
+ print (f"identify-demo --raw-format -d { destination } " )
190
+ else :
191
+ print ("You are using raw protobuf format but the listener" )
192
+ print ("is using length-prefixed format (default)." )
193
+ print (
194
+ "\n To fix this, run the dialer without the --raw-format "
195
+ "flag:"
196
+ )
197
+ print (f"identify-demo -d { destination } " )
198
+ print ("=" * 60 )
199
+ else :
200
+ import traceback
201
+
202
+ traceback .print_exc ()
115
203
116
204
return
117
205
@@ -147,16 +235,27 @@ def main() -> None:
147
235
"length-prefixed (new format)"
148
236
),
149
237
)
238
+
150
239
args = parser .parse_args ()
151
240
152
241
# Determine format: raw format if --raw-format is specified, otherwise
153
242
# length-prefixed
154
243
use_varint_format = not args .raw_format
155
244
156
245
try :
157
- trio .run (run , * (args .port , args .destination , use_varint_format ))
246
+ if args .destination :
247
+ # Run in dialer mode
248
+ trio .run (run , * (args .port , args .destination , use_varint_format ))
249
+ else :
250
+ # Run in listener mode
251
+ trio .run (run , * (args .port , args .destination , use_varint_format ))
158
252
except KeyboardInterrupt :
159
- pass
253
+ print ("\n 👋 Goodbye!" )
254
+ logger .info ("Application interrupted by user" )
255
+ except Exception as e :
256
+ print (f"\n ❌ Error: { str (e )} " )
257
+ logger .error ("Error: %s" , str (e ))
258
+ sys .exit (1 )
160
259
161
260
162
261
if __name__ == "__main__" :
0 commit comments