@@ -77,14 +77,20 @@ def __init__(self):
77
77
78
78
super ().__init__ (map = mininode_socket_map )
79
79
80
+ self ._conn_open = False
81
+
82
+ @property
83
+ def is_connected (self ):
84
+ return self ._conn_open
85
+
80
86
def peer_connect (self , dstaddr , dstport , net = "regtest" ):
81
87
self .dstaddr = dstaddr
82
88
self .dstport = dstport
83
89
self .create_socket (socket .AF_INET , socket .SOCK_STREAM )
84
90
self .socket .setsockopt (socket .IPPROTO_TCP , socket .TCP_NODELAY , 1 )
85
91
self .sendbuf = b""
86
92
self .recvbuf = b""
87
- self .state = "connecting"
93
+ self ._asyncore_pre_connection = True
88
94
self .network = net
89
95
self .disconnect = False
90
96
@@ -97,22 +103,23 @@ def peer_connect(self, dstaddr, dstport, net="regtest"):
97
103
98
104
def peer_disconnect (self ):
99
105
# Connection could have already been closed by other end.
100
- if self .state == "connected" :
101
- self .disconnect_node ()
106
+ if self .is_connected :
107
+ self .disconnect = True # Signal asyncore to disconnect
102
108
103
109
# Connection and disconnection methods
104
110
105
111
def handle_connect (self ):
106
112
"""asyncore callback when a connection is opened."""
107
- if self .state != "connected" :
113
+ if not self .is_connected :
108
114
logger .debug ("Connected & Listening: %s:%d" % (self .dstaddr , self .dstport ))
109
- self .state = "connected"
115
+ self ._conn_open = True
116
+ self ._asyncore_pre_connection = False
110
117
self .on_open ()
111
118
112
119
def handle_close (self ):
113
120
"""asyncore callback when a connection is closed."""
114
121
logger .debug ("Closing connection to: %s:%d" % (self .dstaddr , self .dstport ))
115
- self .state = "closed"
122
+ self ._conn_open = False
116
123
self .recvbuf = b""
117
124
self .sendbuf = b""
118
125
try :
@@ -121,13 +128,6 @@ def handle_close(self):
121
128
pass
122
129
self .on_close ()
123
130
124
- def disconnect_node (self ):
125
- """Disconnect the p2p connection.
126
-
127
- Called by the test logic thread. Causes the p2p connection
128
- to be disconnected on the next iteration of the asyncore loop."""
129
- self .disconnect = True
130
-
131
131
# Socket read methods
132
132
133
133
def handle_read (self ):
@@ -182,17 +182,16 @@ def on_message(self, message):
182
182
def writable (self ):
183
183
"""asyncore method to determine whether the handle_write() callback should be called on the next loop."""
184
184
with mininode_lock :
185
- pre_connection = self .state == "connecting"
186
185
length = len (self .sendbuf )
187
- return ( length > 0 or pre_connection )
186
+ return length > 0 or self . _asyncore_pre_connection
188
187
189
188
def handle_write (self ):
190
189
"""asyncore callback when data should be written to the socket."""
191
190
with mininode_lock :
192
191
# asyncore does not expose socket connection, only the first read/write
193
192
# event, thus we must check connection manually here to know when we
194
193
# actually connect
195
- if self .state == "connecting" :
194
+ if self ._asyncore_pre_connection :
196
195
self .handle_connect ()
197
196
if not self .writable ():
198
197
return
@@ -204,26 +203,17 @@ def handle_write(self):
204
203
return
205
204
self .sendbuf = self .sendbuf [sent :]
206
205
207
- def send_message (self , message , pushbuf = False ):
206
+ def send_message (self , message ):
208
207
"""Send a P2P message over the socket.
209
208
210
209
This method takes a P2P payload, builds the P2P header and adds
211
210
the message to the send buffer to be sent over the socket."""
212
- if self . state != "connected" and not pushbuf :
213
- raise IOError ('Not connected, no pushbuf ' )
211
+ if not self . is_connected :
212
+ raise IOError ('Not connected' )
214
213
self ._log_message ("send" , message )
215
- command = message .command
216
- data = message .serialize ()
217
- tmsg = MAGIC_BYTES [self .network ]
218
- tmsg += command
219
- tmsg += b"\x00 " * (12 - len (command ))
220
- tmsg += struct .pack ("<I" , len (data ))
221
- th = sha256 (data )
222
- h = sha256 (th )
223
- tmsg += h [:4 ]
224
- tmsg += data
214
+ tmsg = self ._build_message (message )
225
215
with mininode_lock :
226
- if ( len (self .sendbuf ) == 0 and not pushbuf ) :
216
+ if len (self .sendbuf ) == 0 :
227
217
try :
228
218
sent = self .send (tmsg )
229
219
self .sendbuf = tmsg [sent :]
@@ -234,6 +224,20 @@ def send_message(self, message, pushbuf=False):
234
224
235
225
# Class utility methods
236
226
227
+ def _build_message (self , message ):
228
+ """Build a serialized P2P message"""
229
+ command = message .command
230
+ data = message .serialize ()
231
+ tmsg = MAGIC_BYTES [self .network ]
232
+ tmsg += command
233
+ tmsg += b"\x00 " * (12 - len (command ))
234
+ tmsg += struct .pack ("<I" , len (data ))
235
+ th = sha256 (data )
236
+ h = sha256 (th )
237
+ tmsg += h [:4 ]
238
+ tmsg += data
239
+ return tmsg
240
+
237
241
def _log_message (self , direction , msg ):
238
242
"""Logs a message being sent or received over the connection."""
239
243
if direction == "send" :
@@ -280,7 +284,7 @@ def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=T
280
284
vt .addrTo .port = self .dstport
281
285
vt .addrFrom .ip = "0.0.0.0"
282
286
vt .addrFrom .port = 0
283
- self .send_message (vt , True )
287
+ self .sendbuf = self . _build_message (vt ) # Will be sent right after handle_connect
284
288
285
289
# Message receiving methods
286
290
@@ -348,7 +352,7 @@ def on_version(self, message):
348
352
# Connection helper methods
349
353
350
354
def wait_for_disconnect (self , timeout = 60 ):
351
- test_function = lambda : self .state != "connected"
355
+ test_function = lambda : not self .is_connected
352
356
wait_until (test_function , timeout = timeout , lock = mininode_lock )
353
357
354
358
# Message receiving helper methods
0 commit comments