diff --git a/chia/_tests/core/full_node/test_full_node.py b/chia/_tests/core/full_node/test_full_node.py index 9b70e3c23ee5..4b1eb96c04ca 100644 --- a/chia/_tests/core/full_node/test_full_node.py +++ b/chia/_tests/core/full_node/test_full_node.py @@ -919,6 +919,10 @@ async def suppress_value_error(coro: Coroutine[Any, Any, None]) -> None: @pytest.mark.anyio +@pytest.mark.limit_consensus_modes( + allowed=[ConsensusMode.HARD_FORK_2_0, ConsensusMode.HARD_FORK_3_0], + reason="We can no longer (reliably) farm blocks from before the hard fork", +) async def test_new_transaction_and_mempool( wallet_nodes: tuple[ FullNodeSimulator, FullNodeSimulator, ChiaServer, ChiaServer, WalletTool, WalletTool, BlockTools @@ -946,8 +950,10 @@ async def test_new_transaction_and_mempool( # Makes a bunch of coins conditions_dict: dict[ConditionOpcode, list[ConditionWithArgs]] = {ConditionOpcode.CREATE_COIN: []} - # This should fit in one transaction - for _ in range(100): + # This should fit in one transaction. The test constants have a max block cost of 400,000,000 + # and the default max *transaction* cost is half that, so 200,000,000. CREATE_COIN has a cost of + # 1,800,000, we create 80 coins + for _ in range(80): receiver_puzzlehash = wallet_receiver.get_new_puzzlehash() puzzle_hashes.append(receiver_puzzlehash) output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [receiver_puzzlehash, int_to_bytes(10000000000)]) @@ -1046,8 +1052,8 @@ async def test_new_transaction_and_mempool( # these numbers reflect the capacity of the mempool. In these # tests MEMPOOL_BLOCK_BUFFER is 1. The other factors are COST_PER_BYTE # and MAX_BLOCK_COST_CLVM - assert included_tx == 23 - assert not_included_tx == 10 + assert included_tx == 20 + assert not_included_tx == 7 assert seen_bigger_transaction_has_high_fee # Mempool is full diff --git a/chia/_tests/core/mempool/test_mempool.py b/chia/_tests/core/mempool/test_mempool.py index 1527a5abd057..134b12a655fe 100644 --- a/chia/_tests/core/mempool/test_mempool.py +++ b/chia/_tests/core/mempool/test_mempool.py @@ -72,7 +72,7 @@ from chia.types.mempool_inclusion_status import MempoolInclusionStatus from chia.types.mempool_item import MempoolItem, UnspentLineageInfo from chia.util.casts import int_to_bytes -from chia.util.errors import Err +from chia.util.errors import Err, ValidationError from chia.util.hash import std_hash from chia.util.recursive_replace import recursive_replace from chia.wallet.conditions import AssertCoinAnnouncement, AssertPuzzleAnnouncement @@ -361,7 +361,10 @@ async def respond_transaction( self.full_node.full_node_store.pending_tx_request.pop(spend_name) if spend_name in self.full_node.full_node_store.peers_with_tx: self.full_node.full_node_store.peers_with_tx.pop(spend_name) - ret = await self.full_node.add_transaction(tx.transaction, spend_name, peer, test) + try: + ret = await self.full_node.add_transaction(tx.transaction, spend_name, peer, test) + except ValidationError as e: + ret = (MempoolInclusionStatus.FAILED, e.code) invariant_check_mempool(self.full_node.mempool_manager.mempool) return ret @@ -2865,8 +2868,9 @@ async def test_invalid_coin_spend_coin( coin_spend_0 = make_spend(coin_0, cs.puzzle_reveal, cs.solution) new_bundle = recursive_replace(spend_bundle, "coin_spends", [coin_spend_0, *spend_bundle.coin_spends[1:]]) assert spend_bundle is not None - res = await full_node_1.full_node.add_transaction(new_bundle, new_bundle.name(), test=True) - assert res == (MempoolInclusionStatus.FAILED, Err.WRONG_PUZZLE_HASH) + with pytest.raises(ValidationError) as e: + await full_node_1.full_node.add_transaction(new_bundle, new_bundle.name(), test=True) + assert e.value.code == Err.WRONG_PUZZLE_HASH coins = make_test_coins() diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index ff589394622c..163ccf77a020 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -68,6 +68,7 @@ from chia.protocols.full_node_protocol import RequestBlocks, RespondBlock, RespondBlocks, RespondSignagePoint from chia.protocols.outbound_message import Message, NodeType, make_msg from chia.protocols.protocol_message_types import ProtocolMessageTypes +from chia.protocols.protocol_timing import CONSENSUS_ERROR_BAN_SECONDS from chia.protocols.shared_protocol import Capability from chia.protocols.wallet_protocol import CoinStateUpdate, RemovedMempoolItem from chia.rpc.rpc_server import StateChangedProtocol @@ -502,11 +503,16 @@ async def _handle_one_transaction(self, entry: TransactionQueueEntry) -> None: except asyncio.CancelledError: error_stack = traceback.format_exc() self.log.debug(f"Cancelling _handle_one_transaction, closing: {error_stack}") + except ValidationError as e: + self.log.exception("ValidationError in _handle_one_transaction, closing") + if peer is not None: + await peer.close(CONSENSUS_ERROR_BAN_SECONDS) + entry.done.set((MempoolInclusionStatus.FAILED, e.code)) except Exception: - error_stack = traceback.format_exc() - self.log.error(f"Error in _handle_one_transaction, closing: {error_stack}") + self.log.exception("Error in _handle_one_transaction, closing") if peer is not None: - await peer.close() + await peer.close(CONSENSUS_ERROR_BAN_SECONDS) + entry.done.set((MempoolInclusionStatus.FAILED, Err.UNKNOWN)) finally: self.add_transaction_semaphore.release() @@ -1092,13 +1098,13 @@ async def request_validate_wp( response = await weight_proof_peer.call_api(FullNodeAPI.request_proof_of_weight, request, timeout=wp_timeout) # Disconnect from this peer, because they have not behaved properly if response is None or not isinstance(response, full_node_protocol.RespondProofOfWeight): - await weight_proof_peer.close(600) + await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS) raise RuntimeError(f"Weight proof did not arrive in time from peer: {weight_proof_peer.peer_info.host}") if response.wp.recent_chain_data[-1].reward_chain_block.height != peak_height: - await weight_proof_peer.close(600) + await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS) raise RuntimeError(f"Weight proof had the wrong height: {weight_proof_peer.peer_info.host}") if response.wp.recent_chain_data[-1].reward_chain_block.weight != peak_weight: - await weight_proof_peer.close(600) + await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS) raise RuntimeError(f"Weight proof had the wrong weight: {weight_proof_peer.peer_info.host}") if self.in_bad_peak_cache(response.wp): raise ValueError("Weight proof failed bad peak cache validation") @@ -1113,10 +1119,10 @@ async def request_validate_wp( try: validated, fork_point, summaries = await self.weight_proof_handler.validate_weight_proof(response.wp) except Exception as e: - await weight_proof_peer.close(600) + await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS) raise ValueError(f"Weight proof validation threw an error {e}") if not validated: - await weight_proof_peer.close(600) + await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS) raise ValueError("Weight proof validation failed") self.log.info(f"Re-checked peers: total of {len(peers_with_peak)} peers with peak {peak_height}") self.sync_store.set_sync_mode(True) @@ -1378,7 +1384,7 @@ async def ingest_blocks( vs, ) if err is not None: - await peer.close(600) + await peer.close(CONSENSUS_ERROR_BAN_SECONDS) raise ValueError(f"Failed to validate block batch {start_height} to {end_height}: {err}") if end_height - block_rate_height > 100: now = time.monotonic() @@ -2767,66 +2773,56 @@ async def add_transaction( return MempoolInclusionStatus.SUCCESS, None if self.mempool_manager.seen(spend_name): return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION - self.mempool_manager.add_and_maybe_pop_seen(spend_name) self.log.debug(f"Processing transaction: {spend_name}") # Ignore if syncing or if we have not yet received a block # the mempool must have a peak to validate transactions if self.sync_store.get_sync_mode() or self.mempool_manager.peak is None: - status = MempoolInclusionStatus.FAILED - error: Optional[Err] = Err.NO_TRANSACTIONS_WHILE_SYNCING - self.mempool_manager.remove_seen(spend_name) - else: + return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING + + cost_result = await self.mempool_manager.pre_validate_spendbundle(transaction, spend_name, self._bls_cache) + + self.mempool_manager.add_and_maybe_pop_seen(spend_name) + + if self.config.get("log_mempool", False): # pragma: no cover try: - cost_result = await self.mempool_manager.pre_validate_spendbundle( - transaction, spend_name, self._bls_cache - ) - except ValidationError as e: - self.mempool_manager.remove_seen(spend_name) - return MempoolInclusionStatus.FAILED, e.code + mempool_dir = path_from_root(self.root_path, "mempool-log") / f"{self.blockchain.get_peak_height()}" + mempool_dir.mkdir(parents=True, exist_ok=True) + with open(mempool_dir / f"{spend_name}.bundle", "wb+") as f: + f.write(bytes(transaction)) except Exception: - self.mempool_manager.remove_seen(spend_name) - raise + self.log.exception(f"Failed to log mempool item: {spend_name}") - if self.config.get("log_mempool", False): # pragma: no cover - try: - mempool_dir = path_from_root(self.root_path, "mempool-log") / f"{self.blockchain.get_peak_height()}" - mempool_dir.mkdir(parents=True, exist_ok=True) - with open(mempool_dir / f"{spend_name}.bundle", "wb+") as f: - f.write(bytes(transaction)) - except Exception: - self.log.exception(f"Failed to log mempool item: {spend_name}") - - async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.low): - if self.mempool_manager.get_spendbundle(spend_name) is not None: - self.mempool_manager.remove_seen(spend_name) - return MempoolInclusionStatus.SUCCESS, None - if self.mempool_manager.peak is None: - return MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED - info = await self.mempool_manager.add_spend_bundle( - transaction, cost_result, spend_name, self.mempool_manager.peak.height - ) - status = info.status - error = info.error - if status == MempoolInclusionStatus.SUCCESS: - self.log.debug( - f"Added transaction to mempool: {spend_name} mempool size: " - f"{self.mempool_manager.mempool.total_mempool_cost()} normalized " - f"{self.mempool_manager.mempool.total_mempool_cost() / 5000000}" - ) + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.low): + if self.mempool_manager.get_spendbundle(spend_name) is not None: + self.mempool_manager.remove_seen(spend_name) + return MempoolInclusionStatus.SUCCESS, None + if self.mempool_manager.peak is None: + return MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED + info = await self.mempool_manager.add_spend_bundle( + transaction, cost_result, spend_name, self.mempool_manager.peak.height + ) + status = info.status + error = info.error + if status == MempoolInclusionStatus.SUCCESS: + self.log.debug( + f"Added transaction to mempool: {spend_name} mempool size: " + f"{self.mempool_manager.mempool.total_mempool_cost()} normalized " + f"{self.mempool_manager.mempool.total_mempool_cost() / 5000000}" + ) - # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS - # vector. - mempool_item = self.mempool_manager.get_mempool_item(spend_name) - assert mempool_item is not None - await self.broadcast_removed_tx(info.removals) - await self.broadcast_added_tx(mempool_item, current_peer=peer) + # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS + # vector. + mempool_item = self.mempool_manager.get_mempool_item(spend_name) + assert mempool_item is not None + await self.broadcast_removed_tx(info.removals) + await self.broadcast_added_tx(mempool_item, current_peer=peer) - if self.simulator_transaction_callback is not None: # callback - await self.simulator_transaction_callback(spend_name) + if self.simulator_transaction_callback is not None: # callback + await self.simulator_transaction_callback(spend_name) - else: - self.mempool_manager.remove_seen(spend_name) - self.log.debug(f"Wasn't able to add transaction with id {spend_name}, status {status} error: {error}") + else: + self.mempool_manager.remove_seen(spend_name) + self.log.debug(f"Wasn't able to add transaction with id {spend_name}, status {status} error: {error}") return status, error async def broadcast_added_tx( diff --git a/chia/server/server.py b/chia/server/server.py index f6f97b50791c..b036a258f7ba 100644 --- a/chia/server/server.py +++ b/chia/server/server.py @@ -553,9 +553,15 @@ async def connection_closed( # in this case we still want to do the banning logic and remove the connection from the list # but the other cleanup should already have been done so we skip that - if is_localhost(connection.peer_info.host) and ban_time != 0: - self.log.warning(f"Trying to ban localhost for {ban_time}, but will not ban") - ban_time = 0 + if ban_time > 0: + if is_localhost(connection.peer_info.host): + self.log.warning(f"Trying to ban localhost for {ban_time}, but will not ban") + ban_time = 0 + elif self.is_trusted_peer(connection, self.config.get("trusted_peers", {})): + self.log.warning( + f"Trying to ban trusted peer {connection.peer_info.host} for {ban_time}, but will not ban" + ) + ban_time = 0 if ban_time > 0: ban_until: float = time.time() + ban_time self.log.warning(f"Banning {connection.peer_info.host} for {ban_time} seconds")