diff --git a/chia/full_node/full_node_api.py b/chia/full_node/full_node_api.py index e462041e51d2..ed88d2d32c0a 100644 --- a/chia/full_node/full_node_api.py +++ b/chia/full_node/full_node_api.py @@ -83,6 +83,48 @@ FullNode = object +async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, task_id: bytes32) -> None: + """ + Request a transaction from peers that advertised it, until we either + receive it or timeout. + """ + counter = 0 + try: + while True: + # Limit to asking a few peers, it's possible that this tx got included on chain already + # Highly unlikely that the peers that advertised a tx don't respond to a request. Also, if we + # drop some transactions, we don't want to re-fetch too many times + if counter == 5: + break + if transaction_id not in full_node.full_node_store.peers_with_tx: + break + peers_with_tx: set[bytes32] = full_node.full_node_store.peers_with_tx[transaction_id] + if len(peers_with_tx) == 0: + break + peer_id = peers_with_tx.pop() + assert full_node.server is not None + if peer_id not in full_node.server.all_connections: + continue + random_peer = full_node.server.all_connections[peer_id] + request_tx = full_node_protocol.RequestTransaction(transaction_id) + msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx) + await random_peer.send_message(msg) + await asyncio.sleep(5) + counter += 1 + if full_node.mempool_manager.seen(transaction_id): + break + except asyncio.CancelledError: + pass + finally: + # Always Cleanup + if transaction_id in full_node.full_node_store.peers_with_tx: + full_node.full_node_store.peers_with_tx.pop(transaction_id) + if transaction_id in full_node.full_node_store.pending_tx_request: + full_node.full_node_store.pending_tx_request.pop(transaction_id) + if task_id in full_node.full_node_store.tx_fetch_tasks: + full_node.full_node_store.tx_fetch_tasks.pop(task_id) + + class FullNodeAPI: if TYPE_CHECKING: from chia.server.api_protocol import ApiProtocol @@ -201,44 +243,6 @@ async def new_transaction( new_set = set() new_set.add(peer.peer_node_id) self.full_node.full_node_store.peers_with_tx[transaction.transaction_id] = new_set - - async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, task_id: bytes32) -> None: - counter = 0 - try: - while True: - # Limit to asking a few peers, it's possible that this tx got included on chain already - # Highly unlikely that the peers that advertised a tx don't respond to a request. Also, if we - # drop some transactions, we don't want to re-fetch too many times - if counter == 5: - break - if transaction_id not in full_node.full_node_store.peers_with_tx: - break - peers_with_tx: set[bytes32] = full_node.full_node_store.peers_with_tx[transaction_id] - if len(peers_with_tx) == 0: - break - peer_id = peers_with_tx.pop() - assert full_node.server is not None - if peer_id not in full_node.server.all_connections: - continue - random_peer = full_node.server.all_connections[peer_id] - request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id) - msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx) - await random_peer.send_message(msg) - await asyncio.sleep(5) - counter += 1 - if full_node.mempool_manager.seen(transaction_id): - break - except asyncio.CancelledError: - pass - finally: - # Always Cleanup - if transaction_id in full_node.full_node_store.peers_with_tx: - full_node.full_node_store.peers_with_tx.pop(transaction_id) - if transaction_id in full_node.full_node_store.pending_tx_request: - full_node.full_node_store.pending_tx_request.pop(transaction_id) - if task_id in full_node.full_node_store.tx_fetch_tasks: - full_node.full_node_store.tx_fetch_tasks.pop(task_id) - task_id: bytes32 = bytes32.secret() fetch_task = create_referenced_task( tx_request_and_timeout(self.full_node, transaction.transaction_id, task_id)