Skip to content

Commit 3be416e

Browse files
committed
Fix processing applied blocks in CollatorNodeSession
1 parent 7e7195b commit 3be416e

File tree

2 files changed

+52
-4
lines changed

2 files changed

+52
-4
lines changed

validator/collator-node/collator-node-session.cpp

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ CollatorNodeSession::CollatorNodeSession(ShardIdFull shard, std::vector<BlockIdE
5050
, manager_(manager)
5151
, adnl_(adnl)
5252
, rldp_(rldp)
53+
, first_block_seqno_(get_next_block_seqno(prev_))
5354
, next_block_seqno_(get_next_block_seqno(prev_)) {
5455
collated_data_merged_upto_ = next_block_seqno_;
5556
update_masterchain_config(state);
@@ -86,10 +87,10 @@ void CollatorNodeSession::new_shard_block_accepted(BlockIdExt block_id, bool can
8687
return;
8788
}
8889
LOG(INFO) << "New shard block #" << block_id.seqno();
89-
LOG(DEBUG) << "New shard block " << block_id.to_str();
9090
next_block_seqno_ = block_id.seqno() + 1;
9191
prev_ = {block_id};
92-
accepted_blocks_[block_id.seqno()] = block_id;
92+
93+
process_accepted_block(block_id);
9394

9495
while (!cache_.empty()) {
9596
auto& [cache_prev, entry] = *cache_.begin();
@@ -116,8 +117,6 @@ void CollatorNodeSession::new_shard_block_accepted(BlockIdExt block_id, bool can
116117
cache_.erase(cache_.begin());
117118
}
118119

119-
try_merge_collated_data(block_id);
120-
121120
if (can_generate_) {
122121
generate_block(prev_, {}, {}, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
123122
}
@@ -401,6 +400,50 @@ void CollatorNodeSession::alarm() {
401400
}
402401
}
403402

403+
void CollatorNodeSession::process_accepted_block(BlockIdExt block_id) {
404+
if (!accepted_blocks_.emplace(block_id.seqno(), block_id).second) {
405+
return;
406+
}
407+
LOG(INFO) << "Accepted block " << block_id.to_str();
408+
try_merge_collated_data(block_id);
409+
410+
if (accepted_blocks_.contains(block_id.seqno() - 1) || block_id.seqno() == first_block_seqno_) {
411+
return;
412+
}
413+
LOG(INFO) << "Prev block for " << block_id.id.to_str() << " is not processed, waiting block data";
414+
process_accepted_block_cont(block_id);
415+
}
416+
417+
void CollatorNodeSession::process_accepted_block_cont(BlockIdExt block_id) {
418+
td::actor::send_closure(
419+
manager_, &ValidatorManager::wait_block_data_short, block_id, 0, td::Timestamp::in(30.0),
420+
[SelfId = actor_id(this), block_id](td::Result<Ref<BlockData>> R) mutable {
421+
if (R.is_error()) {
422+
LOG(WARNING) << "Wait block data for #" << block_id.seqno() << ": " << R.error();
423+
td::actor::send_closure(SelfId, &CollatorNodeSession::process_accepted_block_cont, block_id);
424+
} else {
425+
td::actor::send_closure(SelfId, &CollatorNodeSession::process_accepted_block_cont2, R.move_as_ok());
426+
}
427+
});
428+
}
429+
430+
void CollatorNodeSession::process_accepted_block_cont2(Ref<BlockData> block) {
431+
LOG(DEBUG) << "Wait block data for #" << block->block_id().seqno() << ": OK";
432+
std::vector<BlockIdExt> prev;
433+
BlockIdExt mc_block_id;
434+
bool after_split;
435+
if (!block::unpack_block_prev_blk(block->root_cell(), block->block_id(), prev, mc_block_id, after_split)) {
436+
LOG(ERROR) << "Unpack block data for #" << block->block_id().seqno() << ": error";
437+
return;
438+
}
439+
if (prev.size() != 1) {
440+
LOG(ERROR) << "Unpack block data for #" << block->block_id().seqno() << ": not single prev block";
441+
return;
442+
}
443+
process_accepted_block(prev[0]);
444+
}
445+
446+
404447
void CollatorNodeSession::wait_collated_data_merged(BlockSeqno seqno, td::Timestamp timeout,
405448
td::Promise<td::Unit> promise) {
406449
if (!merge_collated_data_enabled_ || collated_data_merged_upto_ >= seqno) {

validator/collator-node/collator-node-session.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class CollatorNodeSession : public td::actor::Actor {
7676
void cancel(td::Status reason);
7777
};
7878

79+
BlockSeqno first_block_seqno_;
7980
BlockSeqno next_block_seqno_;
8081
std::map<std::vector<BlockIdExt>, std::shared_ptr<CacheEntry>> cache_;
8182

@@ -101,6 +102,10 @@ class CollatorNodeSession : public td::actor::Actor {
101102
BlockSeqno collated_data_merged_upto_ = 0;
102103
std::map<BlockSeqno, std::vector<std::pair<td::Promise<td::Unit>, td::Timestamp>>> collated_data_merged_waiters_;
103104

105+
void process_accepted_block(BlockIdExt block_id);
106+
void process_accepted_block_cont(BlockIdExt block_id);
107+
void process_accepted_block_cont2(Ref<BlockData> block);
108+
104109
void wait_collated_data_merged(BlockSeqno seqno, td::Timestamp timeout, td::Promise<td::Unit> promise);
105110
void try_merge_collated_data(BlockIdExt block_id);
106111
void try_merge_collated_data_from_net(BlockIdExt block_id);

0 commit comments

Comments
 (0)