Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 42 additions & 38 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,48 @@
FullNode = object


async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, task_id: bytes32) -> None:
"""
Request a transaction from peers that advertised it, until we either
receive it or timeout.
"""
counter = 0
try:
while True:
# Limit to asking a few peers, it's possible that this tx got included on chain already
# Highly unlikely that the peers that advertised a tx don't respond to a request. Also, if we
# drop some transactions, we don't want to re-fetch too many times
if counter == 5:
break
if transaction_id not in full_node.full_node_store.peers_with_tx:
break
peers_with_tx: set[bytes32] = full_node.full_node_store.peers_with_tx[transaction_id]
if len(peers_with_tx) == 0:
break
peer_id = peers_with_tx.pop()
assert full_node.server is not None
if peer_id not in full_node.server.all_connections:
continue
random_peer = full_node.server.all_connections[peer_id]
request_tx = full_node_protocol.RequestTransaction(transaction_id)
msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx)
await random_peer.send_message(msg)
await asyncio.sleep(5)
counter += 1
if full_node.mempool_manager.seen(transaction_id):
break
except asyncio.CancelledError:
pass
finally:
# Always Cleanup
if transaction_id in full_node.full_node_store.peers_with_tx:
full_node.full_node_store.peers_with_tx.pop(transaction_id)
if transaction_id in full_node.full_node_store.pending_tx_request:
full_node.full_node_store.pending_tx_request.pop(transaction_id)
if task_id in full_node.full_node_store.tx_fetch_tasks:
full_node.full_node_store.tx_fetch_tasks.pop(task_id)


class FullNodeAPI:
if TYPE_CHECKING:
from chia.server.api_protocol import ApiProtocol
Expand Down Expand Up @@ -201,44 +243,6 @@ async def new_transaction(
new_set = set()
new_set.add(peer.peer_node_id)
self.full_node.full_node_store.peers_with_tx[transaction.transaction_id] = new_set

async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, task_id: bytes32) -> None:
counter = 0
try:
while True:
# Limit to asking a few peers, it's possible that this tx got included on chain already
# Highly unlikely that the peers that advertised a tx don't respond to a request. Also, if we
# drop some transactions, we don't want to re-fetch too many times
if counter == 5:
break
if transaction_id not in full_node.full_node_store.peers_with_tx:
break
peers_with_tx: set[bytes32] = full_node.full_node_store.peers_with_tx[transaction_id]
if len(peers_with_tx) == 0:
break
peer_id = peers_with_tx.pop()
assert full_node.server is not None
if peer_id not in full_node.server.all_connections:
continue
random_peer = full_node.server.all_connections[peer_id]
request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id)
msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx)
await random_peer.send_message(msg)
await asyncio.sleep(5)
counter += 1
if full_node.mempool_manager.seen(transaction_id):
break
except asyncio.CancelledError:
pass
finally:
# Always Cleanup
if transaction_id in full_node.full_node_store.peers_with_tx:
full_node.full_node_store.peers_with_tx.pop(transaction_id)
if transaction_id in full_node.full_node_store.pending_tx_request:
full_node.full_node_store.pending_tx_request.pop(transaction_id)
if task_id in full_node.full_node_store.tx_fetch_tasks:
full_node.full_node_store.tx_fetch_tasks.pop(task_id)

task_id: bytes32 = bytes32.secret()
fetch_task = create_referenced_task(
tx_request_and_timeout(self.full_node, transaction.transaction_id, task_id)
Expand Down
Loading