20
20
ser_*, deser_*: functions that handle serialization/deserialization
21
21
"""
22
22
23
- import struct
24
- import socket
25
23
import asyncore
26
- import time
27
- import sys
28
- import random
29
- from .util import hex_str_to_bytes , bytes_to_hex_str
30
- from io import BytesIO
31
24
from codecs import encode
25
+ from collections import defaultdict
26
+ import copy
32
27
import hashlib
33
- from threading import RLock
34
- from threading import Thread
28
+ from io import BytesIO
35
29
import logging
36
- import copy
30
+ import random
31
+ import socket
32
+ import struct
33
+ import sys
34
+ import time
35
+ from threading import RLock , Thread
36
+
37
37
from test_framework .siphash import siphash256
38
+ from test_framework .util import hex_str_to_bytes , bytes_to_hex_str
38
39
39
40
BIP0031_VERSION = 60000
40
41
MY_VERSION = 70014 # past bip-31 for ping/pong
@@ -1465,30 +1466,57 @@ def serialize(self):
1465
1466
r += self .block_transactions .serialize (with_witness = True )
1466
1467
return r
1467
1468
1468
- # This is what a callback should look like for NodeConn
1469
- # Reimplement the on_* functions to provide handling for events
1470
1469
class NodeConnCB (object ):
1470
+ """Callback and helper functions for P2P connection to a bitcoind node.
1471
+
1472
+ Individual testcases should subclass this and override the on_* methods
1473
+ if they want to alter message handling behaviour.
1474
+ """
1475
+
1471
1476
def __init__ (self ):
1472
- self .verack_received = False
1477
+ # Track whether we have a P2P connection open to the node
1478
+ self .connected = False
1479
+ self .connection = None
1480
+
1481
+ # Track number of messages of each type received and the most recent
1482
+ # message of each type
1483
+ self .message_count = defaultdict (int )
1484
+ self .last_message = {}
1485
+
1486
+ # A count of the number of ping messages we've sent to the node
1487
+ self .ping_counter = 1
1488
+
1473
1489
# deliver_sleep_time is helpful for debugging race conditions in p2p
1474
1490
# tests; it causes message delivery to sleep for the specified time
1475
1491
# before acquiring the global lock and delivering the next message.
1476
1492
self .deliver_sleep_time = None
1493
+
1477
1494
# Remember the services our peer has advertised
1478
1495
self .peer_services = None
1479
- self .connection = None
1480
- self .ping_counter = 1
1481
- self .last_pong = msg_pong ()
1496
+
1497
+ # Message receiving methods
1482
1498
1483
1499
def deliver (self , conn , message ):
1500
+ """Receive message and dispatch message to appropriate callback.
1501
+
1502
+ We keep a count of how many of each message type has been received
1503
+ and the most recent message of each type.
1504
+
1505
+ Optionally waits for deliver_sleep_time before dispatching message.
1506
+ """
1507
+
1484
1508
deliver_sleep = self .get_deliver_sleep_time ()
1485
1509
if deliver_sleep is not None :
1486
1510
time .sleep (deliver_sleep )
1487
1511
with mininode_lock :
1488
1512
try :
1489
- getattr (self , 'on_' + message .command .decode ('ascii' ))(conn , message )
1513
+ command = message .command .decode ('ascii' )
1514
+ self .message_count [command ] += 1
1515
+ self .last_message [command ] = message
1516
+ getattr (self , 'on_' + command )(conn , message )
1490
1517
except :
1491
- logger .exception ("ERROR delivering %s" % repr (message ))
1518
+ print ("ERROR delivering %s (%s)" % (repr (message ),
1519
+ sys .exc_info ()[0 ]))
1492
1520
1493
1521
def set_deliver_sleep_time (self , value ):
1494
1522
with mininode_lock :
@@ -1498,14 +1526,20 @@ def get_deliver_sleep_time(self):
1498
1526
with mininode_lock :
1499
1527
return self .deliver_sleep_time
1500
1528
1501
- # Callbacks which can be overridden by subclasses
1502
- #################################################
1529
+ # Callback methods. Can be overridden by subclasses in individual test
1530
+ # cases to provide custom message handling behaviour.
1531
+
1532
+ def on_open (self , conn ):
1533
+ self .connected = True
1534
+
1535
+ def on_close (self , conn ):
1536
+ self .connected = False
1537
+ self .connection = None
1503
1538
1504
1539
def on_addr (self , conn , message ): pass
1505
1540
def on_alert (self , conn , message ): pass
1506
1541
def on_block (self , conn , message ): pass
1507
1542
def on_blocktxn (self , conn , message ): pass
1508
- def on_close (self , conn ): pass
1509
1543
def on_cmpctblock (self , conn , message ): pass
1510
1544
def on_feefilter (self , conn , message ): pass
1511
1545
def on_getaddr (self , conn , message ): pass
@@ -1515,7 +1549,7 @@ def on_getdata(self, conn, message): pass
1515
1549
def on_getheaders (self , conn , message ): pass
1516
1550
def on_headers (self , conn , message ): pass
1517
1551
def on_mempool (self , conn ): pass
1518
- def on_open (self , conn ): pass
1552
+ def on_pong (self , conn , message ): pass
1519
1553
def on_reject (self , conn , message ): pass
1520
1554
def on_sendcmpct (self , conn , message ): pass
1521
1555
def on_sendheaders (self , conn , message ): pass
@@ -1533,9 +1567,6 @@ def on_ping(self, conn, message):
1533
1567
if conn .ver_send > BIP0031_VERSION :
1534
1568
conn .send_message (msg_pong (message .nonce ))
1535
1569
1536
- def on_pong (self , conn , message ):
1537
- self .last_pong = message
1538
-
1539
1570
def on_verack (self , conn , message ):
1540
1571
conn .ver_recv = conn .ver_send
1541
1572
self .verack_received = True
@@ -1548,44 +1579,69 @@ def on_version(self, conn, message):
1548
1579
conn .ver_recv = conn .ver_send
1549
1580
conn .nServices = message .nServices
1550
1581
1551
- # Helper functions
1552
- ##################
1582
+ # Connection helper methods
1553
1583
1554
1584
def add_connection (self , conn ):
1555
1585
self .connection = conn
1556
1586
1557
- # Wrapper for the NodeConn's send_message function
1587
+ def wait_for_disconnect (self , timeout = 60 ):
1588
+ test_function = lambda : not self .connected
1589
+ assert wait_until (test_function , timeout = timeout )
1590
+
1591
+ # Message receiving helper methods
1592
+
1593
+ def sync (self , test_function , timeout = 60 ):
1594
+ while timeout > 0 :
1595
+ with mininode_lock :
1596
+ if test_function ():
1597
+ return
1598
+ time .sleep (0.05 )
1599
+ timeout -= 0.05
1600
+ raise AssertionError ("Sync failed to complete" )
1601
+
1602
+ def wait_for_block (self , blockhash , timeout = 60 ):
1603
+ test_function = lambda : self .last_message .get ("block" ) and self .last_message ["block" ].block .rehash () == blockhash
1604
+ self .sync (test_function , timeout )
1605
+
1606
+ def wait_for_getdata (self , timeout = 60 ):
1607
+ test_function = lambda : self .last_message .get ("getdata" )
1608
+ self .sync (test_function , timeout )
1609
+
1610
+ def wait_for_getheaders (self , timeout = 60 ):
1611
+ test_function = lambda : self .last_message .get ("getheaders" )
1612
+ self .sync (test_function , timeout )
1613
+
1614
+ def wait_for_inv (self , expected_inv , timeout = 60 ):
1615
+ test_function = lambda : self .last_message .get ("inv" ) and self .last_message ["inv" ] != expected_inv
1616
+ self .sync (test_function , timeout )
1617
+
1618
+ def wait_for_verack (self , timeout = 60 ):
1619
+ test_function = lambda : self .message_count ["verack" ]
1620
+ self .sync (test_function , timeout )
1621
+
1622
+ # Message sending helper functions
1623
+
1558
1624
def send_message (self , message ):
1559
- self .connection .send_message (message )
1625
+ if self .connection :
1626
+ self .connection .send_message (message )
1627
+ else :
1628
+ logger .error ("Cannot send message. No connection to node!" )
1560
1629
1561
1630
def send_and_ping (self , message ):
1562
1631
self .send_message (message )
1563
1632
self .sync_with_ping ()
1564
1633
1565
1634
# Sync up with the node
1566
1635
def sync_with_ping (self , timeout = 60 ):
1567
- def received_pong ():
1568
- return (self .last_pong .nonce == self .ping_counter )
1569
1636
self .send_message (msg_ping (nonce = self .ping_counter ))
1570
- success = wait_until (received_pong , timeout = timeout )
1637
+ test_function = lambda : self .last_message .get ("pong" ) and self .last_message ["pong" ].nonce == self .ping_counter
1638
+ success = wait_until (test_function , timeout = timeout )
1571
1639
if not success :
1572
1640
logger .error ("sync_with_ping failed!" )
1573
1641
raise AssertionError ("sync_with_ping failed!" )
1574
1642
self .ping_counter += 1
1575
-
1576
1643
return success
1577
1644
1578
- # Spin until verack message is received from the node.
1579
- # Tests may want to use this as a signal that the test can begin.
1580
- # This can be called from the testing thread, so it needs to acquire the
1581
- # global lock.
1582
- def wait_for_verack (self ):
1583
- while True :
1584
- with mininode_lock :
1585
- if self .verack_received :
1586
- return
1587
- time .sleep (0.05 )
1588
-
1589
1645
# The actual NodeConn class
1590
1646
# This class provides an interface for a p2p connection to a specified node
1591
1647
class NodeConn (asyncore .dispatcher ):
0 commit comments