|
83 | 83 | FullNode = object |
84 | 84 |
|
85 | 85 |
|
| 86 | +async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, task_id: bytes32) -> None: |
| 87 | + """ |
| 88 | + Request a transaction from peers that advertised it, until we either |
| 89 | + receive it or timeout. |
| 90 | + """ |
| 91 | + counter = 0 |
| 92 | + try: |
| 93 | + while True: |
| 94 | + # Limit to asking a few peers, it's possible that this tx got included on chain already |
| 95 | + # Highly unlikely that the peers that advertised a tx don't respond to a request. Also, if we |
| 96 | + # drop some transactions, we don't want to re-fetch too many times |
| 97 | + if counter == 5: |
| 98 | + break |
| 99 | + if transaction_id not in full_node.full_node_store.peers_with_tx: |
| 100 | + break |
| 101 | + peers_with_tx: set[bytes32] = full_node.full_node_store.peers_with_tx[transaction_id] |
| 102 | + if len(peers_with_tx) == 0: |
| 103 | + break |
| 104 | + peer_id = peers_with_tx.pop() |
| 105 | + assert full_node.server is not None |
| 106 | + if peer_id not in full_node.server.all_connections: |
| 107 | + continue |
| 108 | + random_peer = full_node.server.all_connections[peer_id] |
| 109 | + request_tx = full_node_protocol.RequestTransaction(transaction_id) |
| 110 | + msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx) |
| 111 | + await random_peer.send_message(msg) |
| 112 | + await asyncio.sleep(5) |
| 113 | + counter += 1 |
| 114 | + if full_node.mempool_manager.seen(transaction_id): |
| 115 | + break |
| 116 | + except asyncio.CancelledError: |
| 117 | + pass |
| 118 | + finally: |
| 119 | + # Always Cleanup |
| 120 | + if transaction_id in full_node.full_node_store.peers_with_tx: |
| 121 | + full_node.full_node_store.peers_with_tx.pop(transaction_id) |
| 122 | + if transaction_id in full_node.full_node_store.pending_tx_request: |
| 123 | + full_node.full_node_store.pending_tx_request.pop(transaction_id) |
| 124 | + if task_id in full_node.full_node_store.tx_fetch_tasks: |
| 125 | + full_node.full_node_store.tx_fetch_tasks.pop(task_id) |
| 126 | + |
| 127 | + |
86 | 128 | class FullNodeAPI: |
87 | 129 | if TYPE_CHECKING: |
88 | 130 | from chia.server.api_protocol import ApiProtocol |
@@ -201,44 +243,6 @@ async def new_transaction( |
201 | 243 | new_set = set() |
202 | 244 | new_set.add(peer.peer_node_id) |
203 | 245 | self.full_node.full_node_store.peers_with_tx[transaction.transaction_id] = new_set |
204 | | - |
205 | | - async def tx_request_and_timeout(full_node: FullNode, transaction_id: bytes32, task_id: bytes32) -> None: |
206 | | - counter = 0 |
207 | | - try: |
208 | | - while True: |
209 | | - # Limit to asking a few peers, it's possible that this tx got included on chain already |
210 | | - # Highly unlikely that the peers that advertised a tx don't respond to a request. Also, if we |
211 | | - # drop some transactions, we don't want to re-fetch too many times |
212 | | - if counter == 5: |
213 | | - break |
214 | | - if transaction_id not in full_node.full_node_store.peers_with_tx: |
215 | | - break |
216 | | - peers_with_tx: set[bytes32] = full_node.full_node_store.peers_with_tx[transaction_id] |
217 | | - if len(peers_with_tx) == 0: |
218 | | - break |
219 | | - peer_id = peers_with_tx.pop() |
220 | | - assert full_node.server is not None |
221 | | - if peer_id not in full_node.server.all_connections: |
222 | | - continue |
223 | | - random_peer = full_node.server.all_connections[peer_id] |
224 | | - request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id) |
225 | | - msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx) |
226 | | - await random_peer.send_message(msg) |
227 | | - await asyncio.sleep(5) |
228 | | - counter += 1 |
229 | | - if full_node.mempool_manager.seen(transaction_id): |
230 | | - break |
231 | | - except asyncio.CancelledError: |
232 | | - pass |
233 | | - finally: |
234 | | - # Always Cleanup |
235 | | - if transaction_id in full_node.full_node_store.peers_with_tx: |
236 | | - full_node.full_node_store.peers_with_tx.pop(transaction_id) |
237 | | - if transaction_id in full_node.full_node_store.pending_tx_request: |
238 | | - full_node.full_node_store.pending_tx_request.pop(transaction_id) |
239 | | - if task_id in full_node.full_node_store.tx_fetch_tasks: |
240 | | - full_node.full_node_store.tx_fetch_tasks.pop(task_id) |
241 | | - |
242 | 246 | task_id: bytes32 = bytes32.secret() |
243 | 247 | fetch_task = create_referenced_task( |
244 | 248 | tx_request_and_timeout(self.full_node, transaction.transaction_id, task_id) |
|
0 commit comments