68
68
from chia .protocols .full_node_protocol import RequestBlocks , RespondBlock , RespondBlocks , RespondSignagePoint
69
69
from chia .protocols .outbound_message import Message , NodeType , make_msg
70
70
from chia .protocols .protocol_message_types import ProtocolMessageTypes
71
+ from chia .protocols .protocol_timing import CONSENSUS_ERROR_BAN_SECONDS
71
72
from chia .protocols .shared_protocol import Capability
72
73
from chia .protocols .wallet_protocol import CoinStateUpdate , RemovedMempoolItem
73
74
from chia .rpc .rpc_server import StateChangedProtocol
@@ -502,11 +503,16 @@ async def _handle_one_transaction(self, entry: TransactionQueueEntry) -> None:
502
503
except asyncio .CancelledError :
503
504
error_stack = traceback .format_exc ()
504
505
self .log .debug (f"Cancelling _handle_one_transaction, closing: { error_stack } " )
506
+ except ValidationError as e :
507
+ self .log .exception ("ValidationError in _handle_one_transaction, closing" )
508
+ if peer is not None :
509
+ await peer .close (CONSENSUS_ERROR_BAN_SECONDS )
510
+ entry .done .set ((MempoolInclusionStatus .FAILED , e .code ))
505
511
except Exception :
506
- error_stack = traceback .format_exc ()
507
- self .log .error (f"Error in _handle_one_transaction, closing: { error_stack } " )
512
+ self .log .exception ("Error in _handle_one_transaction, closing" )
508
513
if peer is not None :
509
- await peer .close ()
514
+ await peer .close (CONSENSUS_ERROR_BAN_SECONDS )
515
+ entry .done .set ((MempoolInclusionStatus .FAILED , Err .UNKNOWN ))
510
516
finally :
511
517
self .add_transaction_semaphore .release ()
512
518
@@ -1092,13 +1098,13 @@ async def request_validate_wp(
1092
1098
response = await weight_proof_peer .call_api (FullNodeAPI .request_proof_of_weight , request , timeout = wp_timeout )
1093
1099
# Disconnect from this peer, because they have not behaved properly
1094
1100
if response is None or not isinstance (response , full_node_protocol .RespondProofOfWeight ):
1095
- await weight_proof_peer .close (600 )
1101
+ await weight_proof_peer .close (CONSENSUS_ERROR_BAN_SECONDS )
1096
1102
raise RuntimeError (f"Weight proof did not arrive in time from peer: { weight_proof_peer .peer_info .host } " )
1097
1103
if response .wp .recent_chain_data [- 1 ].reward_chain_block .height != peak_height :
1098
- await weight_proof_peer .close (600 )
1104
+ await weight_proof_peer .close (CONSENSUS_ERROR_BAN_SECONDS )
1099
1105
raise RuntimeError (f"Weight proof had the wrong height: { weight_proof_peer .peer_info .host } " )
1100
1106
if response .wp .recent_chain_data [- 1 ].reward_chain_block .weight != peak_weight :
1101
- await weight_proof_peer .close (600 )
1107
+ await weight_proof_peer .close (CONSENSUS_ERROR_BAN_SECONDS )
1102
1108
raise RuntimeError (f"Weight proof had the wrong weight: { weight_proof_peer .peer_info .host } " )
1103
1109
if self .in_bad_peak_cache (response .wp ):
1104
1110
raise ValueError ("Weight proof failed bad peak cache validation" )
@@ -1113,10 +1119,10 @@ async def request_validate_wp(
1113
1119
try :
1114
1120
validated , fork_point , summaries = await self .weight_proof_handler .validate_weight_proof (response .wp )
1115
1121
except Exception as e :
1116
- await weight_proof_peer .close (600 )
1122
+ await weight_proof_peer .close (CONSENSUS_ERROR_BAN_SECONDS )
1117
1123
raise ValueError (f"Weight proof validation threw an error { e } " )
1118
1124
if not validated :
1119
- await weight_proof_peer .close (600 )
1125
+ await weight_proof_peer .close (CONSENSUS_ERROR_BAN_SECONDS )
1120
1126
raise ValueError ("Weight proof validation failed" )
1121
1127
self .log .info (f"Re-checked peers: total of { len (peers_with_peak )} peers with peak { peak_height } " )
1122
1128
self .sync_store .set_sync_mode (True )
@@ -1378,7 +1384,7 @@ async def ingest_blocks(
1378
1384
vs ,
1379
1385
)
1380
1386
if err is not None :
1381
- await peer .close (600 )
1387
+ await peer .close (CONSENSUS_ERROR_BAN_SECONDS )
1382
1388
raise ValueError (f"Failed to validate block batch { start_height } to { end_height } : { err } " )
1383
1389
if end_height - block_rate_height > 100 :
1384
1390
now = time .monotonic ()
@@ -2767,66 +2773,56 @@ async def add_transaction(
2767
2773
return MempoolInclusionStatus .SUCCESS , None
2768
2774
if self .mempool_manager .seen (spend_name ):
2769
2775
return MempoolInclusionStatus .FAILED , Err .ALREADY_INCLUDING_TRANSACTION
2770
- self .mempool_manager .add_and_maybe_pop_seen (spend_name )
2771
2776
self .log .debug (f"Processing transaction: { spend_name } " )
2772
2777
# Ignore if syncing or if we have not yet received a block
2773
2778
# the mempool must have a peak to validate transactions
2774
2779
if self .sync_store .get_sync_mode () or self .mempool_manager .peak is None :
2775
- status = MempoolInclusionStatus .FAILED
2776
- error : Optional [Err ] = Err .NO_TRANSACTIONS_WHILE_SYNCING
2777
- self .mempool_manager .remove_seen (spend_name )
2778
- else :
2780
+ return MempoolInclusionStatus .FAILED , Err .NO_TRANSACTIONS_WHILE_SYNCING
2781
+
2782
+ cost_result = await self .mempool_manager .pre_validate_spendbundle (transaction , spend_name , self ._bls_cache )
2783
+
2784
+ self .mempool_manager .add_and_maybe_pop_seen (spend_name )
2785
+
2786
+ if self .config .get ("log_mempool" , False ): # pragma: no cover
2779
2787
try :
2780
- cost_result = await self .mempool_manager .pre_validate_spendbundle (
2781
- transaction , spend_name , self ._bls_cache
2782
- )
2783
- except ValidationError as e :
2784
- self .mempool_manager .remove_seen (spend_name )
2785
- return MempoolInclusionStatus .FAILED , e .code
2788
+ mempool_dir = path_from_root (self .root_path , "mempool-log" ) / f"{ self .blockchain .get_peak_height ()} "
2789
+ mempool_dir .mkdir (parents = True , exist_ok = True )
2790
+ with open (mempool_dir / f"{ spend_name } .bundle" , "wb+" ) as f :
2791
+ f .write (bytes (transaction ))
2786
2792
except Exception :
2787
- self .mempool_manager .remove_seen (spend_name )
2788
- raise
2793
+ self .log .exception (f"Failed to log mempool item: { spend_name } " )
2789
2794
2790
- if self .config .get ("log_mempool" , False ): # pragma: no cover
2791
- try :
2792
- mempool_dir = path_from_root (self .root_path , "mempool-log" ) / f"{ self .blockchain .get_peak_height ()} "
2793
- mempool_dir .mkdir (parents = True , exist_ok = True )
2794
- with open (mempool_dir / f"{ spend_name } .bundle" , "wb+" ) as f :
2795
- f .write (bytes (transaction ))
2796
- except Exception :
2797
- self .log .exception (f"Failed to log mempool item: { spend_name } " )
2798
-
2799
- async with self .blockchain .priority_mutex .acquire (priority = BlockchainMutexPriority .low ):
2800
- if self .mempool_manager .get_spendbundle (spend_name ) is not None :
2801
- self .mempool_manager .remove_seen (spend_name )
2802
- return MempoolInclusionStatus .SUCCESS , None
2803
- if self .mempool_manager .peak is None :
2804
- return MempoolInclusionStatus .FAILED , Err .MEMPOOL_NOT_INITIALIZED
2805
- info = await self .mempool_manager .add_spend_bundle (
2806
- transaction , cost_result , spend_name , self .mempool_manager .peak .height
2807
- )
2808
- status = info .status
2809
- error = info .error
2810
- if status == MempoolInclusionStatus .SUCCESS :
2811
- self .log .debug (
2812
- f"Added transaction to mempool: { spend_name } mempool size: "
2813
- f"{ self .mempool_manager .mempool .total_mempool_cost ()} normalized "
2814
- f"{ self .mempool_manager .mempool .total_mempool_cost () / 5000000 } "
2815
- )
2795
+ async with self .blockchain .priority_mutex .acquire (priority = BlockchainMutexPriority .low ):
2796
+ if self .mempool_manager .get_spendbundle (spend_name ) is not None :
2797
+ self .mempool_manager .remove_seen (spend_name )
2798
+ return MempoolInclusionStatus .SUCCESS , None
2799
+ if self .mempool_manager .peak is None :
2800
+ return MempoolInclusionStatus .FAILED , Err .MEMPOOL_NOT_INITIALIZED
2801
+ info = await self .mempool_manager .add_spend_bundle (
2802
+ transaction , cost_result , spend_name , self .mempool_manager .peak .height
2803
+ )
2804
+ status = info .status
2805
+ error = info .error
2806
+ if status == MempoolInclusionStatus .SUCCESS :
2807
+ self .log .debug (
2808
+ f"Added transaction to mempool: { spend_name } mempool size: "
2809
+ f"{ self .mempool_manager .mempool .total_mempool_cost ()} normalized "
2810
+ f"{ self .mempool_manager .mempool .total_mempool_cost () / 5000000 } "
2811
+ )
2816
2812
2817
- # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
2818
- # vector.
2819
- mempool_item = self .mempool_manager .get_mempool_item (spend_name )
2820
- assert mempool_item is not None
2821
- await self .broadcast_removed_tx (info .removals )
2822
- await self .broadcast_added_tx (mempool_item , current_peer = peer )
2813
+ # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS
2814
+ # vector.
2815
+ mempool_item = self .mempool_manager .get_mempool_item (spend_name )
2816
+ assert mempool_item is not None
2817
+ await self .broadcast_removed_tx (info .removals )
2818
+ await self .broadcast_added_tx (mempool_item , current_peer = peer )
2823
2819
2824
- if self .simulator_transaction_callback is not None : # callback
2825
- await self .simulator_transaction_callback (spend_name )
2820
+ if self .simulator_transaction_callback is not None : # callback
2821
+ await self .simulator_transaction_callback (spend_name )
2826
2822
2827
- else :
2828
- self .mempool_manager .remove_seen (spend_name )
2829
- self .log .debug (f"Wasn't able to add transaction with id { spend_name } , status { status } error: { error } " )
2823
+ else :
2824
+ self .mempool_manager .remove_seen (spend_name )
2825
+ self .log .debug (f"Wasn't able to add transaction with id { spend_name } , status { status } error: { error } " )
2830
2826
return status , error
2831
2827
2832
2828
async def broadcast_added_tx (
0 commit comments