@@ -168,6 +168,7 @@ def __init__(self):
168168 # p2p_lock must not be acquired after _send_lock as it could result in deadlocks.
169169 self ._send_lock = threading .Lock ()
170170 self .v2_state = None # EncryptedP2PState object needed for v2 p2p connections
171+ self .reconnect = False # set if reconnection needs to happen
171172
172173 @property
173174 def is_connected (self ):
@@ -197,8 +198,9 @@ def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p
197198 coroutine = loop .create_connection (lambda : self , host = self .dstaddr , port = self .dstport )
198199 return lambda : loop .call_soon_threadsafe (loop .create_task , coroutine )
199200
200- def peer_accept_connection (self , connect_id , connect_cb = lambda : None , * , net , timeout_factor , supports_v2_p2p ):
201+ def peer_accept_connection (self , connect_id , connect_cb = lambda : None , * , net , timeout_factor , supports_v2_p2p , reconnect ):
201202 self .peer_connect_helper ('0' , 0 , net , timeout_factor )
203+ self .reconnect = reconnect
202204 if supports_v2_p2p :
203205 self .v2_state = EncryptedP2PState (initiating = False , net = net )
204206
@@ -222,14 +224,16 @@ def connection_made(self, transport):
222224 send_handshake_bytes = self .v2_state .initiate_v2_handshake ()
223225 self .send_raw_message (send_handshake_bytes )
224226 # if v2 connection, send `on_connection_send_msg` after initial v2 handshake.
225- if self .on_connection_send_msg and not self .supports_v2_p2p :
227+ # if reconnection situation, send `on_connection_send_msg` after version message is received in `on_version()`.
228+ if self .on_connection_send_msg and not self .supports_v2_p2p and not self .reconnect :
226229 self .send_message (self .on_connection_send_msg )
227230 self .on_connection_send_msg = None # Never used again
228231 self .on_open ()
229232
230233 def connection_lost (self , exc ):
231234 """asyncio callback when a connection is closed."""
232- if exc :
235+ # don't display warning if reconnection needs to be attempted using v1 P2P
236+ if exc and not self .reconnect :
233237 logger .warning ("Connection lost to {}:{} due to {}" .format (self .dstaddr , self .dstport , exc ))
234238 else :
235239 logger .debug ("Closed connection to: %s:%d" % (self .dstaddr , self .dstport ))
@@ -279,9 +283,9 @@ def v2_handshake(self):
279283 if not is_mac_auth :
280284 raise ValueError ("invalid v2 mac tag in handshake authentication" )
281285 self .recvbuf = self .recvbuf [length :]
282- while self .v2_state .tried_v2_handshake and self .queue_messages :
283- message = self .queue_messages . pop ( 0 )
284- self .send_message ( message )
286+ if self .v2_state .tried_v2_handshake and self .on_connection_send_msg :
287+ self .send_message ( self . on_connection_send_msg )
288+ self .on_connection_send_msg = None
285289
286290 # Socket read methods
287291
@@ -350,7 +354,8 @@ def _on_data(self):
350354 self ._log_message ("receive" , t )
351355 self .on_message (t )
352356 except Exception as e :
353- logger .exception ('Error reading message:' , repr (e ))
357+ if not self .reconnect :
358+ logger .exception ('Error reading message:' , repr (e ))
354359 raise
355360
356361 def on_message (self , message ):
@@ -549,6 +554,12 @@ def on_verack(self, message):
549554
550555 def on_version (self , message ):
551556 assert message .nVersion >= MIN_P2P_VERSION_SUPPORTED , "Version {} received. Test framework only supports versions greater than {}" .format (message .nVersion , MIN_P2P_VERSION_SUPPORTED )
557+ # reconnection using v1 P2P has happened since version message can be processed, previously unsent version message is sent using v1 P2P here
558+ if self .reconnect :
559+ if self .on_connection_send_msg :
560+ self .send_message (self .on_connection_send_msg )
561+ self .on_connection_send_msg = None
562+ self .reconnect = False
552563 if message .nVersion >= 70016 and self .wtxidrelay :
553564 self .send_message (msg_wtxidrelay ())
554565 if self .support_addrv2 :
@@ -721,6 +732,11 @@ def listen(cls, p2p, callback, port=None, addr=None, idx=1):
721732 if addr is None :
722733 addr = '127.0.0.1'
723734
735+ def exception_handler (loop , context ):
736+ if not p2p .reconnect :
737+ loop .default_exception_handler (context )
738+
739+ cls .network_event_loop .set_exception_handler (exception_handler )
724740 coroutine = cls .create_listen_server (addr , port , callback , p2p )
725741 cls .network_event_loop .call_soon_threadsafe (cls .network_event_loop .create_task , coroutine )
726742
@@ -734,7 +750,9 @@ def peer_protocol():
734750 protocol function from that dict, and returns it so the event loop
735751 can start executing it."""
736752 response = cls .protos .get ((addr , port ))
737- cls .protos [(addr , port )] = None
753+ # remove protocol function from dict only when reconnection doesn't need to happen/already happened
754+ if not proto .reconnect :
755+ cls .protos [(addr , port )] = None
738756 return response
739757
740758 if (addr , port ) not in cls .listeners :
0 commit comments