27
27
28
28
logger = logging .getLogger ("TestFramework.mininode" )
29
29
30
- # Keep our own socket map for asyncore, so that we can track disconnects
31
- # ourselves (to workaround an issue with closing an asyncore socket when
32
- # using select)
33
- mininode_socket_map = dict ()
34
-
35
- # One lock for synchronizing all data access between the networking thread (see
36
- # NetworkThread below) and the thread running the test logic. For simplicity,
37
- # NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
38
- # and whenever adding anything to the send buffer (in send_message()). This
39
- # lock should be acquired in the thread running the test logic to synchronize
40
- # access to any data shared with the NodeConnCB or NodeConn.
41
- mininode_lock = RLock ()
30
+ MESSAGEMAP = {
31
+ b"addr" : msg_addr ,
32
+ b"block" : msg_block ,
33
+ b"blocktxn" : msg_blocktxn ,
34
+ b"cmpctblock" : msg_cmpctblock ,
35
+ b"feefilter" : msg_feefilter ,
36
+ b"getaddr" : msg_getaddr ,
37
+ b"getblocks" : msg_getblocks ,
38
+ b"getblocktxn" : msg_getblocktxn ,
39
+ b"getdata" : msg_getdata ,
40
+ b"getheaders" : msg_getheaders ,
41
+ b"headers" : msg_headers ,
42
+ b"inv" : msg_inv ,
43
+ b"mempool" : msg_mempool ,
44
+ b"ping" : msg_ping ,
45
+ b"pong" : msg_pong ,
46
+ b"reject" : msg_reject ,
47
+ b"sendcmpct" : msg_sendcmpct ,
48
+ b"sendheaders" : msg_sendheaders ,
49
+ b"tx" : msg_tx ,
50
+ b"verack" : msg_verack ,
51
+ b"version" : msg_version ,
52
+ }
53
+
54
+ MAGIC_BYTES = {
55
+ "mainnet" : b"\xf9 \xbe \xb4 \xd9 " , # mainnet
56
+ "testnet3" : b"\x0b \x11 \x09 \x07 " , # testnet3
57
+ "regtest" : b"\xfa \xbf \xb5 \xda " , # regtest
58
+ }
42
59
43
60
class NodeConnCB ():
44
61
"""Callback and helper functions for P2P connection to a bitcoind node.
@@ -183,34 +200,6 @@ class NodeConn(asyncore.dispatcher):
183
200
"""The actual NodeConn class
184
201
185
202
This class provides an interface for a p2p connection to a specified node."""
186
- messagemap = {
187
- b"version" : msg_version ,
188
- b"verack" : msg_verack ,
189
- b"addr" : msg_addr ,
190
- b"inv" : msg_inv ,
191
- b"getdata" : msg_getdata ,
192
- b"getblocks" : msg_getblocks ,
193
- b"tx" : msg_tx ,
194
- b"block" : msg_block ,
195
- b"getaddr" : msg_getaddr ,
196
- b"ping" : msg_ping ,
197
- b"pong" : msg_pong ,
198
- b"headers" : msg_headers ,
199
- b"getheaders" : msg_getheaders ,
200
- b"reject" : msg_reject ,
201
- b"mempool" : msg_mempool ,
202
- b"feefilter" : msg_feefilter ,
203
- b"sendheaders" : msg_sendheaders ,
204
- b"sendcmpct" : msg_sendcmpct ,
205
- b"cmpctblock" : msg_cmpctblock ,
206
- b"getblocktxn" : msg_getblocktxn ,
207
- b"blocktxn" : msg_blocktxn
208
- }
209
- MAGIC_BYTES = {
210
- "mainnet" : b"\xf9 \xbe \xb4 \xd9 " , # mainnet
211
- "testnet3" : b"\x0b \x11 \x09 \x07 " , # testnet3
212
- "regtest" : b"\xfa \xbf \xb5 \xda " , # regtest
213
- }
214
203
215
204
def __init__ (self , dstaddr , dstport , rpc , callback , net = "regtest" , services = NODE_NETWORK | NODE_WITNESS , send_version = True ):
216
205
asyncore .dispatcher .__init__ (self , map = mininode_socket_map )
@@ -247,6 +236,8 @@ def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE
247
236
self .handle_close ()
248
237
self .rpc = rpc
249
238
239
+ # Connection and disconnection methods
240
+
250
241
def handle_connect (self ):
251
242
if self .state != "connected" :
252
243
logger .debug ("Connected & Listening: %s:%d" % (self .dstaddr , self .dstport ))
@@ -264,44 +255,30 @@ def handle_close(self):
264
255
pass
265
256
self .cb .on_close (self )
266
257
258
+ def disconnect_node (self ):
259
+ """ Disconnect the p2p connection.
260
+
261
+ Called by the test logic thread. Causes the p2p connection
262
+ to be disconnected on the next iteration of the asyncore loop."""
263
+ self .disconnect = True
264
+
265
+ # Socket read methods
266
+
267
+ def readable (self ):
268
+ return True
269
+
267
270
def handle_read (self ):
268
271
t = self .recv (8192 )
269
272
if len (t ) > 0 :
270
273
self .recvbuf += t
271
274
self .got_data ()
272
275
273
- def readable (self ):
274
- return True
275
-
276
- def writable (self ):
277
- with mininode_lock :
278
- pre_connection = self .state == "connecting"
279
- length = len (self .sendbuf )
280
- return (length > 0 or pre_connection )
281
-
282
- def handle_write (self ):
283
- with mininode_lock :
284
- # asyncore does not expose socket connection, only the first read/write
285
- # event, thus we must check connection manually here to know when we
286
- # actually connect
287
- if self .state == "connecting" :
288
- self .handle_connect ()
289
- if not self .writable ():
290
- return
291
-
292
- try :
293
- sent = self .send (self .sendbuf )
294
- except :
295
- self .handle_close ()
296
- return
297
- self .sendbuf = self .sendbuf [sent :]
298
-
299
276
def got_data (self ):
300
277
try :
301
278
while True :
302
279
if len (self .recvbuf ) < 4 :
303
280
return
304
- if self .recvbuf [:4 ] != self . MAGIC_BYTES [self .network ]:
281
+ if self .recvbuf [:4 ] != MAGIC_BYTES [self .network ]:
305
282
raise ValueError ("got garbage %s" % repr (self .recvbuf ))
306
283
if len (self .recvbuf ) < 4 + 12 + 4 + 4 :
307
284
return
@@ -316,23 +293,54 @@ def got_data(self):
316
293
if checksum != h [:4 ]:
317
294
raise ValueError ("got bad checksum " + repr (self .recvbuf ))
318
295
self .recvbuf = self .recvbuf [4 + 12 + 4 + 4 + msglen :]
319
- if command not in self . messagemap :
296
+ if command not in MESSAGEMAP :
320
297
raise ValueError ("Received unknown command from %s:%d: '%s' %s" % (self .dstaddr , self .dstport , command , repr (msg )))
321
298
f = BytesIO (msg )
322
- t = self . messagemap [command ]()
299
+ t = MESSAGEMAP [command ]()
323
300
t .deserialize (f )
324
301
self .got_message (t )
325
302
except Exception as e :
326
303
logger .exception ('Error reading message:' , repr (e ))
327
304
raise
328
305
306
+ def got_message (self , message ):
307
+ if self .last_sent + 30 * 60 < time .time ():
308
+ self .send_message (MESSAGEMAP [b'ping' ]())
309
+ self ._log_message ("receive" , message )
310
+ self .cb .deliver (self , message )
311
+
312
+ # Socket write methods
313
+
314
+ def writable (self ):
315
+ with mininode_lock :
316
+ pre_connection = self .state == "connecting"
317
+ length = len (self .sendbuf )
318
+ return (length > 0 or pre_connection )
319
+
320
+ def handle_write (self ):
321
+ with mininode_lock :
322
+ # asyncore does not expose socket connection, only the first read/write
323
+ # event, thus we must check connection manually here to know when we
324
+ # actually connect
325
+ if self .state == "connecting" :
326
+ self .handle_connect ()
327
+ if not self .writable ():
328
+ return
329
+
330
+ try :
331
+ sent = self .send (self .sendbuf )
332
+ except :
333
+ self .handle_close ()
334
+ return
335
+ self .sendbuf = self .sendbuf [sent :]
336
+
329
337
def send_message (self , message , pushbuf = False ):
330
338
if self .state != "connected" and not pushbuf :
331
339
raise IOError ('Not connected, no pushbuf' )
332
340
self ._log_message ("send" , message )
333
341
command = message .command
334
342
data = message .serialize ()
335
- tmsg = self . MAGIC_BYTES [self .network ]
343
+ tmsg = MAGIC_BYTES [self .network ]
336
344
tmsg += command
337
345
tmsg += b"\x00 " * (12 - len (command ))
338
346
tmsg += struct .pack ("<I" , len (data ))
@@ -351,11 +359,7 @@ def send_message(self, message, pushbuf=False):
351
359
self .sendbuf += tmsg
352
360
self .last_sent = time .time ()
353
361
354
- def got_message (self , message ):
355
- if self .last_sent + 30 * 60 < time .time ():
356
- self .send_message (self .messagemap [b'ping' ]())
357
- self ._log_message ("receive" , message )
358
- self .cb .deliver (self , message )
362
+ # Class utility methods
359
363
360
364
def _log_message (self , direction , msg ):
361
365
if direction == "send" :
@@ -367,9 +371,19 @@ def _log_message(self, direction, msg):
367
371
log_message += "... (msg truncated)"
368
372
logger .debug (log_message )
369
373
370
- def disconnect_node (self ):
371
- self .disconnect = True
372
374
375
+ # Keep our own socket map for asyncore, so that we can track disconnects
376
+ # ourselves (to workaround an issue with closing an asyncore socket when
377
+ # using select)
378
+ mininode_socket_map = dict ()
379
+
380
+ # One lock for synchronizing all data access between the networking thread (see
381
+ # NetworkThread below) and the thread running the test logic. For simplicity,
382
+ # NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
383
+ # and whenever adding anything to the send buffer (in send_message()). This
384
+ # lock should be acquired in the thread running the test logic to synchronize
385
+ # access to any data shared with the NodeConnCB or NodeConn.
386
+ mininode_lock = RLock ()
373
387
374
388
class NetworkThread (Thread ):
375
389
def run (self ):
@@ -381,6 +395,6 @@ def run(self):
381
395
for fd , obj in mininode_socket_map .items ():
382
396
if obj .disconnect :
383
397
disconnected .append (obj )
384
- [ obj .handle_close () for obj in disconnected ]
398
+ [obj .handle_close () for obj in disconnected ]
385
399
asyncore .loop (0.1 , use_poll = True , map = mininode_socket_map , count = 1 )
386
400
logger .debug ("Network thread closing" )
0 commit comments