@@ -168,6 +168,7 @@ def __init__(self):
168
168
# p2p_lock must not be acquired after _send_lock as it could result in deadlocks.
169
169
self ._send_lock = threading .Lock ()
170
170
self .v2_state = None # EncryptedP2PState object needed for v2 p2p connections
171
+ self .reconnect = False # set if reconnection needs to happen
171
172
172
173
@property
173
174
def is_connected (self ):
@@ -197,8 +198,9 @@ def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p
197
198
coroutine = loop .create_connection (lambda : self , host = self .dstaddr , port = self .dstport )
198
199
return lambda : loop .call_soon_threadsafe (loop .create_task , coroutine )
199
200
200
- def peer_accept_connection (self , connect_id , connect_cb = lambda : None , * , net , timeout_factor , supports_v2_p2p ):
201
+ def peer_accept_connection (self , connect_id , connect_cb = lambda : None , * , net , timeout_factor , supports_v2_p2p , reconnect ):
201
202
self .peer_connect_helper ('0' , 0 , net , timeout_factor )
203
+ self .reconnect = reconnect
202
204
if supports_v2_p2p :
203
205
self .v2_state = EncryptedP2PState (initiating = False , net = net )
204
206
@@ -222,14 +224,16 @@ def connection_made(self, transport):
222
224
send_handshake_bytes = self .v2_state .initiate_v2_handshake ()
223
225
self .send_raw_message (send_handshake_bytes )
224
226
# if v2 connection, send `on_connection_send_msg` after initial v2 handshake.
225
- if self .on_connection_send_msg and not self .supports_v2_p2p :
227
+ # if reconnection situation, send `on_connection_send_msg` after version message is received in `on_version()`.
228
+ if self .on_connection_send_msg and not self .supports_v2_p2p and not self .reconnect :
226
229
self .send_message (self .on_connection_send_msg )
227
230
self .on_connection_send_msg = None # Never used again
228
231
self .on_open ()
229
232
230
233
def connection_lost (self , exc ):
231
234
"""asyncio callback when a connection is closed."""
232
- if exc :
235
+ # don't display warning if reconnection needs to be attempted using v1 P2P
236
+ if exc and not self .reconnect :
233
237
logger .warning ("Connection lost to {}:{} due to {}" .format (self .dstaddr , self .dstport , exc ))
234
238
else :
235
239
logger .debug ("Closed connection to: %s:%d" % (self .dstaddr , self .dstport ))
@@ -279,9 +283,9 @@ def v2_handshake(self):
279
283
if not is_mac_auth :
280
284
raise ValueError ("invalid v2 mac tag in handshake authentication" )
281
285
self .recvbuf = self .recvbuf [length :]
282
- while self .v2_state .tried_v2_handshake and self .queue_messages :
283
- message = self .queue_messages . pop ( 0 )
284
- self .send_message ( message )
286
+ if self .v2_state .tried_v2_handshake and self .on_connection_send_msg :
287
+ self .send_message ( self . on_connection_send_msg )
288
+ self .on_connection_send_msg = None
285
289
286
290
# Socket read methods
287
291
@@ -350,7 +354,8 @@ def _on_data(self):
350
354
self ._log_message ("receive" , t )
351
355
self .on_message (t )
352
356
except Exception as e :
353
- logger .exception ('Error reading message:' , repr (e ))
357
+ if not self .reconnect :
358
+ logger .exception ('Error reading message:' , repr (e ))
354
359
raise
355
360
356
361
def on_message (self , message ):
@@ -549,6 +554,12 @@ def on_verack(self, message):
549
554
550
555
def on_version (self , message ):
551
556
assert message .nVersion >= MIN_P2P_VERSION_SUPPORTED , "Version {} received. Test framework only supports versions greater than {}" .format (message .nVersion , MIN_P2P_VERSION_SUPPORTED )
557
+ # reconnection using v1 P2P has happened since version message can be processed, previously unsent version message is sent using v1 P2P here
558
+ if self .reconnect :
559
+ if self .on_connection_send_msg :
560
+ self .send_message (self .on_connection_send_msg )
561
+ self .on_connection_send_msg = None
562
+ self .reconnect = False
552
563
if message .nVersion >= 70016 and self .wtxidrelay :
553
564
self .send_message (msg_wtxidrelay ())
554
565
if self .support_addrv2 :
@@ -721,6 +732,11 @@ def listen(cls, p2p, callback, port=None, addr=None, idx=1):
721
732
if addr is None :
722
733
addr = '127.0.0.1'
723
734
735
+ def exception_handler (loop , context ):
736
+ if not p2p .reconnect :
737
+ loop .default_exception_handler (context )
738
+
739
+ cls .network_event_loop .set_exception_handler (exception_handler )
724
740
coroutine = cls .create_listen_server (addr , port , callback , p2p )
725
741
cls .network_event_loop .call_soon_threadsafe (cls .network_event_loop .create_task , coroutine )
726
742
@@ -734,7 +750,9 @@ def peer_protocol():
734
750
protocol function from that dict, and returns it so the event loop
735
751
can start executing it."""
736
752
response = cls .protos .get ((addr , port ))
737
- cls .protos [(addr , port )] = None
753
+ # remove protocol function from dict only when reconnection doesn't need to happen/already happened
754
+ if not proto .reconnect :
755
+ cls .protos [(addr , port )] = None
738
756
return response
739
757
740
758
if (addr , port ) not in cls .listeners :
0 commit comments