Skip to content

Commit 5a1d271

Browse files
committed
Add support for split persistent states in StateSerializer
1 parent 8d2551a commit 5a1d271

File tree

4 files changed

+168
-23
lines changed

4 files changed

+168
-23
lines changed

crypto/vm/dict.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2790,6 +2790,12 @@ Ref<CellSlice> AugmentedDictionary::get_root() const {
27902790
return root;
27912791
}
27922792

2793+
// Returns the dictionary root (as produced by get_root()) packed into a
// freshly built cell, so callers can treat it as a standalone Ref<Cell>.
Ref<Cell> AugmentedDictionary::get_wrapped_dict_root() const {
  vm::CellBuilder builder;
  builder.append_cellslice(get_root());
  return builder.finalize();
}
2798+
27932799
Ref<CellSlice> AugmentedDictionary::extract_root() && {
27942800
if (!(flags & f_root_cached) && !compute_root()) {
27952801
return {};

crypto/vm/dict.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ class AugmentedDictionary final : public DictionaryFixed {
572572
AugmentedDictionary(DictNonEmpty, Ref<CellSlice> _root, int _n, const AugmentationData& _aug, bool validate = true);
573573
Ref<CellSlice> get_empty_dictionary() const;
574574
Ref<CellSlice> get_root() const;
575+
Ref<Cell> get_wrapped_dict_root() const;
575576
Ref<CellSlice> extract_root() &&;
576577
bool append_dict_to_bool(CellBuilder& cb) &&;
577578
bool append_dict_to_bool(CellBuilder& cb) const &;

validator/state-serializer.cpp

Lines changed: 147 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
Copyright 2017-2020 Telegram Systems LLP
1818
*/
1919
#include "state-serializer.hpp"
20+
#include "crypto/block/block-auto.h"
21+
#include "crypto/block/block-parse.h"
2022
#include "td/utils/Random.h"
23+
#include "td/utils/overloaded.h"
2124
#include "ton/ton-io.hpp"
2225
#include "common/delay.h"
2326
#include "td/utils/filesystem.h"
2427
#include "td/utils/HashSet.h"
28+
#include "vm/cells/MerkleProof.h"
2529

2630
namespace ton {
2731

@@ -199,7 +203,7 @@ void AsyncStateSerializer::next_iteration() {
199203
}
200204
if (next_idx_ < shards_.size()) {
201205
running_ = true;
202-
request_shard_state(shards_[next_idx_]);
206+
request_shard_state(shards_[next_idx_].block_id);
203207
return;
204208
}
205209
LOG(ERROR) << "finished serializing persistent state for " << masterchain_handle_->id().id.to_str();
@@ -324,7 +328,16 @@ class CachedCellDbReader : public vm::CellDbReader {
324328
td::uint64 bulk_reqs_ = 0;
325329
};
326330

327-
void AsyncStateSerializer::PreviousStateCache::prepare_cache(ShardIdFull shard) {
331+
void AsyncStateSerializer::PreviousStateCache::prepare_cache(ShardIdFull shard, PersistentStateType type) {
332+
if (type.get_offset() == type.offset<SplitPersistentStateType>()) {
333+
// Header of a split state is small, so not caching it is fine.
334+
return;
335+
}
336+
337+
if (type.get_offset() == type.offset<SplitAccountStateType>()) {
338+
shard = {shard.workchain, type.get<SplitAccountStateType>().effective_shard_id};
339+
}
340+
328341
std::vector<ShardIdFull> prev_shards;
329342
for (const auto& [_, prev_shard] : state_files) {
330343
if (shard_intersects(shard, prev_shard)) {
@@ -335,7 +348,6 @@ void AsyncStateSerializer::PreviousStateCache::prepare_cache(ShardIdFull shard)
335348
return;
336349
}
337350
cur_shards = std::move(prev_shards);
338-
cache = {};
339351
if (cur_shards.empty()) {
340352
return;
341353
}
@@ -375,6 +387,25 @@ void AsyncStateSerializer::PreviousStateCache::prepare_cache(ShardIdFull shard)
375387
cache = std::make_shared<vm::CellHashSet>(std::move(cells));
376388
}
377389

390+
// Adds to `cache` every loaded cell reachable from `cell` that `reader` cannot load.
//
// The walk stops descending at a cell when:
//   * the cell is not loaded,
//   * `reader.load_cell()` succeeds for its hash, or
//   * the cell is already present in `cache`.
//
// `cache` is created on demand, so the method is safe to call even when
// prepare_cache() returned early (or was never called) and left it unset.
void AsyncStateSerializer::PreviousStateCache::add_new_cells(vm::CellDbReader& reader, Ref<vm::Cell> const& cell) {
  if (!cell->is_loaded()) {
    return;
  }
  if (reader.load_cell(cell->get_hash().as_slice()).is_ok()) {
    return;
  }

  if (!cache) {
    // Previously an unconditional `cache->insert` dereferenced a null shared_ptr here.
    cache = std::make_shared<vm::CellHashSet>();
  }
  auto [_, inserted] = cache->insert(cell);
  if (!inserted) {
    return;
  }

  vm::CellSlice cs{vm::NoVm{}, cell};
  for (unsigned i = 0; i < cs.size_refs(); ++i) {
    add_new_cells(reader, cs.prefetch_ref(i));
  }
}
408+
378409
void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state,
379410
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
380411
if (!opts_->get_state_serializer_enabled() || auto_disabled_) {
@@ -389,7 +420,10 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
389420
auto vec = state->get_shards();
390421
for (auto &v : vec) {
391422
if (opts_->need_monitor(v->shard(), state)) {
392-
shards_.push_back(v->top_block_id());
423+
shards_.push_back({
424+
.block_id = v->top_block_id(),
425+
.split_depth = state->persistent_state_split_depth(v->shard().workchain),
426+
});
393427
}
394428
}
395429

@@ -401,7 +435,7 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
401435
return vm::std_boc_serialize_to_file(root, fd, 31, std::move(cancellation_token));
402436
}
403437
if (fast_serializer_enabled) {
404-
previous_state_cache->prepare_cache(shard);
438+
previous_state_cache->prepare_cache(shard, UnsplitStateType{});
405439
}
406440
auto new_cell_db_reader = std::make_shared<CachedCellDbReader>(cell_db_reader, previous_state_cache->cache);
407441
auto res = vm::boc_serialize_to_file_large(new_cell_db_reader, root->get_hash(), fd, 31, std::move(cancellation_token));
@@ -455,43 +489,135 @@ void AsyncStateSerializer::got_shard_handle(BlockHandle handle) {
455489
td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, handle, std::move(P));
456490
}
457491

492+
namespace {
493+
494+
// Expects `ShardStateUnsplit` as `shard_state_cell`.
495+
std::vector<SerializablePart> split_shard_state(ShardId shard_id, td::Ref<vm::Cell> shard_state_cell, int split_depth) {
496+
CHECK(split_depth <= 63);
497+
int shard_prefix_length = shard_pfx_len(shard_id);
498+
if (shard_prefix_length >= static_cast<int>(split_depth)) {
499+
return {{UnsplitStateType{}, std::move(shard_state_cell)}};
500+
}
501+
502+
block::gen::ShardStateUnsplit::Record unsplit_shard_state;
503+
bool rc = tlb::unpack_cell(shard_state_cell, unsplit_shard_state);
504+
CHECK(rc);
505+
506+
std::vector<SerializablePart> result;
507+
508+
auto unwrapped_accounts_root = unsplit_shard_state.accounts;
509+
auto accounts_cut = std::make_shared<vm::CellUsageTree>();
510+
auto accounts_root = vm::UsageCell::create(unwrapped_accounts_root, accounts_cut->root_ptr());
511+
512+
// NOTE: Ref<Cell> constructor expects caller to unwrap HashMapAugE.
513+
vm::AugmentedDictionary accounts{
514+
vm::load_cell_slice_ref(accounts_root),
515+
256,
516+
block::tlb::aug_ShardAccounts,
517+
false,
518+
};
519+
520+
// Build account dict parts
521+
ShardId effective_shard = shard_id ^ (1ULL << (63 - shard_prefix_length)) ^ (1ULL << (63 - split_depth));
522+
ShardId increment = 1ULL << (64 - split_depth);
523+
524+
for (int i = 0; i < (1 << (split_depth - shard_prefix_length)); ++i, effective_shard += increment) {
525+
td::BitArray<64> prefix;
526+
prefix.store_ulong(effective_shard);
527+
auto account_dict_part = accounts;
528+
account_dict_part.cut_prefix_subdict(prefix.bits(), split_depth);
529+
530+
if (!account_dict_part.is_empty()) {
531+
result.push_back({SplitAccountStateType{effective_shard}, account_dict_part.get_wrapped_dict_root()});
532+
}
533+
}
534+
535+
auto accounts_proof = vm::MerkleProof::generate_raw(unwrapped_accounts_root, accounts_cut.get());
536+
537+
// Build header
538+
unsplit_shard_state.accounts = accounts_proof;
539+
vm::CellBuilder unsplit_shard_state_cb;
540+
rc = tlb::pack(unsplit_shard_state_cb, unsplit_shard_state);
541+
CHECK(rc);
542+
543+
auto header = unsplit_shard_state_cb.finalize();
544+
CHECK(header->get_level() <= 1 && header->get_hash(0) == shard_state_cell->get_hash());
545+
546+
result.push_back({SplitPersistentStateType{}, vm::CellBuilder::create_merkle_proof(header)});
547+
548+
return result;
549+
}
550+
551+
} // namespace
552+
458553
void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state,
459554
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
555+
int archive_split_depth = shards_[next_idx_].split_depth;
460556
next_idx_++;
557+
461558
if (!opts_->get_state_serializer_enabled() || auto_disabled_) {
462559
success_handler();
463560
return;
464561
}
465562
LOG(ERROR) << "serializing shard state " << handle->id().id.to_str();
466-
auto write_data = [shard = state->get_shard(), root = state->root_cell(), cell_db_reader,
467-
previous_state_cache = previous_state_cache_,
468-
fast_serializer_enabled = opts_->get_fast_state_serializer_enabled(),
563+
564+
auto parts = split_shard_state(state->get_shard().shard, state->root_cell(), archive_split_depth);
565+
CHECK(!parts.empty());
566+
567+
write_shard_state(handle, state->get_shard(), cell_db_reader,
568+
std::make_shared<std::vector<SerializablePart>>(std::move(parts)), 0);
569+
570+
current_status_ = PSTRING() << "serializing shard state " << next_idx_ << "/" << shards_.size() << " "
571+
<< state->get_block_id().id.to_str();
572+
current_status_ts_ = td::Timestamp::now();
573+
}
574+
575+
void AsyncStateSerializer::write_shard_state(BlockHandle handle, ShardIdFull shard,
576+
std::shared_ptr<vm::CellDbReader> cell_db_reader,
577+
std::shared_ptr<std::vector<SerializablePart>> parts, size_t idx) {
578+
auto [type, cell] = parts->at(idx);
579+
580+
auto write_data = [=, this,
469581
cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
582+
CHECK(running_);
583+
584+
LOG(ERROR) << "serializing shard state " << handle->id().id.to_str() << " ("
585+
<< persistent_state_type_to_string(shard, type) << ")";
470586
if (!cell_db_reader) {
471-
return vm::std_boc_serialize_to_file(root, fd, 31, std::move(cancellation_token));
587+
return vm::std_boc_serialize_to_file(cell, fd, 31, std::move(cancellation_token));
472588
}
473-
if (fast_serializer_enabled) {
474-
previous_state_cache->prepare_cache(shard);
589+
if (opts_->get_fast_state_serializer_enabled()) {
590+
previous_state_cache_->prepare_cache(shard, type);
475591
}
476-
auto new_cell_db_reader = std::make_shared<CachedCellDbReader>(cell_db_reader, previous_state_cache->cache);
477-
auto res = vm::boc_serialize_to_file_large(new_cell_db_reader, root->get_hash(), fd, 31, std::move(cancellation_token));
592+
if (!previous_state_cache_->cache) {
593+
previous_state_cache_->cache = std::make_shared<vm::CellHashSet>();
594+
}
595+
previous_state_cache_->add_new_cells(*cell_db_reader, cell);
596+
auto new_cell_db_reader = std::make_shared<CachedCellDbReader>(cell_db_reader, previous_state_cache_->cache);
597+
auto res =
598+
vm::boc_serialize_to_file_large(new_cell_db_reader, cell->get_hash(), fd, 31, std::move(cancellation_token));
478599
new_cell_db_reader->print_stats();
479600
return res;
480601
};
481-
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Unit> R) {
602+
auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this)](td::Result<td::Unit> R) {
482603
if (R.is_error() && R.error().code() == cancelled) {
483604
LOG(ERROR) << "Persistent state serialization cancelled";
605+
td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
606+
return;
607+
}
608+
609+
R.ensure();
610+
LOG(ERROR) << "finished serializing shard state " << handle->id().id.to_str() << " ("
611+
<< persistent_state_type_to_string(shard, type) << ")";
612+
if (idx + 1 == parts->size()) {
613+
td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
484614
} else {
485-
R.ensure();
486-
LOG(ERROR) << "finished serializing shard state " << handle->id().id.to_str();
615+
td::actor::send_closure(SelfId, &AsyncStateSerializer::write_shard_state, handle, shard, cell_db_reader, parts,
616+
idx + 1);
487617
}
488-
td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
489618
});
490619
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(),
491-
masterchain_handle_->id(), UnsplitStateType{}, write_data, std::move(P));
492-
current_status_ = PSTRING() << "serializing shard state " << next_idx_ << "/" << shards_.size() << " "
493-
<< state->get_block_id().id.to_str();
494-
current_status_ts_ = td::Timestamp::now();
620+
masterchain_handle_->id(), type, write_data, std::move(P));
495621
}
496622

497623
void AsyncStateSerializer::fail_handler(td::Status reason) {

validator/state-serializer.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ namespace ton {
2727

2828
namespace validator {
2929

30+
// One piece of a shard's persistent state that is written out on its own.
// `type` is the persistent state type the piece is stored under, and `cell`
// is the root cell that gets serialized for it.
struct SerializablePart {
31+
PersistentStateType type;
32+
td::Ref<vm::Cell> cell;
33+
};
34+
3035
class AsyncStateSerializer : public td::actor::Actor {
3136
private:
3237
td::uint32 attempt_ = 0;
@@ -52,13 +57,18 @@ class AsyncStateSerializer : public td::actor::Actor {
5257
bool stored_persistent_state_description_ = false;
5358
bool have_masterchain_state_ = false;
5459

55-
std::vector<BlockIdExt> shards_;
60+
// Per-shard serialization settings collected in got_masterchain_state():
// `block_id` is the shard's top block id, and `split_depth` is the value of
// persistent_state_split_depth() reported by the masterchain state for the
// shard's workchain.
struct ShardSerializationConfig {
61+
BlockIdExt block_id;
62+
td::uint32 split_depth;
63+
};
64+
std::vector<ShardSerializationConfig> shards_;
5665
struct PreviousStateCache {
5766
std::vector<std::pair<std::string, ShardIdFull>> state_files;
5867
std::shared_ptr<vm::CellHashSet> cache;
5968
std::vector<ShardIdFull> cur_shards;
6069

61-
void prepare_cache(ShardIdFull shard);
70+
void prepare_cache(ShardIdFull shard, PersistentStateType type);
71+
void add_new_cells(vm::CellDbReader& reader, Ref<vm::Cell> const& cell);
6272
};
6373
std::shared_ptr<PreviousStateCache> previous_state_cache_;
6474

@@ -93,6 +103,8 @@ class AsyncStateSerializer : public td::actor::Actor {
93103
void stored_masterchain_state();
94104
void got_shard_handle(BlockHandle handle);
95105
void got_shard_state(BlockHandle handle, td::Ref<ShardState> state, std::shared_ptr<vm::CellDbReader> cell_db_reader);
106+
void write_shard_state(BlockHandle handle, ShardIdFull shard, std::shared_ptr<vm::CellDbReader> cell_db_reader,
107+
std::shared_ptr<std::vector<SerializablePart>> parts, size_t idx);
96108

97109
void get_masterchain_seqno(td::Promise<BlockSeqno> promise) {
98110
promise.set_result(last_block_id_.id.seqno);

0 commit comments

Comments
 (0)