@@ -45,11 +45,13 @@ using td::Ref;
4545using namespace std ::literals::string_literals;
4646
4747// Don't increase MERGE_MAX_QUEUE_LIMIT too much: merging requires cleaning the whole queue in out_msg_queue_cleanup
48- static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096 ;
49- static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000 ;
50- static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047 ;
51- static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000 ;
52- static const int HIGH_PRIORITY_EXTERNAL = 10 ; // don't skip high priority externals when queue is big
48+ static constexpr td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096 ;
49+ static constexpr td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000 ;
50+ static constexpr td::uint32 MERGE_MAX_QUEUE_SIZE = 2047 ;
51+ static constexpr td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000 ;
52+ static constexpr int HIGH_PRIORITY_EXTERNAL = 10 ; // don't skip high priority externals when queue is big
53+
54+ static constexpr int MAX_ATTEMPTS = 5 ;
5355
5456#define DBG (__n ) dbg(__n)&&
5557#define DSTART int __dcnt = 0 ;
@@ -74,11 +76,15 @@ static inline bool dbg(int c) {
7476 * @param manager The ActorId of the ValidatorManager.
7577 * @param timeout The timeout for the collator.
7678 * @param promise The promise to return the result.
79+ * @param cancellation_token Token to cancel collation.
80+ * @param mode +1 - skip storing candidate to disk.
81+ * @param attempt_idx The index of the attempt, starting from 0. On later attempts collator decreases block limits and skips some steps.
7782 */
7883Collator::Collator (ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id,
7984 std::vector<BlockIdExt> prev, td::Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
8085 Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
81- td::Timestamp timeout, td::Promise<BlockCandidate> promise)
86+ td::Timestamp timeout, td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token,
87+ unsigned mode, int attempt_idx)
8288 : shard_(shard)
8389 , is_hardfork_(is_hardfork)
8490 , min_mc_block_id{min_masterchain_block_id}
@@ -93,9 +99,13 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
9399 , soft_timeout_(td::Timestamp::at(timeout.at() - 3.0 ))
94100 , medium_timeout_(td::Timestamp::at(timeout.at() - 1.5 ))
95101 , main_promise(std::move(promise))
96- , perf_timer_(" collate" , 0.1 , [manager](double duration) {
97- send_closure (manager, &ValidatorManager::add_perf_timer_stat, " collate" , duration);
98- }) {
102+ , mode_(mode)
103+ , attempt_idx_(attempt_idx)
104+ , perf_timer_(" collate" , 0.1 ,
105+ [manager](double duration) {
106+ send_closure (manager, &ValidatorManager::add_perf_timer_stat, " collate" , duration);
107+ })
108+ , cancellation_token_(std::move(cancellation_token)) {
99109}
100110
101111/* *
@@ -107,7 +117,11 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
107117 * The results of these queries are handled by corresponding callback functions.
108118 */
109119void Collator::start_up () {
110- LOG (WARNING) << " Collator for shard " << shard_.to_str () << " started" ;
120+ LOG (WARNING) << " Collator for shard " << shard_.to_str () << " started"
121+ << (attempt_idx_ ? PSTRING () << " (attempt #" << attempt_idx_ << " )" : " " );
122+ if (!check_cancelled ()) {
123+ return ;
124+ }
111125 LOG (DEBUG) << " Previous block #1 is " << prev_blocks.at (0 ).to_str ();
112126 if (prev_blocks.size () > 1 ) {
113127 LOG (DEBUG) << " Previous block #2 is " << prev_blocks.at (1 ).to_str ();
@@ -340,7 +354,15 @@ bool Collator::fatal_error(td::Status error) {
340354 error.ensure_error ();
341355 LOG (ERROR) << " cannot generate block candidate for " << show_shard (shard_) << " : " << error.to_string ();
342356 if (busy_) {
343- main_promise (std::move (error));
357+ if (allow_repeat_collation_ && error.code () != ErrorCode::cancelled && attempt_idx_ + 1 < MAX_ATTEMPTS &&
358+ !is_hardfork_ && !timeout.is_in_past ()) {
359+ LOG (WARNING) << " Repeating collation (attempt #" << attempt_idx_ + 1 << " )" ;
360+ run_collate_query (shard_, min_mc_block_id, prev_blocks, created_by_, validator_set_, collator_opts_, manager,
361+ td::Timestamp::in (10.0 ), std::move (main_promise), std::move (cancellation_token_), mode_,
362+ attempt_idx_ + 1 );
363+ } else {
364+ main_promise (std::move (error));
365+ }
344366 busy_ = false ;
345367 }
346368 stop ();
@@ -382,6 +404,9 @@ bool Collator::fatal_error(std::string err_msg, int err_code) {
382404 */
383405void Collator::check_pending () {
384406 // LOG(DEBUG) << "pending = " << pending;
407+ if (!check_cancelled ()) {
408+ return ;
409+ }
385410 if (!pending) {
386411 step = 2 ;
387412 try {
@@ -712,6 +737,15 @@ bool Collator::unpack_last_mc_state() {
712737 return fatal_error (limits.move_as_error ());
713738 }
714739 block_limits_ = limits.move_as_ok ();
740+ if (attempt_idx_ == 3 ) {
741+ LOG (INFO) << " Attempt #3: bytes, gas limits /= 2" ;
742+ block_limits_->bytes .multiply_by (0.5 );
743+ block_limits_->gas .multiply_by (0.5 );
744+ } else if (attempt_idx_ == 4 ) {
745+ LOG (INFO) << " Attempt #4: bytes, gas limits /= 4" ;
746+ block_limits_->bytes .multiply_by (0.25 );
747+ block_limits_->gas .multiply_by (0.25 );
748+ }
715749 LOG (DEBUG) << " block limits: bytes [" << block_limits_->bytes .underload () << " , " << block_limits_->bytes .soft ()
716750 << " , " << block_limits_->bytes .hard () << " ]" ;
717751 LOG (DEBUG) << " block limits: gas [" << block_limits_->gas .underload () << " , " << block_limits_->gas .soft () << " , "
@@ -2093,6 +2127,7 @@ bool Collator::do_collate() {
20932127 if (max_lt == start_lt) {
20942128 ++max_lt;
20952129 }
2130+ allow_repeat_collation_ = true ;
20962131 // NB: interchanged 1.2 and 1.1 (is this always correct?)
20972132 // 1.1. re-adjust neighbors' out_msg_queues (for oneself)
20982133 if (!add_trivial_neighbor ()) {
@@ -2333,6 +2368,9 @@ bool Collator::out_msg_queue_cleanup() {
23332368 LOG (WARNING) << " cleaning up outbound queue takes too long, ending" ;
23342369 break ;
23352370 }
2371+ if (!check_cancelled ()) {
2372+ return false ;
2373+ }
23362374 if (i == queue_parts.size ()) {
23372375 i = 0 ;
23382376 }
@@ -3532,6 +3570,9 @@ bool Collator::process_inbound_internal_messages() {
35323570 stats_.limits_log += PSTRING () << " INBOUND_INT_MESSAGES: timeout\n " ;
35333571 break ;
35343572 }
3573+ if (!check_cancelled ()) {
3574+ return false ;
3575+ }
35353576 auto kv = nb_out_msgs_->extract_cur ();
35363577 CHECK (kv && kv->msg .not_null ());
35373578 LOG (DEBUG) << " processing inbound message with (lt,hash)=(" << kv->lt << " ," << kv->key .to_hex ()
@@ -3565,6 +3606,10 @@ bool Collator::process_inbound_external_messages() {
35653606 LOG (INFO) << " skipping processing of inbound external messages" ;
35663607 return true ;
35673608 }
3609+ if (attempt_idx_ >= 2 ) {
3610+ LOG (INFO) << " Attempt #" << attempt_idx_ << " : skip external messages" ;
3611+ return true ;
3612+ }
35683613 if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) {
35693614 LOG (INFO) << " skipping processing of inbound external messages (except for high-priority) because out_msg_queue is "
35703615 " too big ("
@@ -3586,6 +3631,9 @@ bool Collator::process_inbound_external_messages() {
35863631 stats_.limits_log += PSTRING () << " INBOUND_EXT_MESSAGES: timeout\n " ;
35873632 break ;
35883633 }
3634+ if (!check_cancelled ()) {
3635+ return false ;
3636+ }
35893637 auto ext_msg = ext_msg_struct.cell ;
35903638 ton::Bits256 hash{ext_msg->get_hash ().bits ()};
35913639 int r = process_external_message (std::move (ext_msg));
@@ -3692,6 +3740,10 @@ bool Collator::process_dispatch_queue() {
36923740 if (max_per_initiator[iter] == 0 || max_total_count[iter] == 0 ) {
36933741 continue ;
36943742 }
3743+ if (iter > 0 && attempt_idx_ >= 1 ) {
3744+ LOG (INFO) << " Attempt #" << attempt_idx_ << " : skip process_dispatch_queue" ;
3745+ break ;
3746+ }
36953747 vm::AugmentedDictionary cur_dispatch_queue{dispatch_queue_->get_root (), 256 , block::tlb::aug_DispatchQueue};
36963748 std::map<std::tuple<WorkchainId, StdSmcAddress, LogicalTime>, size_t > count_per_initiator;
36973749 size_t total_count = 0 ;
@@ -3704,13 +3756,13 @@ bool Collator::process_dispatch_queue() {
37043756 stats_.limits_log += PSTRING () << " DISPATCH_QUEUE_STAGE_" << iter << " : "
37053757 << block_full_comment (*block_limit_status_, block::ParamLimits::cl_normal)
37063758 << " \n " ;
3707- return true ;
3759+ return register_dispatch_queue_op ( true ) ;
37083760 }
37093761 if (soft_timeout_.is_in_past (td::Timestamp::now ())) {
37103762 block_full_ = true ;
37113763 LOG (WARNING) << " soft timeout reached, stop processing dispatch queue" ;
37123764 stats_.limits_log += PSTRING () << " DISPATCH_QUEUE_STAGE_" << iter << " : timeout\n " ;
3713- return true ;
3765+ return register_dispatch_queue_op ( true ) ;
37143766 }
37153767 StdSmcAddress src_addr;
37163768 td::Ref<vm::CellSlice> account_dispatch_queue;
@@ -3788,6 +3840,7 @@ bool Collator::process_dispatch_queue() {
37883840 if (iter == 0 ) {
37893841 have_unprocessed_account_dispatch_queue_ = false ;
37903842 }
3843+ register_dispatch_queue_op (true );
37913844 }
37923845 return true ;
37933846}
@@ -3811,12 +3864,7 @@ bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddres
38113864 return fatal_error (PSTRING () << " failed to delete message from DispatchQueue: address=" << src_addr.to_hex ()
38123865 << " , lt=" << lt);
38133866 }
3814- ++dispatch_queue_ops_;
3815- if (!(dispatch_queue_ops_ & 63 )) {
3816- if (!block_limit_status_->add_proof (dispatch_queue_->get_root_cell ())) {
3817- return false ;
3818- }
3819- }
3867+ register_dispatch_queue_op ();
38203868 ++sender_generated_messages_count_[src_addr];
38213869
38223870 LogicalTime enqueued_lt = 0 ;
@@ -3909,6 +3957,7 @@ bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddres
39093957 ++unprocessed_deferred_messages_[src_addr];
39103958 LOG (INFO) << " delivering deferred message from account " << src_addr.to_hex () << " , lt=" << lt
39113959 << " , emitted_lt=" << emitted_lt;
3960+ block_limit_status_->add_cell (msg_env);
39123961 register_new_msg (std::move (new_msg));
39133962 msg_metadata = std::move (env.metadata );
39143963 return true ;
@@ -4088,11 +4137,7 @@ bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_rema
40884137 }
40894138 ++dispatch_dict_size;
40904139 dispatch_queue_->set (src_addr, block::pack_account_dispatch_queue (dispatch_dict, dispatch_dict_size));
4091- ++dispatch_queue_ops_;
4092- if (!(dispatch_queue_ops_ & 63 )) {
4093- return block_limit_status_->add_proof (dispatch_queue_->get_root_cell ());
4094- }
4095- return true ;
4140+ return register_dispatch_queue_op ();
40964141 }
40974142
40984143 auto next_hop = block::interpolate_addr (src_prefix, dest_prefix, route_info.second );
@@ -4134,6 +4179,9 @@ bool Collator::process_new_messages(bool enqueue_only) {
41344179 stats_.limits_log += PSTRING () << " NEW_MESSAGES: "
41354180 << block_full_comment (*block_limit_status_, block::ParamLimits::cl_normal) << " \n " ;
41364181 }
4182+ if (!check_cancelled ()) {
4183+ return false ;
4184+ }
41374185 LOG (DEBUG) << " have message with lt=" << msg.lt ;
41384186 int res = process_one_new_message (std::move (msg), enqueue_only);
41394187 if (res < 0 ) {
@@ -4973,6 +5021,23 @@ bool Collator::register_out_msg_queue_op(bool force) {
49735021 }
49745022}
49755023
5024+ /* *
5025+ * Registers a dispatch queue message queue operation.
5026+ * Adds the proof to the block limit status every 64 operations.
5027+ *
5028+ * @param force If true, the proof will always be added to the block limit status.
5029+ *
5030+ * @returns True if the operation was successfully registered, false otherwise.
5031+ */
5032+ bool Collator::register_dispatch_queue_op (bool force) {
5033+ ++dispatch_queue_ops_;
5034+ if (force || !(dispatch_queue_ops_ & 63 )) {
5035+ return block_limit_status_->add_proof (dispatch_queue_->get_root_cell ());
5036+ } else {
5037+ return true ;
5038+ }
5039+ }
5040+
49765041/* *
49775042 * Creates a new shard state and the Merkle update.
49785043 *
@@ -5098,9 +5163,10 @@ bool Collator::compute_out_msg_queue_info(Ref<vm::Cell>& out_msg_queue_info) {
50985163 vm::CellSlice maybe_extra = cb.as_cellslice ();
50995164 cb.reset ();
51005165
5101- return register_out_msg_queue_op (true ) && out_msg_queue_->append_dict_to_bool (cb) // _ out_queue:OutMsgQueue
5102- && processed_upto_->pack (cb) // proc_info:ProcessedInfo
5103- && cb.append_cellslice_bool (maybe_extra) // extra:(Maybe OutMsgQueueExtra)
5166+ return register_out_msg_queue_op (true ) && register_dispatch_queue_op (true ) &&
5167+ out_msg_queue_->append_dict_to_bool (cb) // _ out_queue:OutMsgQueue
5168+ && processed_upto_->pack (cb) // proc_info:ProcessedInfo
5169+ && cb.append_cellslice_bool (maybe_extra) // extra:(Maybe OutMsgQueueExtra)
51045170 && cb.finalize_to (out_msg_queue_info);
51055171}
51065172
@@ -5492,14 +5558,18 @@ bool Collator::create_block_candidate() {
54925558 << consensus_config.max_collated_data_size << " )" );
54935559 }
54945560 // 4. save block candidate
5495- LOG (INFO) << " saving new BlockCandidate" ;
5496- td::actor::send_closure_later (
5497- manager, &ValidatorManager::set_block_candidate, block_candidate->id , block_candidate->clone (),
5498- validator_set_->get_catchain_seqno (), validator_set_->get_validator_set_hash (),
5499- [self = get_self ()](td::Result<td::Unit> saved) -> void {
5500- LOG (DEBUG) << " got answer to set_block_candidate" ;
5501- td::actor::send_closure_later (std::move (self), &Collator::return_block_candidate, std::move (saved));
5502- });
5561+ if (mode_ & CollateMode::skip_store_candidate) {
5562+ td::actor::send_closure_later (actor_id (this ), &Collator::return_block_candidate, td::Unit ());
5563+ } else {
5564+ LOG (INFO) << " saving new BlockCandidate" ;
5565+ td::actor::send_closure_later (
5566+ manager, &ValidatorManager::set_block_candidate, block_candidate->id , block_candidate->clone (),
5567+ validator_set_->get_catchain_seqno (), validator_set_->get_validator_set_hash (),
5568+ [self = get_self ()](td::Result<td::Unit> saved) -> void {
5569+ LOG (DEBUG) << " got answer to set_block_candidate" ;
5570+ td::actor::send_closure_later (std::move (self), &Collator::return_block_candidate, std::move (saved));
5571+ });
5572+ }
55035573 // 5. communicate about bad and delayed external messages
55045574 if (!bad_ext_msgs_.empty () || !delay_ext_msgs_.empty ()) {
55055575 LOG (INFO) << " sending complete_external_messages() to Manager" ;
@@ -5643,6 +5713,18 @@ void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<
56435713 check_pending ();
56445714}
56455715
5716+ /* *
5717+ * Checks if collation was cancelled via cancellation token
5718+ *
5719+ * @returns false if the collation was cancelled, true otherwise
5720+ */
5721+ bool Collator::check_cancelled () {
5722+ if (cancellation_token_) {
5723+ return fatal_error (td::Status::Error (ErrorCode::cancelled, " cancelled" ));
5724+ }
5725+ return true ;
5726+ }
5727+
56465728td::uint32 Collator::get_skip_externals_queue_size () {
56475729 return SKIP_EXTERNALS_QUEUE_SIZE;
56485730}
0 commit comments