|
10 | 10 | found in the mini-node branch of http://github.com/jgarzik/pynode.
|
11 | 11 |
|
12 | 12 | P2PConnection: A low-level connection object to a node's P2P interface
|
13 |
| -P2PInterface: A high-level interface object for communicating to a node over P2P""" |
| 13 | +P2PInterface: A high-level interface object for communicating to a node over P2P |
| 14 | +P2PDataStore: A p2p interface class that keeps a store of transactions and blocks |
| 15 | + and can respond correctly to getdata and getheaders messages""" |
14 | 16 | import asyncore
|
15 | 17 | from collections import defaultdict
|
16 | 18 | from io import BytesIO
|
@@ -86,7 +88,7 @@ def peer_connect(self, dstaddr, dstport, net="regtest"):
|
86 | 88 | self.network = net
|
87 | 89 | self.disconnect = False
|
88 | 90 |
|
89 |
| - logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) |
| 91 | + logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) |
90 | 92 |
|
91 | 93 | try:
|
92 | 94 | self.connect((dstaddr, dstport))
|
@@ -356,10 +358,22 @@ def wait_for_block(self, blockhash, timeout=60):
|
356 | 358 | wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
357 | 359 |
|
358 | 360 | def wait_for_getdata(self, timeout=60):
|
| 361 | + """Waits for a getdata message. |
| 362 | +
|
| 363 | + Receiving any getdata message will satisfy the predicate. the last_message["getdata"] |
| 364 | + value must be explicitly cleared before calling this method, or this will return |
| 365 | + immediately with success. TODO: change this method to take a hash value and only |
| 366 | + return true if the correct block/tx has been requested.""" |
359 | 367 | test_function = lambda: self.last_message.get("getdata")
|
360 | 368 | wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
361 | 369 |
|
362 | 370 | def wait_for_getheaders(self, timeout=60):
|
| 371 | + """Waits for a getheaders message. |
| 372 | +
|
| 373 | + Receiving any getheaders message will satisfy the predicate. the last_message["getheaders"] |
| 374 | + value must be explicitly cleared before calling this method, or this will return |
| 375 | + immediately with success. TODO: change this method to take a hash value and only |
| 376 | + return true if the correct block header has been requested.""" |
363 | 377 | test_function = lambda: self.last_message.get("getheaders")
|
364 | 378 | wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
365 | 379 |
|
@@ -440,3 +454,138 @@ def network_thread_join(timeout=10):
|
440 | 454 | for thread in network_threads:
|
441 | 455 | thread.join(timeout)
|
442 | 456 | assert not thread.is_alive()
|
| 457 | + |
| 458 | +class P2PDataStore(P2PInterface): |
| 459 | + """A P2P data store class. |
| 460 | +
|
| 461 | + Keeps a block and transaction store and responds correctly to getdata and getheaders requests.""" |
| 462 | + |
| 463 | + def __init__(self): |
| 464 | + super().__init__() |
| 465 | + self.reject_code_received = None |
| 466 | + self.reject_reason_received = None |
| 467 | + # store of blocks. key is block hash, value is a CBlock object |
| 468 | + self.block_store = {} |
| 469 | + self.last_block_hash = '' |
| 470 | + # store of txs. key is txid, value is a CTransaction object |
| 471 | + self.tx_store = {} |
| 472 | + self.getdata_requests = [] |
| 473 | + |
| 474 | + def on_getdata(self, message): |
| 475 | + """Check for the tx/block in our stores and if found, reply with an inv message.""" |
| 476 | + for inv in message.inv: |
| 477 | + self.getdata_requests.append(inv.hash) |
| 478 | + if (inv.type & MSG_TYPE_MASK) == MSG_TX and inv.hash in self.tx_store.keys(): |
| 479 | + self.send_message(msg_tx(self.tx_store[inv.hash])) |
| 480 | + elif (inv.type & MSG_TYPE_MASK) == MSG_BLOCK and inv.hash in self.block_store.keys(): |
| 481 | + self.send_message(msg_block(self.block_store[inv.hash])) |
| 482 | + else: |
| 483 | + logger.debug('getdata message type {} received.'.format(hex(inv.type))) |
| 484 | + |
| 485 | + def on_getheaders(self, message): |
| 486 | + """Search back through our block store for the locator, and reply with a headers message if found.""" |
| 487 | + |
| 488 | + locator, hash_stop = message.locator, message.hashstop |
| 489 | + |
| 490 | + # Assume that the most recent block added is the tip |
| 491 | + if not self.block_store: |
| 492 | + return |
| 493 | + |
| 494 | + headers_list = [self.block_store[self.last_block_hash]] |
| 495 | + maxheaders = 2000 |
| 496 | + while headers_list[-1].sha256 not in locator.vHave: |
| 497 | + # Walk back through the block store, adding headers to headers_list |
| 498 | + # as we go. |
| 499 | + prev_block_hash = headers_list[-1].hashPrevBlock |
| 500 | + if prev_block_hash in self.block_store: |
| 501 | + prev_block_header = self.block_store[prev_block_hash] |
| 502 | + headers_list.append(prev_block_header) |
| 503 | + if prev_block_header.sha256 == hash_stop: |
| 504 | + # if this is the hashstop header, stop here |
| 505 | + break |
| 506 | + else: |
| 507 | + logger.debug('block hash {} not found in block store'.format(hex(prev_block_hash))) |
| 508 | + break |
| 509 | + |
| 510 | + # Truncate the list if there are too many headers |
| 511 | + headers_list = headers_list[:-maxheaders - 1:-1] |
| 512 | + response = msg_headers(headers_list) |
| 513 | + |
| 514 | + if response is not None: |
| 515 | + self.send_message(response) |
| 516 | + |
| 517 | + def on_reject(self, message): |
| 518 | + """Store reject reason and code for testing.""" |
| 519 | + self.reject_code_received = message.code |
| 520 | + self.reject_reason_received = message.reason |
| 521 | + |
| 522 | + def send_blocks_and_test(self, blocks, rpc, success=True, request_block=True, reject_code=None, reject_reason=None, timeout=60): |
| 523 | + """Send blocks to test node and test whether the tip advances. |
| 524 | +
|
| 525 | + - add all blocks to our block_store |
| 526 | + - send a headers message for the final block |
| 527 | + - the on_getheaders handler will ensure that any getheaders are responded to |
| 528 | + - if request_block is True: wait for getdata for each of the blocks. The on_getdata handler will |
| 529 | + ensure that any getdata messages are responded to |
| 530 | + - if success is True: assert that the node's tip advances to the most recent block |
| 531 | + - if success is False: assert that the node's tip doesn't advance |
| 532 | + - if reject_code and reject_reason are set: assert that the correct reject message is received""" |
| 533 | + |
| 534 | + with mininode_lock: |
| 535 | + self.reject_code_received = None |
| 536 | + self.reject_reason_received = None |
| 537 | + |
| 538 | + for block in blocks: |
| 539 | + self.block_store[block.sha256] = block |
| 540 | + self.last_block_hash = block.sha256 |
| 541 | + |
| 542 | + self.send_message(msg_headers([blocks[-1]])) |
| 543 | + |
| 544 | + if request_block: |
| 545 | + wait_until(lambda: blocks[-1].sha256 in self.getdata_requests, timeout=timeout, lock=mininode_lock) |
| 546 | + |
| 547 | + if success: |
| 548 | + wait_until(lambda: rpc.getbestblockhash() == blocks[-1].hash, timeout=timeout) |
| 549 | + else: |
| 550 | + assert rpc.getbestblockhash() != blocks[-1].hash |
| 551 | + |
| 552 | + if reject_code is not None: |
| 553 | + wait_until(lambda: self.reject_code_received == reject_code, lock=mininode_lock) |
| 554 | + if reject_reason is not None: |
| 555 | + wait_until(lambda: self.reject_reason_received == reject_reason, lock=mininode_lock) |
| 556 | + |
| 557 | + def send_txs_and_test(self, txs, rpc, success=True, reject_code=None, reject_reason=None): |
| 558 | + """Send txs to test node and test whether they're accepted to the mempool. |
| 559 | +
|
| 560 | + - add all txs to our tx_store |
| 561 | + - send tx messages for all txs |
| 562 | + - if success is True: assert that the tx is accepted to the mempool |
| 563 | + - if success is False: assert that the tx is not accepted to the mempool |
| 564 | + - if reject_code and reject_reason are set: assert that the correct reject message is received.""" |
| 565 | + |
| 566 | + with mininode_lock: |
| 567 | + self.reject_code_received = None |
| 568 | + self.reject_reason_received = None |
| 569 | + |
| 570 | + for tx in txs: |
| 571 | + self.tx_store[tx.sha256] = tx |
| 572 | + |
| 573 | + for tx in txs: |
| 574 | + self.send_message(msg_tx(tx)) |
| 575 | + |
| 576 | + self.sync_with_ping() |
| 577 | + |
| 578 | + raw_mempool = rpc.getrawmempool() |
| 579 | + if success: |
| 580 | + # Check that all txs are now in the mempool |
| 581 | + for tx in txs: |
| 582 | + assert tx.hash in raw_mempool, "{} not found in mempool".format(tx.hash) |
| 583 | + else: |
| 584 | + # Check that none of the txs are now in the mempool |
| 585 | + for tx in txs: |
| 586 | + assert tx.hash not in raw_mempool, "{} tx found in mempool".format(tx.hash) |
| 587 | + |
| 588 | + if reject_code is not None: |
| 589 | + wait_until(lambda: self.reject_code_received == reject_code, lock=mininode_lock) |
| 590 | + if reject_reason is not None: |
| 591 | + wait_until(lambda: self.reject_reason_received == reject_reason, lock=mininode_lock) |
0 commit comments