Skip to content

Commit 6cbae03

Browse files
committed
Refactor CollatorNode
1 parent dd13096 commit 6cbae03

File tree

9 files changed

+471
-325
lines changed

9 files changed

+471
-325
lines changed

validator/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ set(VALIDATOR_HEADERS
6464
shard-block-retainer.hpp
6565

6666
collation-manager.hpp
67-
collator-node.hpp
67+
collator-node/collator-node.hpp
68+
collator-node/collator-node-session.hpp
6869
manager-disk.h
6970
manager-disk.hpp
7071
manager-init.h
@@ -81,7 +82,8 @@ set(VALIDATOR_SOURCE
8182
apply-block.cpp
8283
block-handle.cpp
8384
collation-manager.cpp
84-
collator-node.cpp
85+
collator-node/collator-node.cpp
86+
collator-node/collator-node-session.cpp
8587
get-next-key-blocks.cpp
8688
import-db-slice.cpp
8789
import-db-slice-local.cpp

validator/collation-manager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
#include "collation-manager.hpp"
1818

19-
#include "collator-node.hpp"
19+
#include "collator-node/collator-node.hpp"
2020
#include "fabric.h"
2121
#include "td/utils/Random.h"
2222

validator/collator-node/collator-node-session.cpp

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
This file is part of TON Blockchain Library.
3+
4+
TON Blockchain Library is free software: you can redistribute it and/or modify
5+
it under the terms of the GNU Lesser General Public License as published by
6+
the Free Software Foundation, either version 2 of the License, or
7+
(at your option) any later version.
8+
9+
TON Blockchain Library is distributed in the hope that it will be useful,
10+
but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
GNU Lesser General Public License for more details.
13+
14+
You should have received a copy of the GNU Lesser General Public License
15+
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
16+
*/
17+
18+
#include "collator-node-session.hpp"
19+
20+
#include "fabric.h"
21+
22+
namespace ton::validator {
23+
24+
// Returns the seqno of the block that follows `prev`.
// `prev` must hold exactly one block or exactly two blocks; with two blocks
// the larger seqno is used. Any other size trips the CHECK.
static BlockSeqno get_next_block_seqno(const std::vector<BlockIdExt>& prev) {
  if (prev.size() != 1) {
    CHECK(prev.size() == 2);
    return std::max(prev[0].seqno(), prev[1].seqno()) + 1;
  }
  return prev[0].seqno() + 1;
}
31+
32+
// Stores the session parameters and derives the seqno of the next block to
// collate from the initial `prev` blocks.
//
// The by-value handle parameters (`prev`, `validator_set`, `opts`) are sinks,
// so they are moved into the members instead of being copied a second time
// (for td::Ref this presumably saves a reference-count bump — TODO confirm).
//
// `next_block_seqno_` is computed from the member `prev_`, not from the
// moved-from parameter; this relies on `prev_` being declared before
// `next_block_seqno_` in the class so that it is initialized first.
CollatorNodeSession::CollatorNodeSession(ShardIdFull shard, std::vector<BlockIdExt> prev,
                                         td::Ref<ValidatorSet> validator_set, BlockIdExt min_masterchain_block_id,
                                         bool can_generate, adnl::AdnlNodeIdShort local_id,
                                         td::Ref<ValidatorManagerOptions> opts,
                                         td::actor::ActorId<ValidatorManager> manager,
                                         td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp2::Rldp> rldp)
    : shard_(shard)
    , prev_(std::move(prev))
    , validator_set_(std::move(validator_set))
    , min_masterchain_block_id_(min_masterchain_block_id)
    , can_generate_(can_generate)
    , local_id_(local_id)
    , opts_(std::move(opts))
    , manager_(manager)
    , adnl_(adnl)
    , rldp_(rldp)
    , next_block_seqno_(get_next_block_seqno(prev_)) {
}
50+
51+
void CollatorNodeSession::start_up() {
52+
LOG(INFO) << "Starting collator node session, shard " << shard_.to_str() << ", cc_seqno "
53+
<< validator_set_->get_catchain_seqno() << ", next block seqno " << next_block_seqno_;
54+
55+
if (can_generate_) {
56+
generate_block(prev_, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
57+
}
58+
}
59+
60+
void CollatorNodeSession::tear_down() {
61+
LOG(INFO) << "Finishing collator node session, shard " << shard_.to_str() << ", cc_seqno "
62+
<< validator_set_->get_catchain_seqno();
63+
for (auto& [_, entry] : cache_) {
64+
entry->cancel(td::Status::Error("validator session finished"));
65+
}
66+
}
67+
68+
// Advances the session after `block_id` has been accepted in this shard.
//
// Always updates `can_generate_`. If the block is not older than what the
// session already expects, it becomes the single previous block, outdated
// cache entries are cancelled and dropped, and (when allowed) collation of
// the next block is started right away.
void CollatorNodeSession::new_shard_block_accepted(BlockIdExt block_id, bool can_generate) {
  CHECK(block_id.shard_full() == shard_);
  can_generate_ = can_generate;
  if (block_id.seqno() < next_block_seqno_) {
    // Already at or past this block: nothing else to update.
    return;
  }
  LOG(DEBUG) << "New shard block " << block_id.to_str();
  next_block_seqno_ = block_id.seqno() + 1;
  prev_ = {block_id};

  // Drop entries from the front of the cache until the first one that is
  // still usable.
  // NOTE(review): this stops at the first non-stale entry, so it relies on
  // the map's key order placing older entries first — TODO confirm.
  for (auto it = cache_.begin(); it != cache_.end(); it = cache_.erase(it)) {
    const std::vector<BlockIdExt>& entry_prev = it->first;
    CacheEntry& entry = *it->second;

    if (entry.block_seqno < next_block_seqno_) {
      entry.cancel(td::Status::Error(PSTRING() << "next block seqno " << entry.block_seqno << " is too old, expected "
                                               << next_block_seqno_));
    } else if (entry.block_seqno == next_block_seqno_ && prev_ != entry_prev) {
      entry.cancel(td::Status::Error(PSTRING() << "invalid prev blocks for seqno " << entry.block_seqno));
    } else {
      break;
    }

    // Report entries that saw only one of the two kinds of query.
    if (entry.has_internal_query_at && !entry.has_external_query_at) {
      LOG(INFO) << "generate block query"
                << ": shard=" << shard_.to_str() << ", cc_seqno=" << validator_set_->get_catchain_seqno()
                << ", next_block_seqno=" << entry.block_seqno
                << ": nobody asked for block, but we tried to generate it";
    }
    if (!entry.has_internal_query_at && entry.has_external_query_at) {
      LOG(INFO) << "generate block query"
                << ": shard=" << shard_.to_str() << ", cc_seqno=" << validator_set_->get_catchain_seqno()
                << ", next_block_seqno=" << entry.block_seqno
                << ": somebody asked for block we didn't even try to generate";
    }
  }

  if (!can_generate_) {
    return;
  }
  // Pre-generate the next block; the result is only cached, not delivered.
  auto ignore_result = [](td::Result<BlockCandidate>) {};
  generate_block(prev_, {}, td::Timestamp::in(10.0), std::move(ignore_result));
}
107+
108+
void CollatorNodeSession::generate_block(std::vector<BlockIdExt> prev_blocks,
109+
td::optional<BlockCandidatePriority> o_priority, td::Timestamp timeout,
110+
td::Promise<BlockCandidate> promise) {
111+
bool is_external = !o_priority;
112+
BlockSeqno block_seqno = get_next_block_seqno(prev_blocks);
113+
if (next_block_seqno_ > block_seqno) {
114+
promise.set_error(td::Status::Error(PSTRING() << "next block seqno " << block_seqno << " is too old, expected "
115+
<< next_block_seqno_));
116+
return;
117+
}
118+
if (next_block_seqno_ == block_seqno && prev_ != prev_blocks) {
119+
promise.set_error(td::Status::Error("invalid prev_blocks"));
120+
return;
121+
}
122+
if (next_block_seqno_ + 10 < block_seqno) {
123+
promise.set_error(td::Status::Error(PSTRING() << "next block seqno " << block_seqno << " is too new, current is "
124+
<< next_block_seqno_));
125+
return;
126+
}
127+
128+
static auto prefix_inner = [](td::StringBuilder& sb, const ShardIdFull& shard, CatchainSeqno cc_seqno,
129+
BlockSeqno block_seqno, const td::optional<BlockCandidatePriority>& o_priority) {
130+
sb << "generate block query"
131+
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno;
132+
if (o_priority) {
133+
sb << " external{";
134+
sb << "round_offset=" << o_priority.value().round - o_priority.value().first_block_round
135+
<< ",priority=" << o_priority.value().priority;
136+
sb << ",first_block_round=" << o_priority.value().first_block_round;
137+
sb << "}";
138+
} else {
139+
sb << " internal";
140+
}
141+
};
142+
auto prefix = [&](td::StringBuilder& sb) {
143+
prefix_inner(sb, shard_, validator_set_->get_catchain_seqno(), block_seqno, o_priority);
144+
};
145+
146+
auto cache_entry = cache_[prev_blocks];
147+
if (cache_entry == nullptr) {
148+
cache_entry = cache_[prev_blocks] = std::make_shared<CacheEntry>();
149+
}
150+
if (is_external && !cache_entry->has_external_query_at) {
151+
cache_entry->has_external_query_at = td::Timestamp::now();
152+
if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) {
153+
FLOG(INFO) {
154+
prefix(sb);
155+
sb << ": got external query " << cache_entry->has_external_query_at - cache_entry->has_internal_query_at
156+
<< "s after internal query [WON]";
157+
};
158+
}
159+
}
160+
if (!is_external && !cache_entry->has_internal_query_at) {
161+
cache_entry->has_internal_query_at = td::Timestamp::now();
162+
if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) {
163+
FLOG(INFO) {
164+
prefix(sb);
165+
sb << ": got internal query " << cache_entry->has_internal_query_at - cache_entry->has_external_query_at
166+
<< "s after external query [LOST]";
167+
};
168+
}
169+
}
170+
if (cache_entry->result) {
171+
auto has_result_ago = td::Timestamp::now() - cache_entry->has_result_at;
172+
FLOG(INFO) {
173+
prefix(sb);
174+
sb << ": using cached result " << " generated " << has_result_ago << "s ago";
175+
sb << (is_external ? " for external query [WON]" : " for internal query ");
176+
};
177+
promise.set_result(cache_entry->result.value().clone());
178+
return;
179+
}
180+
cache_entry->promises.push_back(std::move(promise));
181+
182+
if (cache_entry->started) {
183+
FLOG(INFO) {
184+
prefix(sb);
185+
sb << ": collation in progress, waiting";
186+
};
187+
return;
188+
}
189+
FLOG(INFO) {
190+
prefix(sb);
191+
sb << ": starting collation";
192+
};
193+
cache_entry->started = true;
194+
cache_entry->block_seqno = block_seqno;
195+
run_collate_query(CollateParams{.shard = shard_,
196+
.min_masterchain_block_id = min_masterchain_block_id_,
197+
.prev = std::move(prev_blocks),
198+
.validator_set = validator_set_,
199+
.collator_opts = opts_->get_collator_options(),
200+
.collator_node_id = local_id_,
201+
.skip_store_candidate = true},
202+
manager_, timeout, cache_entry->cancellation_token_source.get_cancellation_token(),
203+
[=, shard = shard_, cc_seqno = validator_set_->get_catchain_seqno(), SelfId = actor_id(this),
204+
timer = td::Timer{}](td::Result<BlockCandidate> R) mutable {
205+
FLOG(INFO) {
206+
prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority);
207+
sb << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string());
208+
};
209+
td::actor::send_closure(SelfId, &CollatorNodeSession::process_result, cache_entry, std::move(R));
210+
});
211+
}
212+
213+
// Delivers the outcome of a finished collation to everyone waiting on
// `cache_entry`.
// On success the candidate is stored in the entry together with the time it
// became available, and each waiter gets its own clone. On failure the entry
// is marked as not started and each waiter gets a clone of the error.
// The waiter list is emptied in both cases.
void CollatorNodeSession::process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R) {
  auto& waiters = cache_entry->promises;
  if (R.is_ok()) {
    cache_entry->result = R.move_as_ok();
    cache_entry->has_result_at = td::Timestamp::now();
    for (auto& waiter : waiters) {
      waiter.set_result(cache_entry->result.value().clone());
    }
  } else {
    cache_entry->started = false;
    for (auto& waiter : waiters) {
      waiter.set_error(R.error().clone());
    }
  }
  waiters.clear();
}
228+
229+
// Fails every pending promise with a clone of `reason`, empties the promise
// list and then triggers the entry's cancellation token source.
void CollatorNodeSession::CacheEntry::cancel(td::Status reason) {
  for (size_t i = 0; i < promises.size(); ++i) {
    promises[i].set_error(reason.clone());
  }
  promises.clear();
  cancellation_token_source.cancel();
}
236+
237+
} // namespace ton::validator
validator/collator-node/collator-node-session.hpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
This file is part of TON Blockchain Library.
3+
4+
TON Blockchain Library is free software: you can redistribute it and/or modify
5+
it under the terms of the GNU Lesser General Public License as published by
6+
the Free Software Foundation, either version 2 of the License, or
7+
(at your option) any later version.
8+
9+
TON Blockchain Library is distributed in the hope that it will be useful,
10+
but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
GNU Lesser General Public License for more details.
13+
14+
You should have received a copy of the GNU Lesser General Public License
15+
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
16+
*/
17+
#pragma once
18+
19+
#include "interfaces/validator-manager.h"
20+
#include "rldp/rldp.h"
21+
#include "rldp2/rldp.h"
22+
#include <map>
23+
#include <optional>
24+
25+
namespace ton::validator {
26+
27+
class ValidatorManager;
28+
29+
class CollatorNodeSession : public td::actor::Actor {
30+
public:
31+
CollatorNodeSession(ShardIdFull shard, std::vector<BlockIdExt> prev, td::Ref<ValidatorSet> validator_set,
32+
BlockIdExt min_masterchain_block_id, bool can_generate, adnl::AdnlNodeIdShort local_id,
33+
td::Ref<ValidatorManagerOptions> opts, td::actor::ActorId<ValidatorManager> manager,
34+
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp2::Rldp> rldp);
35+
36+
void start_up() override;
37+
void tear_down() override;
38+
39+
void update_options(td::Ref<ValidatorManagerOptions> opts) {
40+
opts_ = std::move(opts);
41+
}
42+
43+
void new_shard_block_accepted(BlockIdExt block_id, bool can_generate);
44+
45+
void generate_block(std::vector<BlockIdExt> prev_blocks, td::optional<BlockCandidatePriority> o_priority,
46+
td::Timestamp timeout, td::Promise<BlockCandidate> promise);
47+
48+
private:
49+
ShardIdFull shard_;
50+
std::vector<BlockIdExt> prev_;
51+
td::Ref<ValidatorSet> validator_set_;
52+
BlockIdExt min_masterchain_block_id_;
53+
bool can_generate_;
54+
adnl::AdnlNodeIdShort local_id_;
55+
td::Ref<ValidatorManagerOptions> opts_;
56+
td::actor::ActorId<ValidatorManager> manager_;
57+
td::actor::ActorId<adnl::Adnl> adnl_;
58+
td::actor::ActorId<rldp2::Rldp> rldp_;
59+
60+
struct CacheEntry {
61+
bool started = false;
62+
td::Timestamp has_internal_query_at;
63+
td::Timestamp has_external_query_at;
64+
td::Timestamp has_result_at;
65+
BlockSeqno block_seqno = 0;
66+
td::optional<BlockCandidate> result;
67+
td::CancellationTokenSource cancellation_token_source;
68+
std::vector<td::Promise<BlockCandidate>> promises;
69+
70+
void cancel(td::Status reason);
71+
};
72+
73+
BlockSeqno next_block_seqno_;
74+
std::map<std::vector<BlockIdExt>, std::shared_ptr<CacheEntry>> cache_;
75+
76+
void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);
77+
};
78+
79+
} // namespace ton::validator

0 commit comments

Comments
 (0)