1717#include " collation-manager.hpp"
1818
1919#include " collator-node/collator-node.hpp"
20+ #include " collator-node/utils.hpp"
2021#include " fabric.h"
2122#include " td/utils/Random.h"
2223
@@ -29,6 +30,29 @@ namespace ton::validator {
2930void CollationManager::start_up () {
3031 td::actor::send_closure (rldp_, &rldp2::Rldp::add_id, local_id_);
3132 update_collators_list (*opts_->get_collators_list ());
33+
34+ class Cb : public adnl ::Adnl::Callback {
35+ public:
36+ explicit Cb (td::actor::ActorId<CollationManager> id) : id_(std::move(id)) {
37+ }
38+ void receive_message (adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) override {
39+ }
40+ void receive_query (adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
41+ td::Promise<td::BufferSlice> promise) override {
42+ td::actor::send_closure (id_, &CollationManager::receive_query, src, std::move (data), std::move (promise));
43+ }
44+
45+ private:
46+ td::actor::ActorId<CollationManager> id_;
47+ };
48+ td::actor::send_closure (adnl_, &adnl::Adnl::subscribe, local_id_,
49+ adnl::Adnl::int_to_bytestring (ton_api::collatorNode_requestBlockCallback::ID),
50+ std::make_unique<Cb>(actor_id (this )));
51+ }
52+
53+ void CollationManager::tear_down () {
54+ td::actor::send_closure (adnl_, &adnl::Adnl::unsubscribe, local_id_,
55+ adnl::Adnl::int_to_bytestring (ton_api::collatorNode_requestBlockCallback::ID));
3256}
3357
3458void CollationManager::collate_block (ShardIdFull shard, BlockIdExt min_masterchain_block_id,
@@ -54,33 +78,55 @@ void CollationManager::collate_block(ShardIdFull shard, BlockIdExt min_mastercha
5478 proto_version);
5579}
5680
57- void CollationManager::collate_next_block (ShardIdFull shard, BlockIdExt min_masterchain_block_id,
58- BlockIdExt prev_block_id, td::BufferSlice prev_block,
59- Ed25519_PublicKey creator, BlockCandidatePriority priority,
60- td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
61- td::CancellationToken cancellation_token,
62- td::Promise<GeneratedCandidate> promise, int proto_version) {
63- TRY_RESULT_PROMISE (promise, prev_block_data, create_block (prev_block_id, std::move (prev_block)));
64- run_collate_query (
65- CollateParams{.shard = shard,
66- .min_masterchain_block_id = min_masterchain_block_id,
67- .prev = {prev_block_id},
68- .creator = creator,
69- .validator_set = std::move (validator_set),
70- .collator_opts = opts_->get_collator_options (),
71- .optimistic_prev_block_ = std::move (prev_block_data)},
72- manager_, td::Timestamp::in (10.0 ), std::move (cancellation_token), promise.wrap ([](BlockCandidate&& candidate) {
73- return GeneratedCandidate{.candidate = std::move (candidate), .is_cached = false , .self_collated = true };
74- }));
75- // TODO: request to collator node
81+ void CollationManager::collate_block_optimistic (ShardIdFull shard, BlockIdExt min_masterchain_block_id,
82+ BlockIdExt prev_block_id, td::BufferSlice prev_block,
83+ Ed25519_PublicKey creator, BlockCandidatePriority priority,
84+ td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
85+ td::CancellationToken cancellation_token,
86+ td::Promise<GeneratedCandidate> promise, int proto_version) {
87+ if (shard.is_masterchain ()) {
88+ TRY_RESULT_PROMISE (promise, prev_block_data, create_block (prev_block_id, std::move (prev_block)));
89+ run_collate_query (
90+ CollateParams{.shard = shard,
91+ .min_masterchain_block_id = min_masterchain_block_id,
92+ .prev = {prev_block_id},
93+ .creator = creator,
94+ .validator_set = std::move (validator_set),
95+ .collator_opts = opts_->get_collator_options (),
96+ .optimistic_prev_block = std::move (prev_block_data)},
97+ manager_, td::Timestamp::in (10.0 ), std::move (cancellation_token), promise.wrap ([](BlockCandidate&& candidate) {
98+ return GeneratedCandidate{.candidate = std::move (candidate), .is_cached = false , .self_collated = true };
99+ }));
100+ return ;
101+ }
102+
103+ auto & entry = optimistic_prev_cache_[prev_block_id];
104+ entry.block_data = std::move (prev_block);
105+ ++entry.refcnt ;
106+ promise = [this , SelfId = actor_id (this ), prev_block_id,
107+ promise = std::move (promise)](td::Result<GeneratedCandidate> R) mutable {
108+ promise.set_result (std::move (R));
109+ td::actor::send_lambda_later (SelfId, [=, this ]() {
110+ auto it = optimistic_prev_cache_.find (prev_block_id);
111+ CHECK (it != optimistic_prev_cache_.end ());
112+ CHECK (it->second .refcnt > 0 );
113+ if (--it->second .refcnt == 0 ) {
114+ optimistic_prev_cache_.erase (it);
115+ }
116+ });
117+ };
118+
119+ collate_shard_block (shard, min_masterchain_block_id, {prev_block_id}, creator, priority, std::move (validator_set),
120+ max_answer_size, std::move (cancellation_token), std::move (promise), td::Timestamp::in (10.0 ),
121+ proto_version, true );
76122}
77123
78124void CollationManager::collate_shard_block (ShardIdFull shard, BlockIdExt min_masterchain_block_id,
79125 std::vector<BlockIdExt> prev, Ed25519_PublicKey creator,
80126 BlockCandidatePriority priority, td::Ref<ValidatorSet> validator_set,
81127 td::uint64 max_answer_size, td::CancellationToken cancellation_token,
82128 td::Promise<GeneratedCandidate> promise, td::Timestamp timeout,
83- int proto_version) {
129+ int proto_version, bool is_optimistic ) {
84130 TRY_STATUS_PROMISE (promise, cancellation_token.check ());
85131 ShardInfo* s = select_shard_info (shard);
86132 if (s == nullptr ) {
@@ -91,12 +137,22 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
91137
92138 adnl::AdnlNodeIdShort selected_collator = adnl::AdnlNodeIdShort::zero ();
93139 size_t selected_idx = 0 ;
140+ auto check_collator = [&](const adnl::AdnlNodeIdShort& id) -> bool {
141+ auto & collator = collators_[id];
142+ if (!collator.alive ) {
143+ return false ;
144+ }
145+ if (is_optimistic && collator.version < CollatorNode::VERSION_OPTIMISTIC_COLLATE) {
146+ return false ;
147+ }
148+ return true ;
149+ };
94150 switch (s->select_mode ) {
95151 case CollatorsList::mode_random: {
96152 int cnt = 0 ;
97153 for (size_t i = 0 ; i < s->collators .size (); ++i) {
98154 adnl::AdnlNodeIdShort collator = s->collators [i];
99- if (collators_[ collator]. alive ) {
155+ if (check_collator ( collator) ) {
100156 ++cnt;
101157 if (td::Random::fast (1 , cnt) == 1 ) {
102158 selected_collator = collator;
@@ -109,7 +165,7 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
109165 case CollatorsList::mode_ordered: {
110166 for (size_t i = 0 ; i < s->collators .size (); ++i) {
111167 adnl::AdnlNodeIdShort collator = s->collators [i];
112- if (collators_[ collator]. alive ) {
168+ if (check_collator ( collator) ) {
113169 selected_collator = collator;
114170 selected_idx = i;
115171 break ;
@@ -121,7 +177,7 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
121177 size_t iters = 0 ;
122178 for (size_t i = s->cur_idx ; iters < s->collators .size (); (++i) %= s->collators .size (), ++iters) {
123179 adnl::AdnlNodeIdShort& collator = s->collators [i];
124- if (collators_[ collator]. alive ) {
180+ if (check_collator ( collator) ) {
125181 selected_collator = collator;
126182 selected_idx = i;
127183 s->cur_idx = (i + 1 ) % s->collators .size ();
@@ -133,13 +189,20 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
133189 }
134190
135191 if (selected_collator.is_zero () && s->self_collate ) {
192+ td::Ref<BlockData> optimistic_prev_block;
193+ if (is_optimistic) {
194+ CHECK (prev.size () == 1 );
195+ TRY_RESULT_PROMISE_ASSIGN (promise, optimistic_prev_block,
196+ create_block (prev[0 ], optimistic_prev_cache_.at (prev[0 ]).block_data .clone ()));
197+ }
136198 run_collate_query (
137199 CollateParams{.shard = shard,
138200 .min_masterchain_block_id = min_masterchain_block_id,
139201 .prev = std::move (prev),
140202 .creator = creator,
141203 .validator_set = std::move (validator_set),
142- .collator_opts = opts_->get_collator_options ()},
204+ .collator_opts = opts_->get_collator_options (),
205+ .optimistic_prev_block = std::move (optimistic_prev_block)},
143206 manager_, td::Timestamp::in (10.0 ), std::move (cancellation_token), promise.wrap ([](BlockCandidate&& candidate) {
144207 return GeneratedCandidate{.candidate = std::move (candidate), .is_cached = false , .self_collated = true };
145208 }));
@@ -175,19 +238,26 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
175238 [=, promise = std::move (promise)]() mutable {
176239 td::actor::send_closure (SelfId, &CollationManager::collate_shard_block, shard, min_masterchain_block_id, prev,
177240 creator, priority, validator_set, max_answer_size, cancellation_token,
178- std::move (promise), timeout, proto_version);
241+ std::move (promise), timeout, proto_version, is_optimistic );
179242 },
180243 retry_at);
181244 };
182245
183246 if (selected_collator.is_zero ()) {
184- P.set_error (td::Status::Error (PSTRING () << " shard " << shard.to_str () << " has no alive collator node" ));
247+ P.set_error (td::Status::Error (PSTRING () << " shard " << shard.to_str () << " has no suitable collator node" ));
185248 return ;
186249 }
187250
188- td::BufferSlice query = create_serialize_tl_object<ton_api::collatorNode_generateBlock>(
189- create_tl_shard_id (shard), validator_set->get_catchain_seqno (), std::move (prev_blocks), creator.as_bits256 (),
190- priority.round , priority.first_block_round , priority.priority );
251+ td::BufferSlice query;
252+ if (is_optimistic) {
253+ query = create_serialize_tl_object<ton_api::collatorNode_generateBlockOptimistic>(
254+ create_tl_shard_id (shard), validator_set->get_catchain_seqno (), std::move (prev_blocks), creator.as_bits256 (),
255+ priority.round , priority.first_block_round , priority.priority );
256+ } else {
257+ query = create_serialize_tl_object<ton_api::collatorNode_generateBlock>(
258+ create_tl_shard_id (shard), validator_set->get_catchain_seqno (), std::move (prev_blocks), creator.as_bits256 (),
259+ priority.round , priority.first_block_round , priority.priority );
260+ }
191261 LOG (INFO) << " sending collate query for " << next_block_id.to_str () << " : send to #" << selected_idx << " ("
192262 << selected_collator << " )" ;
193263
@@ -201,9 +271,8 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
201271 return ;
202272 }
203273 TRY_RESULT_PROMISE (P, f, fetch_tl_object<ton_api::collatorNode_Candidate>(data, true ));
204- TRY_RESULT_PROMISE (
205- P, candidate,
206- CollatorNode::deserialize_candidate (std::move (f), td::narrow_cast<int >(max_answer_size), proto_version));
274+ TRY_RESULT_PROMISE (P, candidate,
275+ deserialize_candidate (std::move (f), td::narrow_cast<int >(max_answer_size), proto_version));
207276 if (candidate.pubkey .as_bits256 () != creator.as_bits256 ()) {
208277 P.set_error (td::Status::Error (" collate query: block candidate source mismatch" ));
209278 return ;
@@ -363,7 +432,8 @@ void CollationManager::alarm() {
363432 }
364433 if (collator.ping_at .is_in_past ()) {
365434 collator.sent_ping = true ;
366- td::BufferSlice query = create_serialize_tl_object<ton_api::collatorNode_ping>(0 );
435+ td::BufferSlice query =
436+ create_serialize_tl_object<ton_api::collatorNode_ping>(ton_api::collatorNode_pong::VERSION_MASK);
367437 td::Promise<td::BufferSlice> P = [=, id = id, SelfId = actor_id (this )](td::Result<td::BufferSlice> R) mutable {
368438 td::actor::send_closure (SelfId, &CollationManager::got_pong, id, std::move (R));
369439 };
@@ -399,9 +469,11 @@ void CollationManager::got_pong(adnl::AdnlNodeIdShort id, td::Result<td::BufferS
399469 collator.alive = false ;
400470 collator.last_ping_status = r_pong.move_as_error ();
401471 } else {
402- LOG (DEBUG) << " pong from " << id << " : OK" ;
403472 collator.alive = true ;
404473 collator.last_ping_status = td::Status::OK ();
474+ auto pong = r_pong.move_as_ok ();
475+ collator.version = pong->flags_ & ton_api::collatorNode_pong::VERSION_MASK ? pong->version_ : 0 ;
476+ LOG (DEBUG) << " pong from " << id << " : OK, version=" << collator.version ;
405477 }
406478 collator.ping_at = td::Timestamp::in (td::Random::fast (10.0 , 20.0 ));
407479 if (collator.active_cnt && !collator.sent_ping ) {
@@ -421,4 +493,26 @@ void CollationManager::on_collate_query_error(adnl::AdnlNodeIdShort id) {
421493 }
422494}
423495
496+ void CollationManager::receive_query (adnl::AdnlNodeIdShort src, td::BufferSlice data,
497+ td::Promise<td::BufferSlice> promise) {
498+ if (!collators_.contains (src)) {
499+ promise.set_error (td::Status::Error (" got request from unknown collator" ));
500+ return ;
501+ }
502+ TRY_RESULT_PROMISE (promise, query, fetch_tl_object<ton_api::collatorNode_requestBlockCallback>(data, true ));
503+ BlockIdExt block_id = create_block_id (query->block_id_ );
504+ auto it = optimistic_prev_cache_.find (block_id);
505+ if (it == optimistic_prev_cache_.end ()) {
506+ LOG (INFO) << " collatorNode.requestBlockCallback from " << src << " block " << block_id.to_str () << " : not found" ;
507+ promise.set_error (td::Status::Error (" block not found" ));
508+ return ;
509+ }
510+ LOG (INFO) << " collatorNode.requestBlockCallback from " << src << " block " << block_id.to_str () << " : OK" ;
511+ promise.set_value (
512+ serialize_tl_object (serialize_candidate (BlockCandidate (Ed25519_PublicKey{td::Bits256::zero ()}, block_id,
513+ td::Bits256::zero (), it->second .block_data .clone (), {}),
514+ true ),
515+ true ));
516+ }
517+
424518} // namespace ton::validator
0 commit comments