Skip to content

Commit 6308fb7

Browse files
committed
Support optimistic collation/validation
1 parent 655d7e6 commit 6308fb7

15 files changed

+173
-87
lines changed

tdutils/td/utils/Status.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
#define TRY_RESULT_ASSIGN(name, result) TRY_RESULT_IMPL(TD_CONCAT(r_response, __LINE__), name, result)
7575

7676
#define TRY_RESULT_PROMISE_ASSIGN(promise_name, name, result) \
77-
TRY_RESULT_PROMISE_IMPL(promise_name, TD_CONCAT(TD_CONCAT(r_, name), __LINE__), name, result)
77+
TRY_RESULT_PROMISE_IMPL(promise_name, TD_CONCAT(r_response, __LINE__), name, result)
7878

7979
#define TRY_RESULT_PREFIX(name, result, prefix) \
8080
TRY_RESULT_PREFIX_IMPL(TD_CONCAT(TD_CONCAT(r_, name), __LINE__), auto name, result, prefix)
@@ -86,7 +86,7 @@
8686
TRY_RESULT_PROMISE_PREFIX_IMPL(promise_name, TD_CONCAT(TD_CONCAT(r_, name), __LINE__), auto name, result, prefix)
8787

8888
#define TRY_RESULT_PROMISE_PREFIX_ASSIGN(promise_name, name, result, prefix) \
89-
TRY_RESULT_PROMISE_PREFIX_IMPL(promise_name, TD_CONCAT(TD_CONCAT(r_, name), __LINE__), name, result, prefix)
89+
TRY_RESULT_PROMISE_PREFIX_IMPL(promise_name, TD_CONCAT(r_response, __LINE__), name, result, prefix)
9090

9191
#define TRY_RESULT_IMPL(r_name, name, result) \
9292
auto r_name = (result); \

validator-session/validator-session.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,18 +1578,18 @@ void ValidatorSessionImpl::generate_block_optimistic(td::uint32 cur_round,
15781578
if (!stat) {
15791579
return;
15801580
}
1581-
callback_->generate_block_optimistic(BlockSourceInfo{description().get_source_public_key(local_idx()),
1582-
BlockCandidatePriority{cur_round + 1, cur_round + 1, 0}},
1583-
block->data_.clone(), block->root_hash_, stat->block_id.file_hash,
1584-
[=, SelfId = actor_id(this)](td::Result<GeneratedCandidate> R) {
1585-
if (R.is_error()) {
1586-
LOG(DEBUG) << "Optimistic generation error: " << R.move_as_error();
1587-
return;
1588-
}
1589-
td::actor::send_closure(SelfId,
1590-
&ValidatorSessionImpl::generated_optimistic_candidate,
1591-
cur_round + 1, R.move_as_ok(), prev_candidate_id);
1592-
});
1581+
callback_->generate_block_optimistic(
1582+
BlockSourceInfo{description().get_source_public_key(local_idx()),
1583+
BlockCandidatePriority{cur_round + 1, cur_round + 1, 0}},
1584+
block->data_.clone(), block->collated_data_.clone(), block->root_hash_, stat->block_id.file_hash,
1585+
[=, SelfId = actor_id(this)](td::Result<GeneratedCandidate> R) {
1586+
if (R.is_error()) {
1587+
LOG(DEBUG) << "Optimistic generation error: " << R.move_as_error();
1588+
return;
1589+
}
1590+
td::actor::send_closure(SelfId, &ValidatorSessionImpl::generated_optimistic_candidate, cur_round + 1,
1591+
R.move_as_ok(), prev_candidate_id);
1592+
});
15931593
}
15941594

15951595
void ValidatorSessionImpl::generated_optimistic_candidate(td::uint32 round, GeneratedCandidate candidate,

validator-session/validator-session.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ class ValidatorSession : public td::actor::Actor {
9595
ValidatorSessionCollatedDataFileHash collated_data_file_hash,
9696
td::Promise<BlockCandidate> promise) = 0;
9797
virtual void generate_block_optimistic(BlockSourceInfo source_info, td::BufferSlice prev_block,
98-
RootHash prev_root_hash, FileHash prev_file_hash,
99-
td::Promise<GeneratedCandidate> promise) {
98+
td::BufferSlice prev_collated_data, RootHash prev_root_hash,
99+
FileHash prev_file_hash, td::Promise<GeneratedCandidate> promise) {
100100
}
101101
virtual void on_optimistic_candidate(BlockSourceInfo source_info, ValidatorSessionRootHash root_hash,
102102
td::BufferSlice data, td::BufferSlice collated_data, PublicKey prev_source,

validator/collation-manager.cpp

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ void CollationManager::collate_shard_block(CollateParams params, BlockCandidateP
159159
next_block_id.seqno = std::max(next_block_id.seqno, p.seqno() + 1);
160160
}
161161

162-
td::Promise<BlockCandidate> P = [=, SelfId = actor_id(this), promise = std::move(promise),
162+
td::Promise<BlockCandidate> P = [=, SelfId = actor_id(this), promise = std::move(promise), params = params.clone(),
163163
retry_at = td::Timestamp::in(0.5)](td::Result<BlockCandidate> R) mutable {
164164
if (R.is_ok()) {
165165
promise.set_value(GeneratedCandidate{.candidate = R.move_as_ok(),
@@ -178,9 +178,9 @@ void CollationManager::collate_shard_block(CollateParams params, BlockCandidateP
178178
return;
179179
}
180180
delay_action(
181-
[=, promise = std::move(promise)]() mutable {
182-
td::actor::send_closure(SelfId, &CollationManager::collate_shard_block, params, priority, max_answer_size,
183-
cancellation_token, std::move(promise), timeout);
181+
[=, promise = std::move(promise), params = std::move(params)]() mutable {
182+
td::actor::send_closure(SelfId, &CollationManager::collate_shard_block, std::move(params), priority,
183+
max_answer_size, cancellation_token, std::move(promise), timeout);
184184
},
185185
retry_at);
186186
};
@@ -203,7 +203,7 @@ void CollationManager::collate_shard_block(CollateParams params, BlockCandidateP
203203
LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to #" << selected_idx << "("
204204
<< selected_collator << ")";
205205

206-
td::Promise<td::BufferSlice> P2 = [=, SelfId = actor_id(this), P = std::move(P),
206+
td::Promise<td::BufferSlice> P2 = [=, SelfId = actor_id(this), P = std::move(P), creator = params.creator,
207207
timer = td::Timer()](td::Result<td::BufferSlice> R) mutable {
208208
TRY_RESULT_PROMISE_PREFIX(P, data, std::move(R), "rldp query failed: ");
209209
auto r_error = fetch_tl_object<ton_api::collatorNode_error>(data, true);
@@ -214,7 +214,7 @@ void CollationManager::collate_shard_block(CollateParams params, BlockCandidateP
214214
}
215215
TRY_RESULT_PROMISE(P, f, fetch_tl_object<ton_api::collatorNode_Candidate>(data, true));
216216
TRY_RESULT_PROMISE(P, candidate, deserialize_candidate(std::move(f), td::narrow_cast<int>(max_answer_size)));
217-
if (candidate.pubkey.as_bits256() != params.creator.as_bits256()) {
217+
if (candidate.pubkey.as_bits256() != creator.as_bits256()) {
218218
P.set_error(td::Status::Error("collate query: block candidate source mismatch"));
219219
return;
220220
}
@@ -231,6 +231,7 @@ void CollationManager::collate_shard_block(CollateParams params, BlockCandidateP
231231
BlockIdExt prev_block_id = params.optimistic_prev_block->block_id();
232232
auto& entry = optimistic_prev_cache_[prev_block_id];
233233
entry.block_data = params.optimistic_prev_block->data().clone();
234+
entry.collated_data = params.optimistic_prev_collated_data.clone();
234235
++entry.refcnt;
235236
P2 = [this, SelfId = actor_id(this), prev_block_id, P2 = std::move(P2)](td::Result<td::BufferSlice> R) mutable {
236237
P2.set_result(std::move(R));
@@ -479,18 +480,22 @@ void CollationManager::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice
479480
}
480481
TRY_RESULT_PROMISE(promise, query, fetch_tl_object<ton_api::collatorNode_requestBlockCallback>(data, true));
481482
BlockIdExt block_id = create_block_id(query->block_id_);
483+
bool with_collated_data = query->flags_ & 1;
482484
auto it = optimistic_prev_cache_.find(block_id);
483485
if (it == optimistic_prev_cache_.end()) {
484-
LOG(INFO) << "collatorNode.requestBlockCallback from " << src << " block " << block_id.to_str() << " : not found";
486+
LOG(INFO) << "collatorNode.requestBlockCallback from " << src << " block " << block_id.to_str() << " ("
487+
<< (with_collated_data ? "with" : "no") << " cdata) : not found";
485488
promise.set_error(td::Status::Error("block not found"));
486489
return;
487490
}
488-
LOG(INFO) << "collatorNode.requestBlockCallback from " << src << " block " << block_id.to_str() << " : OK";
489-
promise.set_value(
490-
serialize_tl_object(serialize_candidate(BlockCandidate(Ed25519_PublicKey{td::Bits256::zero()}, block_id,
491-
td::Bits256::zero(), it->second.block_data.clone(), {}),
492-
true),
493-
true));
491+
LOG(INFO) << "collatorNode.requestBlockCallback from " << src << " block " << block_id.to_str() << " ("
492+
<< (with_collated_data ? "with" : "no") << " cdata) : OK";
493+
promise.set_value(serialize_tl_object(
494+
serialize_candidate(BlockCandidate(Ed25519_PublicKey{td::Bits256::zero()}, block_id, td::Bits256::zero(),
495+
it->second.block_data.clone(),
496+
with_collated_data ? it->second.collated_data.clone() : td::BufferSlice{}),
497+
true),
498+
true));
494499
}
495500

496501
} // namespace ton::validator

validator/collation-manager.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class CollationManager : public td::actor::Actor {
9696

9797
struct OptimisticPrevCache {
9898
td::BufferSlice block_data;
99+
td::BufferSlice collated_data;
99100
size_t refcnt = 0;
100101
};
101102
std::map<BlockIdExt, OptimisticPrevCache> optimistic_prev_cache_;

validator/collator-node/collator-node-session.cpp

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void CollatorNodeSession::start_up() {
6262
collated_data_deduplicator_ = std::make_shared<CollatedDataDeduplicator>();
6363
}
6464
if (can_generate_) {
65-
generate_block(prev_, {}, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
65+
generate_block(prev_, {}, {}, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
6666
}
6767
}
6868

@@ -72,6 +72,11 @@ void CollatorNodeSession::tear_down() {
7272
for (auto& [_, entry] : cache_) {
7373
entry->cancel(td::Status::Error("validator session finished"));
7474
}
75+
for (auto& [_, vec] : collated_data_merged_waiters_) {
76+
for (auto& [promise, _] : vec) {
77+
promise.set_error(td::Status::Error("validator session finished"));
78+
}
79+
}
7580
}
7681

7782
void CollatorNodeSession::new_shard_block_accepted(BlockIdExt block_id, bool can_generate) {
@@ -114,7 +119,7 @@ void CollatorNodeSession::new_shard_block_accepted(BlockIdExt block_id, bool can
114119
try_merge_collated_data(block_id);
115120

116121
if (can_generate_) {
117-
generate_block(prev_, {}, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
122+
generate_block(prev_, {}, {}, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
118123
}
119124
}
120125

@@ -143,7 +148,8 @@ void CollatorNodeSession::update_masterchain_config(td::Ref<MasterchainState> st
143148

144149
void CollatorNodeSession::generate_block(std::vector<BlockIdExt> prev_blocks,
145150
td::optional<BlockCandidatePriority> o_priority,
146-
td::Ref<BlockData> o_optimistic_prev_block, td::Timestamp timeout,
151+
td::Ref<BlockData> o_optimistic_prev_block,
152+
td::BufferSlice o_optimistic_prev_collated_data, td::Timestamp timeout,
147153
td::Promise<BlockCandidate> promise) {
148154
bool is_external = !o_priority;
149155
bool is_optimistic = o_optimistic_prev_block.not_null();
@@ -242,6 +248,7 @@ void CollatorNodeSession::generate_block(std::vector<BlockIdExt> prev_blocks,
242248
.collator_node_id = local_id_,
243249
.skip_store_candidate = true,
244250
.optimistic_prev_block = o_optimistic_prev_block,
251+
.optimistic_prev_collated_data = std::move(o_optimistic_prev_collated_data),
245252
.collated_data_deduplicator = collated_data_deduplicator_};
246253
auto token = cache_entry->cancellation_token_source.get_cancellation_token();
247254
wait_collated_data_merged(
@@ -295,26 +302,33 @@ void CollatorNodeSession::process_request(adnl::AdnlNodeIdShort src, std::vector
295302
if (it == cache_.end() || it->second->started) {
296303
BlockIdExt prev_block = prev_blocks[0];
297304
td::actor::send_closure(
298-
manager_, &ValidatorManager::get_candidate_data_by_block_id_from_db, prev_block,
299-
[=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<td::BufferSlice> R) mutable {
305+
manager_, &ValidatorManager::get_block_candidate_by_block_id_from_db, prev_block,
306+
[=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<BlockCandidate> R) mutable {
307+
td::Result<std::pair<td::BufferSlice, td::BufferSlice>> res;
308+
if (R.is_error()) {
309+
res = R.move_as_error();
310+
} else {
311+
BlockCandidate c = R.move_as_ok();
312+
res = std::make_pair(std::move(c.data), std::move(c.collated_data));
313+
}
300314
td::actor::send_closure(SelfId, &CollatorNodeSession::process_request_optimistic_cont, src, prev_block,
301-
priority, timeout, std::move(promise), std::move(R));
315+
priority, timeout, std::move(promise), std::move(res));
302316
});
303317
return;
304318
}
305319
}
306-
generate_block(std::move(prev_blocks), priority, {}, timeout, std::move(promise));
320+
generate_block(std::move(prev_blocks), priority, {}, {}, timeout, std::move(promise));
307321
}
308322

309-
void CollatorNodeSession::process_request_optimistic_cont(adnl::AdnlNodeIdShort src, BlockIdExt prev_block_id,
310-
BlockCandidatePriority priority, td::Timestamp timeout,
311-
td::Promise<BlockCandidate> promise,
312-
td::Result<td::BufferSlice> prev_block_data) {
313-
if (prev_block_data.is_ok()) {
314-
TRY_RESULT_PROMISE_PREFIX(promise, prev_block, create_block(prev_block_id, prev_block_data.move_as_ok()),
323+
void CollatorNodeSession::process_request_optimistic_cont(
324+
adnl::AdnlNodeIdShort src, BlockIdExt prev_block_id, BlockCandidatePriority priority, td::Timestamp timeout,
325+
td::Promise<BlockCandidate> promise, td::Result<std::pair<td::BufferSlice, td::BufferSlice>> prev_candidate) {
326+
if (prev_candidate.is_ok()) {
327+
auto [prev_block_data, prev_collated_data] = prev_candidate.move_as_ok();
328+
TRY_RESULT_PROMISE_PREFIX(promise, prev_block, create_block(prev_block_id, std::move(prev_block_data)),
315329
"invalid prev block data in db: ");
316330
LOG(INFO) << "got prev block from db for optimistic collation: " << prev_block_id.to_str();
317-
generate_block({prev_block_id}, priority, prev_block, timeout, std::move(promise));
331+
generate_block({prev_block_id}, priority, prev_block, std::move(prev_collated_data), timeout, std::move(promise));
318332
return;
319333
}
320334
td::actor::send_closure(
@@ -324,7 +338,8 @@ void CollatorNodeSession::process_request_optimistic_cont(adnl::AdnlNodeIdShort
324338
timeout, std::move(promise), std::move(R));
325339
},
326340
timeout,
327-
create_serialize_tl_object<ton_api::collatorNode_requestBlockCallback>(0, create_tl_block_id(prev_block_id)),
341+
create_serialize_tl_object<ton_api::collatorNode_requestBlockCallback>(merge_collated_data_enabled_ ? 1 : 0,
342+
create_tl_block_id(prev_block_id)),
328343
max_candidate_size_);
329344
}
330345

@@ -335,13 +350,29 @@ void CollatorNodeSession::process_request_optimistic_cont2(BlockIdExt prev_block
335350
"failed to download prev block data for optimistic collation: ");
336351
TRY_RESULT_PROMISE_PREFIX(promise, f, fetch_tl_object<ton_api::collatorNode_Candidate>(response, true),
337352
"failed to download prev block data for optimistic collation: ");
338-
TRY_RESULT_PROMISE_PREFIX(promise, candidate,
339-
deserialize_candidate(std::move(f), max_candidate_size_),
353+
TRY_RESULT_PROMISE_PREFIX(promise, candidate, deserialize_candidate(std::move(f), max_candidate_size_),
340354
"failed to download prev block data for optimistic collation: ");
341355
TRY_RESULT_PROMISE_PREFIX(promise, prev_block, create_block(prev_block_id, std::move(candidate.data)),
342356
"invalid prev block data from validator: ");
357+
if (merge_collated_data_enabled_) {
358+
block::gen::Block::Record rec;
359+
block::gen::BlockInfo::Record info;
360+
if (!block::gen::unpack_cell(prev_block->root_cell(), rec) || !block::gen::unpack_cell(rec.info, info)) {
361+
promise.set_error(td::Status::Error("failed to unpack prev block header"));
362+
return;
363+
}
364+
if (info.flags & 2) {
365+
FileHash stored_collated_data_hash;
366+
info.collated_data_hash->prefetch_bits_to(stored_collated_data_hash);
367+
if (stored_collated_data_hash != candidate.collated_file_hash) {
368+
promise.set_error(td::Status::Error("collated data hash mismatch"));
369+
return;
370+
}
371+
}
372+
}
343373
LOG(INFO) << "got prev block from validator for optimistic collation: " << prev_block_id.to_str();
344-
generate_block({prev_block_id}, priority, prev_block, timeout, std::move(promise));
374+
generate_block({prev_block_id}, priority, prev_block, std::move(candidate.collated_data), timeout,
375+
std::move(promise));
345376
}
346377

347378
void CollatorNodeSession::CacheEntry::cancel(td::Status reason) {

validator/collator-node/collator-node-session.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,14 @@ class CollatorNodeSession : public td::actor::Actor {
8282
td::uint32 max_candidate_size_ = 0;
8383

8484
void generate_block(std::vector<BlockIdExt> prev_blocks, td::optional<BlockCandidatePriority> o_priority,
85-
td::Ref<BlockData> o_optimistic_prev_block, td::Timestamp timeout,
86-
td::Promise<BlockCandidate> promise);
85+
td::Ref<BlockData> o_optimistic_prev_block, td::BufferSlice o_optimistic_prev_collated_data,
86+
td::Timestamp timeout, td::Promise<BlockCandidate> promise);
8787
void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);
8888

8989
void process_request_optimistic_cont(adnl::AdnlNodeIdShort src, BlockIdExt prev_block_id,
9090
BlockCandidatePriority priority, td::Timestamp timeout,
9191
td::Promise<BlockCandidate> promise,
92-
td::Result<td::BufferSlice> prev_block_data);
92+
td::Result<std::pair<td::BufferSlice, td::BufferSlice>> prev_candidate);
9393
void process_request_optimistic_cont2(BlockIdExt prev_block_id, BlockCandidatePriority priority,
9494
td::Timestamp timeout, td::Promise<BlockCandidate> promise,
9595
td::Result<td::BufferSlice> R);

validator/collator-node/collator-node.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ void CollatorNode::new_masterchain_block_notification(td::Ref<MasterchainState>
193193
}
194194
}
195195
for (BlockCandidate& candidate : future_group.pending_candidate_broadcasts) {
196-
td::actor::send_closure(it->second.actor, &CollatorNodeSession::on_block_candidate_broadcast, std::move(candidate));
196+
td::actor::send_closure(it->second.actor, &CollatorNodeSession::on_block_candidate_broadcast,
197+
std::move(candidate));
197198
}
198199
for (auto& promise : future_group.promises) {
199200
promise.set_value(td::Unit());

0 commit comments

Comments
 (0)