Skip to content

Commit 82d06e2

Browse files
committed
Merge pull request #6094
2a22d4b Fix comptool send_message call when MAX_INV_SZ reached (Suhas Daftuar) 574db48 Fix potential race conditions in p2p testing framework (Suhas Daftuar) 5487975 Don't run invalidblockrequest.py in travis until race condition is fixed (Suhas Daftuar) ef32817 Fix mininode disconnections to work with select (Suhas Daftuar)
2 parents 90c37bc + 2a22d4b commit 82d06e2

File tree

3 files changed

+91
-70
lines changed

3 files changed

+91
-70
lines changed

qa/rpc-tests/comptool.py

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
# on_getheaders: provide headers via BlockStore
2626
# on_getdata: provide blocks via BlockStore
2727

28+
global mininode_lock
29+
2830
class TestNode(NodeConnCB):
2931

3032
def __init__(self, block_store, tx_store):
@@ -148,10 +150,11 @@ def wait_for_verack(self):
148150
max_tries = 10 / sleep_time # Wait at most 10 seconds
149151
while max_tries > 0:
150152
done = True
151-
for c in self.connections:
152-
if c.cb.verack_received is False:
153-
done = False
154-
break
153+
with mininode_lock:
154+
for c in self.connections:
155+
if c.cb.verack_received is False:
156+
done = False
157+
break
155158
if done:
156159
break
157160
time.sleep(sleep_time)
@@ -161,10 +164,11 @@ def wait_for_pings(self, counter):
161164
while received_pongs is not True:
162165
time.sleep(0.05)
163166
received_pongs = True
164-
for c in self.connections:
165-
if c.cb.received_ping_response(counter) is not True:
166-
received_pongs = False
167-
break
167+
with mininode_lock:
168+
for c in self.connections:
169+
if c.cb.received_ping_response(counter) is not True:
170+
received_pongs = False
171+
break
168172

169173
# sync_blocks: Wait for all connections to request the blockhash given
170174
# then send get_headers to find out the tip of each node, and synchronize
@@ -173,8 +177,9 @@ def sync_blocks(self, blockhash, num_blocks):
173177
# Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
174178
max_tries = 20*num_blocks
175179
while max_tries > 0:
176-
results = [ blockhash in c.cb.block_request_map and
177-
c.cb.block_request_map[blockhash] for c in self.connections ]
180+
with mininode_lock:
181+
results = [ blockhash in c.cb.block_request_map and
182+
c.cb.block_request_map[blockhash] for c in self.connections ]
178183
if False not in results:
179184
break
180185
time.sleep(0.05)
@@ -199,8 +204,9 @@ def sync_transaction(self, txhash, num_events):
199204
# Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
200205
max_tries = 20*num_events
201206
while max_tries > 0:
202-
results = [ txhash in c.cb.tx_request_map and
203-
c.cb.tx_request_map[txhash] for c in self.connections ]
207+
with mininode_lock:
208+
results = [ txhash in c.cb.tx_request_map and
209+
c.cb.tx_request_map[txhash] for c in self.connections ]
204210
if False not in results:
205211
break
206212
time.sleep(0.05)
@@ -221,19 +227,21 @@ def sync_transaction(self, txhash, num_events):
221227
self.ping_counter += 1
222228

223229
# Sort inv responses from each node
224-
[ c.cb.lastInv.sort() for c in self.connections ]
230+
with mininode_lock:
231+
[ c.cb.lastInv.sort() for c in self.connections ]
225232

226233
# Verify that the tip of each connection all agree with each other, and
227234
# with the expected outcome (if given)
228235
def check_results(self, blockhash, outcome):
229-
for c in self.connections:
230-
if outcome is None:
231-
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
236+
with mininode_lock:
237+
for c in self.connections:
238+
if outcome is None:
239+
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
240+
return False
241+
elif ((c.cb.bestblockhash == blockhash) != outcome):
242+
# print c.cb.bestblockhash, blockhash, outcome
232243
return False
233-
elif ((c.cb.bestblockhash == blockhash) != outcome):
234-
# print c.cb.bestblockhash, blockhash, outcome
235-
return False
236-
return True
244+
return True
237245

238246
# Either check that the mempools all agree with each other, or that
239247
# txhash's presence in the mempool matches the outcome specified.
@@ -242,16 +250,17 @@ def check_results(self, blockhash, outcome):
242250
# perhaps it would be useful to add the ability to check explicitly that
243251
# a particular tx's existence in the mempool is the same across all nodes.
244252
def check_mempool(self, txhash, outcome):
245-
for c in self.connections:
246-
if outcome is None:
247-
# Make sure the mempools agree with each other
248-
if c.cb.lastInv != self.connections[0].cb.lastInv:
249-
# print c.rpc.getrawmempool()
253+
with mininode_lock:
254+
for c in self.connections:
255+
if outcome is None:
256+
# Make sure the mempools agree with each other
257+
if c.cb.lastInv != self.connections[0].cb.lastInv:
258+
# print c.rpc.getrawmempool()
259+
return False
260+
elif ((txhash in c.cb.lastInv) != outcome):
261+
# print c.rpc.getrawmempool(), c.cb.lastInv
250262
return False
251-
elif ((txhash in c.cb.lastInv) != outcome):
252-
# print c.rpc.getrawmempool(), c.cb.lastInv
253-
return False
254-
return True
263+
return True
255264

256265
def run(self):
257266
# Wait until verack is received
@@ -272,9 +281,10 @@ def run(self):
272281
block = b_or_t
273282
block_outcome = outcome
274283
# Add to shared block_store, set as current block
275-
self.block_store.add_block(block)
276-
for c in self.connections:
277-
c.cb.block_request_map[block.sha256] = False
284+
with mininode_lock:
285+
self.block_store.add_block(block)
286+
for c in self.connections:
287+
c.cb.block_request_map[block.sha256] = False
278288
# Either send inv's to each node and sync, or add
279289
# to invqueue for later inv'ing.
280290
if (test_instance.sync_every_block):
@@ -288,10 +298,11 @@ def run(self):
288298
assert(isinstance(b_or_t, CTransaction))
289299
tx = b_or_t
290300
tx_outcome = outcome
291-
# Add to shared tx store
292-
self.tx_store.add_transaction(tx)
293-
for c in self.connections:
294-
c.cb.tx_request_map[tx.sha256] = False
301+
# Add to shared tx store and clear map entry
302+
with mininode_lock:
303+
self.tx_store.add_transaction(tx)
304+
for c in self.connections:
305+
c.cb.tx_request_map[tx.sha256] = False
295306
# Again, either inv to all nodes or save for later
296307
if (test_instance.sync_every_tx):
297308
[ c.cb.send_inv(tx) for c in self.connections ]
@@ -302,7 +313,7 @@ def run(self):
302313
invqueue.append(CInv(1, tx.sha256))
303314
# Ensure we're not overflowing the inv queue
304315
if len(invqueue) == MAX_INV_SZ:
305-
[ c.sb.send_message(msg_inv(invqueue)) for c in self.connections ]
316+
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
306317
invqueue = []
307318

308319
# Do final sync if we weren't syncing on every block or every tx.

qa/rpc-tests/maxblocksinflight.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ def run(self):
6161
time.sleep(2)
6262

6363
total_requests = 0
64-
for key in self.blockReqCounts:
65-
total_requests += self.blockReqCounts[key]
66-
if self.blockReqCounts[key] > 1:
67-
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
64+
with mininode_lock:
65+
for key in self.blockReqCounts:
66+
total_requests += self.blockReqCounts[key]
67+
if self.blockReqCounts[key] > 1:
68+
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
6869
if total_requests > MAX_REQUESTS:
6970
raise AssertionError("Error, too many blocks (%d) requested" % total_requests)
7071
print "Round %d: success (total requests: %d)" % (count, total_requests)

qa/rpc-tests/mininode.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import random
2727
import cStringIO
2828
import hashlib
29-
from threading import Lock
29+
from threading import RLock
3030
from threading import Thread
3131
import logging
3232
import copy
@@ -37,6 +37,19 @@
3737

3838
MAX_INV_SZ = 50000
3939

40+
# Keep our own socket map for asyncore, so that we can track disconnects
41+
# ourselves (to workaround an issue with closing an asyncore socket when
42+
# using select)
43+
mininode_socket_map = dict()
44+
45+
# One lock for synchronizing all data access between the networking thread (see
46+
# NetworkThread below) and the thread running the test logic. For simplicity,
47+
# NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
48+
# and whenever adding anything to the send buffer (in send_message()). This
49+
# lock should be acquired in the thread running the test logic to synchronize
50+
# access to any data shared with the NodeConnCB or NodeConn.
51+
mininode_lock = RLock()
52+
4053
# Serialization/deserialization tools
4154
def sha256(s):
4255
return hashlib.new('sha256', s).digest()
@@ -975,10 +988,6 @@ def __repr__(self):
975988
# Reimplement the on_* functions to provide handling for events
976989
class NodeConnCB(object):
977990
def __init__(self):
978-
# Acquire on all callbacks -- overkill for now since asyncore is
979-
# single-threaded, but may be useful for synchronizing access to
980-
# member variables in derived classes.
981-
self.cbLock = Lock()
982991
self.verack_received = False
983992

984993
# Derived classes should call this function once to set the message map
@@ -1004,7 +1013,7 @@ def create_callback_map(self):
10041013
}
10051014

10061015
def deliver(self, conn, message):
1007-
with self.cbLock:
1016+
with mininode_lock:
10081017
try:
10091018
self.cbmap[message.command](conn, message)
10101019
except:
@@ -1076,7 +1085,7 @@ class NodeConn(asyncore.dispatcher):
10761085
}
10771086

10781087
def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"):
1079-
asyncore.dispatcher.__init__(self)
1088+
asyncore.dispatcher.__init__(self, map=mininode_socket_map)
10801089
self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport))
10811090
self.dstaddr = dstaddr
10821091
self.dstport = dstport
@@ -1089,7 +1098,6 @@ def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"):
10891098
self.state = "connecting"
10901099
self.network = net
10911100
self.cb = callback
1092-
self.sendbufLock = Lock() # for protecting the sendbuffer
10931101
self.disconnect = False
10941102

10951103
# stuff version msg into sendbuf
@@ -1140,24 +1148,18 @@ def readable(self):
11401148
return True
11411149

11421150
def writable(self):
1143-
if self.disconnect:
1144-
self.handle_close()
1145-
return False
1146-
else:
1147-
self.sendbufLock.acquire()
1151+
with mininode_lock:
11481152
length = len(self.sendbuf)
1149-
self.sendbufLock.release()
1150-
return (length > 0)
1153+
return (length > 0)
11511154

11521155
def handle_write(self):
1153-
self.sendbufLock.acquire()
1154-
try:
1155-
sent = self.send(self.sendbuf)
1156-
except:
1157-
self.handle_close()
1158-
return
1159-
self.sendbuf = self.sendbuf[sent:]
1160-
self.sendbufLock.release()
1156+
with mininode_lock:
1157+
try:
1158+
sent = self.send(self.sendbuf)
1159+
except:
1160+
self.handle_close()
1161+
return
1162+
self.sendbuf = self.sendbuf[sent:]
11611163

11621164
def got_data(self):
11631165
while True:
@@ -1201,7 +1203,6 @@ def got_data(self):
12011203
def send_message(self, message, pushbuf=False):
12021204
if self.state != "connected" and not pushbuf:
12031205
return
1204-
self.sendbufLock.acquire()
12051206
self.show_debug_msg("Send %s" % repr(message))
12061207
command = message.command
12071208
data = message.serialize()
@@ -1214,9 +1215,9 @@ def send_message(self, message, pushbuf=False):
12141215
h = sha256(th)
12151216
tmsg += h[:4]
12161217
tmsg += data
1217-
self.sendbuf += tmsg
1218-
self.last_sent = time.time()
1219-
self.sendbufLock.release()
1218+
with mininode_lock:
1219+
self.sendbuf += tmsg
1220+
self.last_sent = time.time()
12201221

12211222
def got_message(self, message):
12221223
if message.command == "version":
@@ -1229,12 +1230,20 @@ def got_message(self, message):
12291230

12301231
def disconnect_node(self):
12311232
self.disconnect = True
1232-
self.send_message(self.messagemap['ping']())
12331233

12341234

12351235
class NetworkThread(Thread):
12361236
def run(self):
1237-
asyncore.loop(0.1, True)
1237+
while mininode_socket_map:
1238+
# We check for whether to disconnect outside of the asyncore
1239+
# loop to workaround the behavior of asyncore when using
1240+
# select
1241+
disconnected = []
1242+
for fd, obj in mininode_socket_map.items():
1243+
if obj.disconnect:
1244+
disconnected.append(obj)
1245+
[ obj.handle_close() for obj in disconnected ]
1246+
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
12381247

12391248

12401249
# An exception we can raise if we detect a potential disconnect

0 commit comments

Comments
 (0)