Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions chia/_tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,10 @@ async def suppress_value_error(coro: Coroutine[Any, Any, None]) -> None:


@pytest.mark.anyio
@pytest.mark.limit_consensus_modes(
allowed=[ConsensusMode.HARD_FORK_2_0, ConsensusMode.HARD_FORK_3_0],
reason="We can no longer (reliably) farm blocks from before the hard fork",
)
async def test_new_transaction_and_mempool(
wallet_nodes: tuple[
FullNodeSimulator, FullNodeSimulator, ChiaServer, ChiaServer, WalletTool, WalletTool, BlockTools
Expand Down Expand Up @@ -946,8 +950,10 @@ async def test_new_transaction_and_mempool(

# Makes a bunch of coins
conditions_dict: dict[ConditionOpcode, list[ConditionWithArgs]] = {ConditionOpcode.CREATE_COIN: []}
# This should fit in one transaction
for _ in range(100):
# This should fit in one transaction. The test constants have a max block cost of 400,000,000
# and the default max *transaction* cost is half that, so 200,000,000. CREATE_COIN has a cost of
# 1,800,000, we create 80 coins
for _ in range(80):
receiver_puzzlehash = wallet_receiver.get_new_puzzlehash()
puzzle_hashes.append(receiver_puzzlehash)
output = ConditionWithArgs(ConditionOpcode.CREATE_COIN, [receiver_puzzlehash, int_to_bytes(10000000000)])
Expand Down Expand Up @@ -1046,8 +1052,8 @@ async def test_new_transaction_and_mempool(
# these numbers reflect the capacity of the mempool. In these
# tests MEMPOOL_BLOCK_BUFFER is 1. The other factors are COST_PER_BYTE
# and MAX_BLOCK_COST_CLVM
assert included_tx == 23
assert not_included_tx == 10
assert included_tx == 20
assert not_included_tx == 7
assert seen_bigger_transaction_has_high_fee

# Mempool is full
Expand Down
12 changes: 8 additions & 4 deletions chia/_tests/core/mempool/test_mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.mempool_item import MempoolItem, UnspentLineageInfo
from chia.util.casts import int_to_bytes
from chia.util.errors import Err
from chia.util.errors import Err, ValidationError
from chia.util.hash import std_hash
from chia.util.recursive_replace import recursive_replace
from chia.wallet.conditions import AssertCoinAnnouncement, AssertPuzzleAnnouncement
Expand Down Expand Up @@ -361,7 +361,10 @@ async def respond_transaction(
self.full_node.full_node_store.pending_tx_request.pop(spend_name)
if spend_name in self.full_node.full_node_store.peers_with_tx:
self.full_node.full_node_store.peers_with_tx.pop(spend_name)
ret = await self.full_node.add_transaction(tx.transaction, spend_name, peer, test)
try:
ret = await self.full_node.add_transaction(tx.transaction, spend_name, peer, test)
except ValidationError as e:
ret = (MempoolInclusionStatus.FAILED, e.code)
invariant_check_mempool(self.full_node.mempool_manager.mempool)
return ret

Expand Down Expand Up @@ -2865,8 +2868,9 @@ async def test_invalid_coin_spend_coin(
coin_spend_0 = make_spend(coin_0, cs.puzzle_reveal, cs.solution)
new_bundle = recursive_replace(spend_bundle, "coin_spends", [coin_spend_0, *spend_bundle.coin_spends[1:]])
assert spend_bundle is not None
res = await full_node_1.full_node.add_transaction(new_bundle, new_bundle.name(), test=True)
assert res == (MempoolInclusionStatus.FAILED, Err.WRONG_PUZZLE_HASH)
with pytest.raises(ValidationError) as e:
await full_node_1.full_node.add_transaction(new_bundle, new_bundle.name(), test=True)
assert e.value.code == Err.WRONG_PUZZLE_HASH


coins = make_test_coins()
Expand Down
114 changes: 55 additions & 59 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from chia.protocols.full_node_protocol import RequestBlocks, RespondBlock, RespondBlocks, RespondSignagePoint
from chia.protocols.outbound_message import Message, NodeType, make_msg
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.protocol_timing import CONSENSUS_ERROR_BAN_SECONDS
from chia.protocols.shared_protocol import Capability
from chia.protocols.wallet_protocol import CoinStateUpdate, RemovedMempoolItem
from chia.rpc.rpc_server import StateChangedProtocol
Expand Down Expand Up @@ -502,11 +503,16 @@ async def _handle_one_transaction(self, entry: TransactionQueueEntry) -> None:
except asyncio.CancelledError:
error_stack = traceback.format_exc()
self.log.debug(f"Cancelling _handle_one_transaction, closing: {error_stack}")
except ValidationError as e:
self.log.exception("ValidationError in _handle_one_transaction, closing")
if peer is not None:
await peer.close(CONSENSUS_ERROR_BAN_SECONDS)
entry.done.set((MempoolInclusionStatus.FAILED, e.code))
except Exception:
error_stack = traceback.format_exc()
self.log.error(f"Error in _handle_one_transaction, closing: {error_stack}")
self.log.exception("Error in _handle_one_transaction, closing")
if peer is not None:
await peer.close()
await peer.close(CONSENSUS_ERROR_BAN_SECONDS)
entry.done.set((MempoolInclusionStatus.FAILED, Err.UNKNOWN))
finally:
self.add_transaction_semaphore.release()

Expand Down Expand Up @@ -1092,13 +1098,13 @@ async def request_validate_wp(
response = await weight_proof_peer.call_api(FullNodeAPI.request_proof_of_weight, request, timeout=wp_timeout)
# Disconnect from this peer, because they have not behaved properly
if response is None or not isinstance(response, full_node_protocol.RespondProofOfWeight):
await weight_proof_peer.close(600)
await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS)
raise RuntimeError(f"Weight proof did not arrive in time from peer: {weight_proof_peer.peer_info.host}")
if response.wp.recent_chain_data[-1].reward_chain_block.height != peak_height:
await weight_proof_peer.close(600)
await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS)
raise RuntimeError(f"Weight proof had the wrong height: {weight_proof_peer.peer_info.host}")
if response.wp.recent_chain_data[-1].reward_chain_block.weight != peak_weight:
await weight_proof_peer.close(600)
await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS)
raise RuntimeError(f"Weight proof had the wrong weight: {weight_proof_peer.peer_info.host}")
if self.in_bad_peak_cache(response.wp):
raise ValueError("Weight proof failed bad peak cache validation")
Expand All @@ -1113,10 +1119,10 @@ async def request_validate_wp(
try:
validated, fork_point, summaries = await self.weight_proof_handler.validate_weight_proof(response.wp)
except Exception as e:
await weight_proof_peer.close(600)
await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS)
raise ValueError(f"Weight proof validation threw an error {e}")
if not validated:
await weight_proof_peer.close(600)
await weight_proof_peer.close(CONSENSUS_ERROR_BAN_SECONDS)
raise ValueError("Weight proof validation failed")
self.log.info(f"Re-checked peers: total of {len(peers_with_peak)} peers with peak {peak_height}")
self.sync_store.set_sync_mode(True)
Expand Down Expand Up @@ -1378,7 +1384,7 @@ async def ingest_blocks(
vs,
)
if err is not None:
await peer.close(600)
await peer.close(CONSENSUS_ERROR_BAN_SECONDS)
raise ValueError(f"Failed to validate block batch {start_height} to {end_height}: {err}")
if end_height - block_rate_height > 100:
now = time.monotonic()
Expand Down Expand Up @@ -2767,66 +2773,56 @@ async def add_transaction(
return MempoolInclusionStatus.SUCCESS, None
if self.mempool_manager.seen(spend_name):
return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION
self.mempool_manager.add_and_maybe_pop_seen(spend_name)
self.log.debug(f"Processing transaction: {spend_name}")
# Ignore if syncing or if we have not yet received a block
# the mempool must have a peak to validate transactions
if self.sync_store.get_sync_mode() or self.mempool_manager.peak is None:
status = MempoolInclusionStatus.FAILED
error: Optional[Err] = Err.NO_TRANSACTIONS_WHILE_SYNCING
self.mempool_manager.remove_seen(spend_name)
else:
return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING

cost_result = await self.mempool_manager.pre_validate_spendbundle(transaction, spend_name, self._bls_cache)

self.mempool_manager.add_and_maybe_pop_seen(spend_name)

if self.config.get("log_mempool", False): # pragma: no cover
try:
cost_result = await self.mempool_manager.pre_validate_spendbundle(
transaction, spend_name, self._bls_cache
)
except ValidationError as e:
self.mempool_manager.remove_seen(spend_name)
return MempoolInclusionStatus.FAILED, e.code
mempool_dir = path_from_root(self.root_path, "mempool-log") / f"{self.blockchain.get_peak_height()}"
mempool_dir.mkdir(parents=True, exist_ok=True)
with open(mempool_dir / f"{spend_name}.bundle", "wb+") as f:
f.write(bytes(transaction))
except Exception:
self.mempool_manager.remove_seen(spend_name)
raise
self.log.exception(f"Failed to log mempool item: {spend_name}")

if self.config.get("log_mempool", False): # pragma: no cover
try:
mempool_dir = path_from_root(self.root_path, "mempool-log") / f"{self.blockchain.get_peak_height()}"
mempool_dir.mkdir(parents=True, exist_ok=True)
with open(mempool_dir / f"{spend_name}.bundle", "wb+") as f:
f.write(bytes(transaction))
except Exception:
self.log.exception(f"Failed to log mempool item: {spend_name}")

async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.low):
if self.mempool_manager.get_spendbundle(spend_name) is not None:
self.mempool_manager.remove_seen(spend_name)
return MempoolInclusionStatus.SUCCESS, None
if self.mempool_manager.peak is None:
return MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED
info = await self.mempool_manager.add_spend_bundle(
transaction, cost_result, spend_name, self.mempool_manager.peak.height
)
status = info.status
error = info.error
if status == MempoolInclusionStatus.SUCCESS:
self.log.debug(
f"Added transaction to mempool: {spend_name} mempool size: "
f"{self.mempool_manager.mempool.total_mempool_cost()} normalized "
f"{self.mempool_manager.mempool.total_mempool_cost() / 5000000}"
)
async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.low):
if self.mempool_manager.get_spendbundle(spend_name) is not None:
self.mempool_manager.remove_seen(spend_name)
return MempoolInclusionStatus.SUCCESS, None
if self.mempool_manager.peak is None:
return MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED
info = await self.mempool_manager.add_spend_bundle(
transaction, cost_result, spend_name, self.mempool_manager.peak.height
)
status = info.status
error = info.error
if status == MempoolInclusionStatus.SUCCESS:
self.log.debug(
f"Added transaction to mempool: {spend_name} mempool size: "
f"{self.mempool_manager.mempool.total_mempool_cost()} normalized "
f"{self.mempool_manager.mempool.total_mempool_cost() / 5000000}"
)

# Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
# vector.
mempool_item = self.mempool_manager.get_mempool_item(spend_name)
assert mempool_item is not None
await self.broadcast_removed_tx(info.removals)
await self.broadcast_added_tx(mempool_item, current_peer=peer)
# Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
# vector.
mempool_item = self.mempool_manager.get_mempool_item(spend_name)
assert mempool_item is not None
await self.broadcast_removed_tx(info.removals)
await self.broadcast_added_tx(mempool_item, current_peer=peer)

if self.simulator_transaction_callback is not None: # callback
await self.simulator_transaction_callback(spend_name)
if self.simulator_transaction_callback is not None: # callback
await self.simulator_transaction_callback(spend_name)

else:
self.mempool_manager.remove_seen(spend_name)
self.log.debug(f"Wasn't able to add transaction with id {spend_name}, status {status} error: {error}")
else:
self.mempool_manager.remove_seen(spend_name)
self.log.debug(f"Wasn't able to add transaction with id {spend_name}, status {status} error: {error}")
return status, error

async def broadcast_added_tx(
Expand Down
12 changes: 9 additions & 3 deletions chia/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,15 @@ async def connection_closed(
# in this case we still want to do the banning logic and remove the connection from the list
# but the other cleanup should already have been done so we skip that

if is_localhost(connection.peer_info.host) and ban_time != 0:
self.log.warning(f"Trying to ban localhost for {ban_time}, but will not ban")
ban_time = 0
if ban_time > 0:
if is_localhost(connection.peer_info.host):
self.log.warning(f"Trying to ban localhost for {ban_time}, but will not ban")
ban_time = 0
elif self.is_trusted_peer(connection, self.config.get("trusted_peers", {})):
self.log.warning(
f"Trying to ban trusted peer {connection.peer_info.host} for {ban_time}, but will not ban"
)
ban_time = 0
if ban_time > 0:
ban_until: float = time.time() + ban_time
self.log.warning(f"Banning {connection.peer_info.host} for {ban_time} seconds")
Expand Down
Loading