@@ -294,13 +294,24 @@ void ValidatorSessionImpl::process_broadcast(PublicKeyHash src, td::BufferSlice
294294 << " ]: duplicate optimistic broadcast for round " << block_round;
295295 return ;
296296 }
297+ int priority = description ().get_node_priority (src_idx, block_round);
298+ if (priority < 0 ) {
299+ VLOG (VALIDATOR_SESSION_WARNING) << this << " [node " << src << " ][broadcast " << sha256_bits256 (data.as_slice ())
300+ << " ]: node is not a producer in round " << block_round;
301+ return ;
302+ }
297303 if (block_round > cur_round_) {
298304 OptimisticBroadcast &optimistic_broadcast = optimistic_broadcasts_[{block_round, src_idx}];
299305 optimistic_broadcast.candidate = std::move (candidate);
300306 optimistic_broadcast.prev_candidate_id = optimistic_prev_candidate;
301307 optimistic_broadcast.broadcast_info = broadcast_info;
302308 VLOG (VALIDATOR_SESSION_WARNING) << this << " : received optimistic broadcast " << block_id << " from " << src
303309 << " , round " << block_round;
310+ validate_optimistic_broadcast (
311+ BlockSourceInfo{description ().get_source_public_key (src_idx),
312+ BlockCandidatePriority{block_round, block_round, priority}},
313+ optimistic_broadcast.candidate ->root_hash_ , optimistic_broadcast.candidate ->data_ .clone (),
314+ optimistic_broadcast.candidate ->collated_data_ .clone (), optimistic_broadcast.prev_candidate_id );
304315 return ;
305316 }
306317 if (SentBlock::get_block_id (real_state_->get_committed_block (description (), block_round - 1 )) !=
@@ -336,7 +347,7 @@ void ValidatorSessionImpl::process_received_block(td::uint32 block_round, Public
336347 }
337348 }
338349 stat->deserialize_time = info.deserialize_time ;
339- stat->serialized_size = info.serialized_size ;
350+ stat->serialized_size = ( int ) info.serialized_size ;
340351 stat->block_id .root_hash = candidate->root_hash_ ;
341352 stat->block_id .file_hash = info.file_hash ;
342353 stat->collated_data_hash = info.collated_data_hash ;
@@ -361,6 +372,12 @@ void ValidatorSessionImpl::process_received_block(td::uint32 block_round, Public
361372 }
362373
363374 blocks_[block_id] = std::move (candidate);
375+ if (auto it = block_waiters_.find (block_id); it != block_waiters_.end ()) {
376+ for (auto &promise : it->second ) {
377+ promise.set_result (td::Unit ());
378+ }
379+ block_waiters_.erase (it);
380+ }
364381
365382 VLOG (VALIDATOR_SESSION_WARNING) << this << " : received broadcast " << block_id;
366383 if (block_round != cur_round_) {
@@ -381,6 +398,37 @@ void ValidatorSessionImpl::process_received_block(td::uint32 block_round, Public
381398 }
382399}
383400
401+ void ValidatorSessionImpl::validate_optimistic_broadcast (BlockSourceInfo source_info,
402+ ValidatorSessionRootHash root_hash, td::BufferSlice data,
403+ td::BufferSlice collated_data,
404+ ValidatorSessionCandidateId prev_candidate_id) {
405+ if (source_info.priority .round <= cur_round_) {
406+ VLOG (VALIDATOR_SESSION_DEBUG) << this << " : validate optimistic broadcast from "
407+ << source_info.source .compute_short_id () << " : too old" ;
408+ return ;
409+ }
410+ auto it = blocks_.find (prev_candidate_id);
411+ if (it == blocks_.end ()) {
412+ VLOG (VALIDATOR_SESSION_DEBUG) << this << " : validate optimistic broadcast from "
413+ << source_info.source .compute_short_id () << " : wait for prev block" ;
414+ block_waiters_[prev_candidate_id].push_back (
415+ [=, SelfId = actor_id (this ), data = std::move (data),
416+ collated_data = std::move (collated_data)](td::Result<td::Unit> R) mutable {
417+ if (R.is_ok ()) {
418+ td::actor::send_closure (SelfId, &ValidatorSessionImpl::validate_optimistic_broadcast, source_info,
419+ root_hash, std::move (data), std::move (collated_data), prev_candidate_id);
420+ }
421+ });
422+ return ;
423+ }
424+ VLOG (VALIDATOR_SESSION_DEBUG) << this << " : validate optimistic broadcast from "
425+ << source_info.source .compute_short_id ();
426+ callback_->on_optimistic_candidate (
427+ source_info, root_hash, std::move (data), std::move (collated_data),
428+ description ().get_source_public_key (description ().get_source_idx (PublicKeyHash{it->second ->src_ })),
429+ it->second ->root_hash_ , it->second ->data_ .clone (), it->second ->collated_data_ .clone ());
430+ }
431+
384432void ValidatorSessionImpl::process_message (PublicKeyHash src, td::BufferSlice data) {
385433}
386434
@@ -473,7 +521,7 @@ void ValidatorSessionImpl::candidate_decision_ok(td::uint32 round, ValidatorSess
473521 stat->block_status = ValidatorSessionStats::status_approved;
474522 stat->comment = PSTRING () << " ts=" << ok_from;
475523 stat->validation_time = validation_time;
476- stat->gen_utime = (double )ok_from;
524+ stat->gen_utime = (int )ok_from;
477525 stat->validated_at = td::Clocks::system ();
478526 stat->validation_cached = validation_cached;
479527 }
@@ -528,7 +576,11 @@ void ValidatorSessionImpl::generated_block(td::uint32 round, GeneratedCandidate
528576 stat->block_status = ValidatorSessionStats::status_received;
529577 stat->collation_time = collation_time;
530578 stat->collated_at = td::Clocks::system ();
531- stat->got_block_at = td::Clocks::system ();
579+ if (auto it = sent_candidates_.find (block_id); it != sent_candidates_.end ()) {
580+ stat->got_block_at = it->second .sent_at ;
581+ } else {
582+ stat->got_block_at = td::Clocks::system ();
583+ }
532584 stat->got_block_by = ValidatorSessionStats::recv_collated;
533585 stat->collation_cached = c.is_cached ;
534586 stat->self_collated = c.self_collated ;
@@ -548,6 +600,12 @@ void ValidatorSessionImpl::generated_block(td::uint32 round, GeneratedCandidate
548600 blocks_[block_id] = create_tl_object<ton_api::validatorSession_candidate>(
549601 local_id ().tl (), round, c.candidate .id .root_hash , std::move (c.candidate .data ),
550602 std::move (c.candidate .collated_data ));
603+ if (auto it = block_waiters_.find (block_id); it != block_waiters_.end ()) {
604+ for (auto &promise : it->second ) {
605+ promise.set_result (td::Unit ());
606+ }
607+ block_waiters_.erase (it);
608+ }
551609 pending_generate_ = false ;
552610 generated_ = true ;
553611 generated_block_ = block_id;
@@ -738,7 +796,6 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
738796 VLOG (VALIDATOR_SESSION_WARNING) << print_id << " : failed to get candidate " << hash << " from " << id
739797 << " : " << R.move_as_error ();
740798 } else {
741- LOG (ERROR) << " QQQQQ Got block " << R.ok ().size ();
742799 td::actor::send_closure (SelfId, &ValidatorSessionImpl::process_broadcast, src_id, R.move_as_ok (),
743800 candidate_id, false , false );
744801 }
@@ -1082,7 +1139,7 @@ ValidatorSessionImpl::ValidatorSessionImpl(catchain::CatChainSessionId session_i
10821139 PublicKeyHash local_id, std::vector<ValidatorSessionNode> nodes,
10831140 std::unique_ptr<Callback> callback,
10841141 td::actor::ActorId<keyring::Keyring> keyring,
1085- td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp ::Rldp> rldp,
1142+ td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp2 ::Rldp> rldp,
10861143 td::actor::ActorId<overlay::Overlays> overlays, std::string db_root,
10871144 std::string db_suffix, bool allow_unsafe_self_blocks_resync)
10881145 : unique_hash_(session_id)
@@ -1206,7 +1263,6 @@ void ValidatorSessionImpl::start_up() {
12061263 virtual_state_ = real_state_;
12071264
12081265 check_all ();
1209- td::actor::send_closure (rldp_, &rldp::Rldp::add_id, description ().get_source_adnl_id (local_idx ()));
12101266}
12111267
12121268void ValidatorSessionImpl::stats_init () {
@@ -1365,25 +1421,49 @@ void ValidatorSessionImpl::process_approve(td::uint32 node_id, td::uint32 round,
13651421 bool is_approved_66pct = stat->approved_66pct_at > 0.0 ;
13661422
13671423 if (allow_optimistic_generation_ && !was_approved_66pct && is_approved_66pct && cur_round_ == round &&
1368- description ().get_node_priority (local_idx (), round + 1 ) == 0 && blocks_.contains (candidate_id)) {
1369- auto &block = blocks_[candidate_id];
1370- if (cur_round_ == first_block_round_ &&
1371- description ().get_node_priority (description ().get_source_idx (PublicKeyHash{block->src_ }), cur_round_) == 0 ) {
1372- callback_->generate_block_optimistic (BlockSourceInfo{description ().get_source_public_key (local_idx ()),
1373- BlockCandidatePriority{round + 1 , round + 1 , 0 }},
1374- block->data_ .clone (), block->root_hash_ , stat->block_id .file_hash ,
1375- [=, SelfId = actor_id (this )](td::Result<GeneratedCandidate> R) {
1376- if (R.is_error ()) {
1377- return ;
1378- }
1379- td::actor::send_closure (
1380- SelfId, &ValidatorSessionImpl::generated_optimistic_candidate,
1381- round + 1 , R.move_as_ok (), candidate_id);
1382- });
1424+ cur_round_ == first_block_round_ && description ().get_node_priority (local_idx (), round + 1 ) == 0 &&
1425+ blocks_.contains (candidate_id) &&
1426+ description ().get_node_priority (description ().get_source_idx (stat->validator_id ), cur_round_) == 0 ) {
1427+ if (blocks_.contains (candidate_id)) {
1428+ generate_block_optimistic (round, candidate_id);
1429+ } else {
1430+ block_waiters_[candidate_id].push_back ([=, SelfId = actor_id (this )](td::Result<td::Unit> R) {
1431+ if (R.is_ok ()) {
1432+ td::actor::send_closure (SelfId, &ValidatorSessionImpl::generate_block_optimistic, round, candidate_id);
1433+ }
1434+ });
13831435 }
13841436 }
13851437}
13861438
1439+ void ValidatorSessionImpl::generate_block_optimistic (td::uint32 cur_round,
1440+ ValidatorSessionCandidateId prev_candidate_id) {
1441+ if (cur_round != cur_round_) {
1442+ return ;
1443+ }
1444+ auto it = blocks_.find (prev_candidate_id);
1445+ if (it == blocks_.end ()) {
1446+ return ;
1447+ }
1448+ auto &block = it->second ;
1449+ auto stat = stats_get_candidate_stat_by_id (cur_round, prev_candidate_id);
1450+ if (!stat) {
1451+ return ;
1452+ }
1453+ callback_->generate_block_optimistic (BlockSourceInfo{description ().get_source_public_key (local_idx ()),
1454+ BlockCandidatePriority{cur_round + 1 , cur_round + 1 , 0 }},
1455+ block->data_ .clone (), block->root_hash_ , stat->block_id .file_hash ,
1456+ [=, SelfId = actor_id (this )](td::Result<GeneratedCandidate> R) {
1457+ if (R.is_error ()) {
1458+ LOG (DEBUG) << " Optimistic generation error: " << R.move_as_error ();
1459+ return ;
1460+ }
1461+ td::actor::send_closure (SelfId,
1462+ &ValidatorSessionImpl::generated_optimistic_candidate,
1463+ cur_round + 1 , R.move_as_ok (), prev_candidate_id);
1464+ });
1465+ }
1466+
13871467void ValidatorSessionImpl::generated_optimistic_candidate (td::uint32 round, GeneratedCandidate candidate,
13881468 ValidatorSessionCandidateId prev_candidate) {
13891469 if (cur_round_ > round) {
@@ -1396,7 +1476,7 @@ td::actor::ActorOwn<ValidatorSession> ValidatorSession::create(
13961476 catchain::CatChainSessionId session_id, ValidatorSessionOptions opts, PublicKeyHash local_id,
13971477 std::vector<ValidatorSessionNode> nodes, std::unique_ptr<Callback> callback,
13981478 td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
1399- td::actor::ActorId<rldp ::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays, std::string db_root,
1479+ td::actor::ActorId<rldp2 ::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays, std::string db_root,
14001480 std::string db_suffix, bool allow_unsafe_self_blocks_resync) {
14011481 return td::actor::create_actor<ValidatorSessionImpl>(" session" , session_id, std::move (opts), local_id,
14021482 std::move (nodes), std::move (callback), keyring, adnl, rldp,
0 commit comments