71
71
NODE_WITNESS ,
72
72
sha256 ,
73
73
)
74
- from test_framework .util import wait_until_helper
74
+ from test_framework .util import (
75
+ MAX_NODES ,
76
+ p2p_port ,
77
+ wait_until_helper ,
78
+ )
75
79
76
80
logger = logging .getLogger ("TestFramework.p2p" )
77
81
@@ -139,7 +143,7 @@ def __init__(self):
139
143
def is_connected (self ):
140
144
return self ._transport is not None
141
145
142
- def peer_connect (self , dstaddr , dstport , * , net , timeout_factor ):
146
+ def peer_connect_helper (self , dstaddr , dstport , net , timeout_factor ):
143
147
assert not self .is_connected
144
148
self .timeout_factor = timeout_factor
145
149
self .dstaddr = dstaddr
@@ -148,12 +152,20 @@ def peer_connect(self, dstaddr, dstport, *, net, timeout_factor):
148
152
self .on_connection_send_msg = None
149
153
self .recvbuf = b""
150
154
self .magic_bytes = MAGIC_BYTES [net ]
151
- logger .debug ('Connecting to Bitcoin Node: %s:%d' % (self .dstaddr , self .dstport ))
155
+
156
+ def peer_connect (self , dstaddr , dstport , * , net , timeout_factor ):
157
+ self .peer_connect_helper (dstaddr , dstport , net , timeout_factor )
152
158
153
159
loop = NetworkThread .network_event_loop
154
- conn_gen_unsafe = loop .create_connection (lambda : self , host = self .dstaddr , port = self .dstport )
155
- conn_gen = lambda : loop .call_soon_threadsafe (loop .create_task , conn_gen_unsafe )
156
- return conn_gen
160
+ logger .debug ('Connecting to Bitcoin Node: %s:%d' % (self .dstaddr , self .dstport ))
161
+ coroutine = loop .create_connection (lambda : self , host = self .dstaddr , port = self .dstport )
162
+ return lambda : loop .call_soon_threadsafe (loop .create_task , coroutine )
163
+
164
+ def peer_accept_connection (self , connect_id , connect_cb = lambda : None , * , net , timeout_factor ):
165
+ self .peer_connect_helper ('0' , 0 , net , timeout_factor )
166
+
167
+ logger .debug ('Listening for Bitcoin Node with id: {}' .format (connect_id ))
168
+ return lambda : NetworkThread .listen (self , connect_cb , idx = connect_id )
157
169
158
170
def peer_disconnect (self ):
159
171
# Connection could have already been closed by other end.
@@ -312,18 +324,27 @@ def __init__(self, support_addrv2=False, wtxidrelay=True):
312
324
# If the peer supports wtxid-relay
313
325
self .wtxidrelay = wtxidrelay
314
326
315
- def peer_connect (self , * args , services = NODE_NETWORK | NODE_WITNESS , send_version = True , ** kwargs ):
327
+ def peer_connect_send_version (self , services ):
328
+ # Send a version msg
329
+ vt = msg_version ()
330
+ vt .nServices = services
331
+ vt .addrTo .ip = self .dstaddr
332
+ vt .addrTo .port = self .dstport
333
+ vt .addrFrom .ip = "0.0.0.0"
334
+ vt .addrFrom .port = 0
335
+ self .on_connection_send_msg = vt # Will be sent in connection_made callback
336
+
337
+ def peer_connect (self , * args , services = NODE_NETWORK | NODE_WITNESS , send_version = True , ** kwargs ):
316
338
create_conn = super ().peer_connect (* args , ** kwargs )
317
339
318
340
if send_version :
319
- # Send a version msg
320
- vt = msg_version ()
321
- vt .nServices = services
322
- vt .addrTo .ip = self .dstaddr
323
- vt .addrTo .port = self .dstport
324
- vt .addrFrom .ip = "0.0.0.0"
325
- vt .addrFrom .port = 0
326
- self .on_connection_send_msg = vt # Will be sent soon after connection_made
341
+ self .peer_connect_send_version (services )
342
+
343
+ return create_conn
344
+
345
+ def peer_accept_connection (self , * args , services = NODE_NETWORK | NODE_WITNESS , ** kwargs ):
346
+ create_conn = super ().peer_accept_connection (* args , ** kwargs )
347
+ self .peer_connect_send_version (services )
327
348
328
349
return create_conn
329
350
@@ -414,6 +435,10 @@ def test_function():
414
435
415
436
wait_until_helper (test_function , timeout = timeout , lock = p2p_lock , timeout_factor = self .timeout_factor )
416
437
438
+ def wait_for_connect (self , timeout = 60 ):
439
+ test_function = lambda : self .is_connected
440
+ wait_until_helper (test_function , timeout = timeout , lock = p2p_lock )
441
+
417
442
def wait_for_disconnect (self , timeout = 60 ):
418
443
test_function = lambda : not self .is_connected
419
444
self .wait_until (test_function , timeout = timeout , check_connected = False )
@@ -527,6 +552,8 @@ def __init__(self):
527
552
# There is only one event loop and no more than one thread must be created
528
553
assert not self .network_event_loop
529
554
555
+ NetworkThread .listeners = {}
556
+ NetworkThread .protos = {}
530
557
NetworkThread .network_event_loop = asyncio .new_event_loop ()
531
558
532
559
def run (self ):
@@ -542,6 +569,48 @@ def close(self, timeout=10):
542
569
# Safe to remove event loop.
543
570
NetworkThread .network_event_loop = None
544
571
572
+ @classmethod
573
+ def listen (cls , p2p , callback , port = None , addr = None , idx = 1 ):
574
+ """ Ensure a listening server is running on the given port, and run the
575
+ protocol specified by `p2p` on the next connection to it. Once ready
576
+ for connections, call `callback`."""
577
+
578
+ if port is None :
579
+ assert 0 < idx <= MAX_NODES
580
+ port = p2p_port (MAX_NODES - idx )
581
+ if addr is None :
582
+ addr = '127.0.0.1'
583
+
584
+ coroutine = cls .create_listen_server (addr , port , callback , p2p )
585
+ cls .network_event_loop .call_soon_threadsafe (cls .network_event_loop .create_task , coroutine )
586
+
587
+ @classmethod
588
+ async def create_listen_server (cls , addr , port , callback , proto ):
589
+ def peer_protocol ():
590
+ """Returns a function that does the protocol handling for a new
591
+ connection. To allow different connections to have different
592
+ behaviors, the protocol function is first put in the cls.protos
593
+ dict. When the connection is made, the function removes the
594
+ protocol function from that dict, and returns it so the event loop
595
+ can start executing it."""
596
+ response = cls .protos .get ((addr , port ))
597
+ cls .protos [(addr , port )] = None
598
+ return response
599
+
600
+ if (addr , port ) not in cls .listeners :
601
+ # When creating a listener on a given (addr, port) we only need to
602
+ # do it once. If we want different behaviors for different
603
+ # connections, we can accomplish this by providing different
604
+ # `proto` functions
605
+
606
+ listener = await cls .network_event_loop .create_server (peer_protocol , addr , port )
607
+ logger .debug ("Listening server on %s:%d should be started" % (addr , port ))
608
+ cls .listeners [(addr , port )] = listener
609
+
610
+ cls .protos [(addr , port )] = proto
611
+ callback (addr , port )
612
+
613
+
545
614
class P2PDataStore (P2PInterface ):
546
615
"""A P2P data store class.
547
616
0 commit comments