diff --git a/crypto/common/refcnt.hpp b/crypto/common/refcnt.hpp index f31a0faa8..3c784c5a7 100644 --- a/crypto/common/refcnt.hpp +++ b/crypto/common/refcnt.hpp @@ -285,14 +285,14 @@ class Ref { Ref& operator=(Ref&& r); const typename RefValue::Type* operator->() const { if (!ptr) { - CHECK(ptr && "deferencing null Ref"); + LOG_CHECK(ptr) << "dereferencing null Ref<" << typeid(T).name() << ">"; throw NullRef{}; } return RefValue::make_const_ptr(ptr); } const typename RefValue::Type& operator*() const { if (!ptr) { - CHECK(ptr && "deferencing null Ref"); + LOG_CHECK(ptr) << "dereferencing null Ref<" << typeid(T).name() << ">"; throw NullRef{}; } return RefValue::make_const_ref(ptr); diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 2e5532dec..8573242c1 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1596,7 +1596,7 @@ td::Status ValidatorEngine::load_global_config() { } validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_); validator_options_.write().set_celldb_in_memory(celldb_in_memory_); - validator_options_.write().set_celldb_v2(!celldb_in_memory_); + validator_options_.write().set_celldb_v2(!celldb_in_memory_ && !permanent_celldb_); validator_options_.write().set_celldb_disable_bloom_filter(celldb_disable_bloom_filter_); validator_options_.write().set_max_open_archive_files(max_open_archive_files_); validator_options_.write().set_archive_preload_period(archive_preload_period_); diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index d234397fa..03d522ac1 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -662,13 +662,23 @@ void ArchiveManager::load_package(PackageId id) { } } - desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, 0, db_root_, - archive_lru_.get(), statistics_); + desc.file = create_archive_slice(id, 0); m.emplace(id, std::move(desc)); update_permanent_slices(); } +td::actor::ActorOwn ArchiveManager::create_archive_slice(const PackageId &id, + td::uint32 shard_split_depth) { + auto actor = td::actor::create_actor( + PSTRING() << "slice." << (id.temp ? "temp." : (id.key ? "key." : "")) << id.id, id.id, id.key, id.temp, false, + shard_split_depth, db_root_, archive_lru_.get(), statistics_); + if (async_mode_) { + td::actor::send_closure(actor, &ArchiveSlice::set_async_mode, true, [](td::Result) {}); + } + return actor; +} + td::Result ArchiveManager::get_file_desc(ShardIdFull shard, PackageId id, BlockSeqno seqno, UnixTime ts, LogicalTime lt, bool force) { @@ -698,9 +708,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull FileDescription new_desc{id, false}; td::mkdir(db_root_ + id.path()).ensure(); std::string prefix = PSTRING() << db_root_ << id.path() << id.name(); - new_desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, - id.key || id.temp ? 0 : cur_shard_split_depth_, db_root_, - archive_lru_.get(), statistics_); + new_desc.file = create_archive_slice(id, id.key || id.temp ? 0 : cur_shard_split_depth_); const FileDescription &desc = f.emplace(id, std::move(new_desc)); if (!id.temp) { update_desc(f, desc, shard, seqno, ts, lt); @@ -1205,23 +1213,8 @@ void ArchiveManager::get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_slice, archive_id, offset, limit, std::move(promise)); } -void ArchiveManager::commit_transaction() { - if (!async_mode_ || huge_transaction_size_++ >= 100) { - index_->commit_transaction().ensure(); - if (async_mode_) { - huge_transaction_size_ = 0; - huge_transaction_started_ = false; - } - } -} - void ArchiveManager::set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; - if (!async_mode_ && huge_transaction_started_) { - index_->commit_transaction().ensure(); - huge_transaction_size_ = 0; - huge_transaction_started_ = false; - } td::MultiPromise mp; auto ig = mp.init_guard(); diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index 247e0a99b..df460064d 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -77,7 +77,6 @@ class ArchiveManager : public td::actor::Actor { void start_up() override; void alarm() override; - void commit_transaction(); void set_async_mode(bool mode, td::Promise promise); void set_current_shard_split_depth(td::uint32 value) { @@ -184,8 +183,6 @@ class ArchiveManager : public td::actor::Actor { td::actor::ActorOwn archive_lru_; BlockSeqno finalized_up_to_{0}; bool async_mode_ = false; - bool huge_transaction_started_ = false; - td::uint32 huge_transaction_size_ = 0; td::uint32 cur_shard_split_depth_ = 0; DbStatistics statistics_; @@ -201,6 +198,7 @@ class ArchiveManager : public td::actor::Actor { std::map, PermState> perm_states_; // Mc block seqno, hash -> state void load_package(PackageId seqno); + td::actor::ActorOwn create_archive_slice(const PackageId &id, td::uint32 shard_split_depth); void delete_package(PackageId seqno, td::Promise promise); void deleted_package(PackageId seqno, td::Promise promise); void get_handle_cont(BlockIdExt block_id, PackageId id, td::Promise promise); diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index b78c4849a..6e075c5ac 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -16,9 +16,13 @@ Copyright 2019-2020 Telegram Systems LLP */ +#include + #include "common/delay.h" #include "td/actor/MultiPromise.h" #include "td/db/RocksDb.h" +#include "td/utils/PathView.h" +#include "td/utils/Random.h" #include "td/utils/port/path.h" #include "validator/fabric.h" @@ -106,6 +110,10 @@ std::string DbStatistics::to_string_and_reset() { return ss.str(); } +void PackageWriter::tear_down() { + sync_now(); +} + void PackageWriter::append(std::string filename, td::BufferSlice data, td::Promise> promise) { td::uint64 offset, size; @@ -118,14 +126,45 @@ void PackageWriter::append(std::string filename, td::BufferSlice data, return; } start = td::Timestamp::now(); - offset = p->append(std::move(filename), std::move(data), !async_mode_); + offset = p->append(std::move(filename), std::move(data), false); end = td::Timestamp::now(); size = p->size(); } if (statistics_) { statistics_->record_write((end.at() - start.at()) * 1e6, data_size); } - promise.set_value(std::pair{offset, size}); + sync([=, promise = std::move(promise)](td::Result) mutable { + promise.set_value(std::pair{offset, size}); + }); +} + +void PackageWriter::sync(td::Promise promise) { + if (!async_mode_) { + sync_now(); + promise.set_value(td::Unit{}); + return; + } + sync_waiters_.push_back(std::move(promise)); + if (!sync_pending_) { + sync_pending_ = true; + td::actor::send_closure_later(actor_id(this), &PackageWriter::sync_now); + } +} + +void PackageWriter::sync_now() { + auto p = package_.lock(); + if (p) { + p->sync(); + } + sync_pending_ = false; + auto waiters = std::move(sync_waiters_); + sync_waiters_.clear(); + if (!waiters.empty()) { + LOG(DEBUG) << "Writer huge transaction size = " << waiters.size(); + } + for (auto &promise : waiters) { + promise.set_value(td::Unit{}); + } } class PackageReader : public td::actor::Actor { @@ -171,6 +210,10 @@ static std::string package_info_to_str(BlockSeqno seqno, ShardIdFull shard_prefi return PSTRING() << seqno << "." << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard); } +void ArchiveSlice::tear_down() { + commit_transaction_now(); +} + void ArchiveSlice::add_handle(BlockHandle handle, td::Promise promise) { if (destroyed_) { promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); @@ -242,16 +285,17 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise promise) kv_->set(shard_key.as_slice(), shard_value.as_slice()).ensure(); } kv_->set(get_db_key_block_info(handle->id()), handle->serialize().as_slice()).ensure(); - commit_transaction(); - - handle->flushed_upto(version); - handle->set_handle_moved_to_archive(); + commit_transaction([=, this, promise = std::move(promise)](td::Result R) mutable { + R.ensure(); + handle->flushed_upto(version); + handle->set_handle_moved_to_archive(); - if (handle->need_flush()) { - update_handle(std::move(handle), std::move(promise)); - } else { - promise.set_value(td::Unit()); - } + if (handle->need_flush()) { + update_handle(std::move(handle), std::move(promise)); + } else { + promise.set_value(td::Unit()); + } + }); } void ArchiveSlice::update_handle(BlockHandle handle, td::Promise promise) { @@ -272,12 +316,13 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise promi kv_->set(get_db_key_block_info(handle->id()), handle->serialize().as_slice()).ensure(); handle->flushed_upto(version); } while (handle->need_flush()); - commit_transaction(); - if (!temp_) { - handle->set_handle_moved_to_archive(); - } - - promise.set_value(td::Unit()); + commit_transaction([=, this, promise = std::move(promise)](td::Result R) mutable { + R.ensure(); + if (!temp_) { + handle->set_handle_moved_to_archive(); + } + promise.set_value(td::Unit()); + }); } void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::BufferSlice data, @@ -329,8 +374,7 @@ void ArchiveSlice::add_file_cont(size_t idx, FileReference ref_id, td::uint64 of kv_->set("status", td::to_string(size)).ensure(); kv_->set(ref_id.hash().to_hex(), td::to_string(offset)).ensure(); } - commit_transaction(); - promise.set_value(td::Unit()); + commit_transaction(std::move(promise)); } void ArchiveSlice::get_handle(BlockIdExt block_id, td::Promise promise) { @@ -408,7 +452,8 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P promise.set_value(std::move(R.move_as_ok().second)); } }); - td::actor::create_actor("reader", p->package, offset, std::move(P), statistics_.pack_statistics) + td::actor::create_actor(PSTRING() << "reader." << td::PathView(p->path).file_name(), p->package, + offset, std::move(P), statistics_.pack_statistics) .release(); } @@ -715,6 +760,7 @@ void ArchiveSlice::do_close() { if (destroyed_) { return; } + commit_transaction_now(); CHECK(status_ != st_closed && active_queries_ == 0); LOG(DEBUG) << "Closing archive slice " << db_path_; status_ = st_closed; @@ -744,30 +790,47 @@ void ArchiveSlice::end_async_query() { } void ArchiveSlice::begin_transaction() { - if (!async_mode_ || !huge_transaction_started_) { - kv_->begin_transaction().ensure(); - if (async_mode_) { - huge_transaction_started_ = true; - } + if (transaction_started_) { + return; } + transaction_started_ = true; + kv_->begin_transaction().ensure(); } -void ArchiveSlice::commit_transaction() { - if (!async_mode_ || huge_transaction_size_++ >= 100) { - kv_->commit_transaction().ensure(); - if (async_mode_) { - huge_transaction_size_ = 0; - huge_transaction_started_ = false; - } +void ArchiveSlice::commit_transaction(td::Promise promise) { + if (!transaction_started_ || !async_mode_) { + commit_transaction_now(); + promise.set_value(td::Unit{}); + return; + } + transaction_commit_waiters_.push_back(std::move(promise)); + if (!transaction_commit_pending_) { + transaction_commit_pending_ = true; + td::actor::send_closure_later(actor_id(this), &ArchiveSlice::commit_transaction_now); + } +} + +void ArchiveSlice::commit_transaction_now() { + if (!transaction_started_) { + return; + } + kv_->commit_transaction().ensure(); + transaction_started_ = false; + transaction_commit_pending_ = false; + auto waiters = std::move(transaction_commit_waiters_); + transaction_commit_waiters_.clear(); + if (!waiters.empty()) { + LOG(DEBUG) << "Huge transaction size = " << waiters.size(); + } + for (auto &promise : waiters) { + promise.set_value(td::Unit{}); } } void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; - if (!async_mode_ && huge_transaction_started_ && kv_) { - kv_->commit_transaction().ensure(); - huge_transaction_size_ = 0; - huge_transaction_started_ = false; + if (!async_mode_) { + commit_transaction_now(); } td::MultiPromise mp; @@ -823,7 +886,7 @@ td::Result ArchiveSlice::choose_package(BlockSeqno if (shard_separated_) { kv_->set(PSTRING() << "info." << v, package_info_to_str(masterchain_seqno, shard_prefix)).ensure(); } - commit_transaction(); + commit_transaction_now(); add_package(masterchain_seqno, shard_prefix, 0, default_package_version()); return &packages_[v]; } else { @@ -858,7 +921,8 @@ void ArchiveSlice::add_package(td::uint32 seqno, ShardIdFull shard_prefix, td::u if (version >= 1) { pack->truncate(size).ensure(); } - auto writer = td::actor::create_actor("writer", pack, async_mode_, statistics_.pack_statistics); + auto writer = td::actor::create_actor(PSTRING() << "writer." << td::PathView(path).file_name(), pack, + async_mode_, statistics_.pack_statistics); packages_.emplace_back(std::move(pack), std::move(writer), seqno, shard_prefix, path, idx, version); } @@ -882,6 +946,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise prom } // namespace void ArchiveSlice::destroy(td::Promise promise) { + commit_transaction_now(); before_query(); destroyed_ = true; @@ -1089,7 +1154,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle, td:: package->writer.reset(); td::unlink(package->path).ensure(); td::rename(package->path + ".new", package->path).ensure(); - package->writer = td::actor::create_actor("writer", new_package, async_mode_); + package->writer = td::actor::create_actor( + PSTRING() << "writer." << td::PathView(package->path).file_name(), new_package, async_mode_); } std::vector new_packages_info; diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index 4fb0bab92..cc2e5a3f7 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -77,14 +77,12 @@ class PackageWriter : public td::actor::Actor { : package_(std::move(package)), async_mode_(async_mode), statistics_(std::move(statistics)) { } + void tear_down() override; void append(std::string filename, td::BufferSlice data, td::Promise> promise); void set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; if (!async_mode_) { - auto p = package_.lock(); - if (p) { - p->sync(); - } + sync_now(); } promise.set_value(td::Unit()); } @@ -92,7 +90,12 @@ class PackageWriter : public td::actor::Actor { private: std::weak_ptr package_; bool async_mode_ = false; + bool sync_pending_ = false; + std::vector> sync_waiters_; std::shared_ptr statistics_; + + void sync(td::Promise promise); + void sync_now(); }; class ArchiveLru; @@ -102,6 +105,8 @@ class ArchiveSlice : public td::actor::Actor { ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, td::uint32 shard_split_depth, std::string db_root, td::actor::ActorId archive_lru, DbStatistics statistics = {}); + void tear_down() override; + void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::Promise promise); void add_handle(BlockHandle handle, td::Promise promise); @@ -140,7 +145,8 @@ class ArchiveSlice : public td::actor::Actor { void end_async_query(); void begin_transaction(); - void commit_transaction(); + void commit_transaction(td::Promise promise); + void commit_transaction_now(); void add_file_cont(size_t idx, FileReference ref_id, td::uint64 offset, td::uint64 size, td::Promise promise); @@ -160,9 +166,10 @@ class ArchiveSlice : public td::actor::Actor { bool destroyed_ = false; bool async_mode_ = false; - bool huge_transaction_started_ = false; + bool transaction_started_ = false; + std::vector> transaction_commit_waiters_; + bool transaction_commit_pending_ = false; bool sliced_mode_{false}; - td::uint32 huge_transaction_size_ = 0; td::uint32 slice_size_{100}; bool shard_separated_{false}; td::uint32 shard_split_depth_ = 0; diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index 1072fd28e..fe0f8596b 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -337,6 +337,9 @@ void CellDbIn::start_up() { auto R = boc_->meta_get(td::as_slice(key), value); R.ensure(); bool stored_permanent_mode = R.ok() == td::KeyValue::GetStatus::Ok; + if (stored_permanent_mode) { + LOG_CHECK(opts_->get_permanent_celldb()) << "permanent_celldb cannot be turned off"; + } permanent_mode_ = stored_permanent_mode || opts_->get_permanent_celldb(); if (permanent_mode_) { LOG(WARNING) << "Celldb is in permanent mode"; @@ -488,27 +491,28 @@ void CellDbIn::store_block_state_permanent(td::Ref block, td::Promise td::Timestamp::now()); return; } - store_block_state_permanent_bulk( - {block}, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { - TRY_STATUS_PROMISE(promise, R.move_as_status()); - block::gen::Block::Record rec; - if (!block::gen::unpack_cell(block->root_cell(), rec)) { - promise.set_error(td::Status::Error("cannot unpack Block record")); - return; - } - bool spec; - vm::CellSlice update_cs = vm::load_cell_slice_special(rec.state_update, spec); - if (update_cs.special_type() != vm::CellTraits::SpecialType::MerkleUpdate) { - promise.set_error(td::Status::Error("invalid Merkle update in block")); - return; - } - td::Ref new_state_root = update_cs.prefetch_ref(1); - RootHash state_root_hash = new_state_root->get_hash(0).bits(); - td::actor::send_closure(SelfId, &CellDbIn::load_cell, state_root_hash, std::move(promise)); - }); + store_block_state_permanent_bulk({block}, [=, SelfId = actor_id(this), promise = std::move(promise)]( + td::Result> R) mutable { + TRY_STATUS_PROMISE(promise, R.move_as_status()); + block::gen::Block::Record rec; + if (!block::gen::unpack_cell(block->root_cell(), rec)) { + promise.set_error(td::Status::Error("cannot unpack Block record")); + return; + } + bool spec; + vm::CellSlice update_cs = vm::load_cell_slice_special(rec.state_update, spec); + if (update_cs.special_type() != vm::CellTraits::SpecialType::MerkleUpdate) { + promise.set_error(td::Status::Error("invalid Merkle update in block")); + return; + } + td::Ref new_state_root = update_cs.prefetch_ref(1); + RootHash state_root_hash = new_state_root->get_hash(0).bits(); + td::actor::send_closure(SelfId, &CellDbIn::load_cell, state_root_hash, std::move(promise)); + }); } -void CellDbIn::store_block_state_permanent_bulk(std::vector> blocks, td::Promise promise) { +void CellDbIn::store_block_state_permanent_bulk(std::vector> blocks, + td::Promise> promise) { if (!permanent_mode_) { promise.set_error(td::Status::Error("celldb is not in permanent mode")); return; @@ -535,7 +539,7 @@ void CellDbIn::store_block_state_permanent_bulk(std::vector> new_blocks[block_id] = std::move(block); } if (new_blocks.empty()) { - promise.set_value(td::Unit{}); + promise.set_value(std::map{}); return; } for (auto& [block_id, block] : new_blocks) { @@ -569,7 +573,9 @@ void CellDbIn::store_block_state_permanent_bulk(std::vector> vm::CellStorer stor{*cell_db_}; cell_db_->begin_write_batch().ensure(); + std::map state_root_hashes; for (auto& update : updates) { + state_root_hashes[update.block_id] = update.state_root_hash; for (auto& [k, v] : update.to_store) { cell_db_->set(k.as_slice(), v).ensure(); } @@ -606,7 +612,7 @@ void CellDbIn::store_block_state_permanent_bulk(std::vector> cell_db_statistics_.store_cell_bulk_queries_++; cell_db_statistics_.store_cell_bulk_total_blocks_ += updates.size(); } - promise.set_result(td::Unit()); + promise.set_result(std::move(state_root_hashes)); }); }); } @@ -1014,7 +1020,8 @@ void CellDb::store_block_state_permanent(td::Ref block, td::Promise> blocks, td::Promise promise) { +void CellDb::store_block_state_permanent_bulk(std::vector> blocks, + td::Promise> promise) { td::actor::send_closure(cell_db_, &CellDbIn::store_block_state_permanent_bulk, std::move(blocks), std::move(promise)); } diff --git a/validator/db/celldb.hpp b/validator/db/celldb.hpp index d83f68eec..cde443f38 100644 --- a/validator/db/celldb.hpp +++ b/validator/db/celldb.hpp @@ -68,7 +68,8 @@ class CellDbIn : public CellDbBase { void store_cell(BlockIdExt block_id, td::Ref cell, td::Promise> promise); void get_cell_db_reader(td::Promise> promise); void store_block_state_permanent(td::Ref block, td::Promise> promise); - void store_block_state_permanent_bulk(std::vector> blocks, td::Promise promise); + void store_block_state_permanent_bulk(std::vector> blocks, + td::Promise> promise); void migrate_cell(td::Bits256 hash); @@ -196,7 +197,8 @@ class CellDb : public CellDbBase { void load_cell(RootHash hash, td::Promise> promise); void store_cell(BlockIdExt block_id, td::Ref cell, td::Promise> promise); void store_block_state_permanent(td::Ref block, td::Promise> promise); - void store_block_state_permanent_bulk(std::vector> blocks, td::Promise promise); + void store_block_state_permanent_bulk(std::vector> blocks, + td::Promise> promise); void update_snapshot(std::unique_ptr snapshot) { CHECK(!opts_->get_celldb_in_memory()); if (!started_) { diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index accc111c4..db94f9315 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -289,9 +289,26 @@ void RootDb::store_block_state_from_data(BlockHandle handle, td::Ref td::actor::send_closure(cell_db_, &CellDb::store_block_state_permanent, std::move(block), std::move(P)); } -void RootDb::store_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) { - td::actor::send_closure(cell_db_, &CellDb::store_block_state_permanent_bulk, std::move(blocks), std::move(promise)); +void RootDb::store_block_state_from_data_bulk(std::vector> blocks, td::Promise promise) { + td::actor::send_closure( + cell_db_, &CellDb::store_block_state_permanent_bulk, std::move(blocks), + [SelfId = actor_id(this), b = archive_db_.get(), + promise = std::move(promise)](td::Result> R) mutable { + TRY_RESULT_PROMISE(promise, state_root_hashes, std::move(R)); + td::MultiPromise mp; + auto ig = mp.init_guard(); + ig.add_promise(std::move(promise)); + for (auto &[block_id, state_root_hash] : state_root_hashes) { + td::actor::send_closure( + SelfId, &RootDb::get_block_handle_external, block_id, true, + [b, root_hash = state_root_hash, p = ig.get_promise()](td::Result R) mutable { + TRY_RESULT_PROMISE(p, handle, std::move(R)); + handle->set_state_root_hash(root_hash); + handle->set_state_boc(); + td::actor::send_closure(b, &ArchiveManager::update_handle, std::move(handle), std::move(p)); + }); + } + }); } void RootDb::get_block_state(ConstBlockHandle handle, td::Promise> promise) { @@ -569,6 +586,14 @@ void RootDb::set_async_mode(bool mode, td::Promise promise) { td::actor::send_closure(archive_db_, &ArchiveManager::set_async_mode, mode, std::move(promise)); } +void RootDb::add_handle_to_archive(BlockHandle handle, td::Promise promise) { + td::actor::send_closure(archive_db_, &ArchiveManager::add_handle, std::move(handle), std::move(promise)); +} + +void RootDb::set_archive_current_shard_split_depth(td::uint32 value) { + td::actor::send_closure(archive_db_, &ArchiveManager::set_current_shard_split_depth, value); +} + void RootDb::run_gc(UnixTime mc_ts, UnixTime gc_ts, double archive_ttl) { td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, mc_ts, gc_ts, archive_ttl); } diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index 5a0007eb9..2fbf72fba 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -64,8 +64,7 @@ class RootDb : public Db { td::Promise> promise) override; void store_block_state_from_data(BlockHandle handle, td::Ref block, td::Promise> promise) override; - void store_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) override; + void store_block_state_from_data_bulk(std::vector> blocks, td::Promise promise) override; void get_block_state(ConstBlockHandle handle, td::Promise> promise) override; void store_block_state_part(BlockId effective_block, td::Ref cell, td::Promise> promise) override; @@ -140,6 +139,8 @@ class RootDb : public Db { void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise) override; void set_async_mode(bool mode, td::Promise promise) override; + void add_handle_to_archive(BlockHandle handle, td::Promise promise) override; + void set_archive_current_shard_split_depth(td::uint32 value) override; void run_gc(UnixTime mc_ts, UnixTime gc_ts, double archive_ttl) override; void add_persistent_state_description(td::Ref desc, diff --git a/validator/import-db-slice-local.cpp b/validator/import-db-slice-local.cpp index 6ed062096..0400a5ca5 100644 --- a/validator/import-db-slice-local.cpp +++ b/validator/import-db-slice-local.cpp @@ -14,16 +14,15 @@ You should have received a copy of the GNU Lesser General Public License along with TON Blockchain Library. If not, see . */ -#include - #include "block/block-auto.h" #include "common/checksum.h" #include "downloaders/download-state.hpp" #include "td/actor/MultiPromise.h" +#include "td/actor/coro_utils.h" #include "td/utils/overloaded.h" #include "td/utils/port/path.h" -#include "ton/ton-io.hpp" #include "validator/db/fileref.hpp" +#include "validator/db/package.hpp" #include "validator/fabric.h" #include "import-db-slice-local.hpp" @@ -34,7 +33,7 @@ namespace validator { ArchiveImporterLocal::ArchiveImporterLocal(std::string db_root, td::Ref state, BlockSeqno shard_client_seqno, td::Ref opts, - td::actor::ActorId manager, + td::actor::ActorId manager, td::actor::ActorId db, std::vector to_import_files, td::Promise> promise) : db_root_(std::move(db_root)) @@ -42,6 +41,7 @@ ArchiveImporterLocal::ArchiveImporterLocal(std::string db_root, td::Ref ArchiveImporterLocal::run() { + auto R = co_await run_inner().wrap(); + if (R.is_error()) { + LOG(ERROR) << "Archive import: " << R.error(); + if (!imported_any_) { + promise_.set_error(R.move_as_error()); + stop(); + co_return td::Unit{}; + } + } + LOG(WARNING) << "Imported archive in " << perf_timer_.elapsed() << "s : mc_seqno=" << final_masterchain_state_seqno_ + << " shard_seqno=" << final_shard_client_seqno_; + promise_.set_value({final_masterchain_state_seqno_, + std::min(final_masterchain_state_seqno_, final_shard_client_seqno_)}); + stop(); + co_return td::Unit{}; +} + +td::actor::Task ArchiveImporterLocal::run_inner() { LOG(WARNING) << "Importing archive for masterchain seqno #" << shard_client_seqno_ + 1 << " from disk"; + read_files(); + co_await process_masterchain_blocks(); + co_await process_shard_blocks(); + co_await store_data(); + co_await apply_blocks(); + co_return td::Unit{}; +} + +void ArchiveImporterLocal::read_files() { for (const std::string &path : to_import_files_) { LOG(INFO) << "Importing file from disk " << path; td::Status S = process_package(path); @@ -58,12 +89,9 @@ void ArchiveImporterLocal::start_up() { LOG(WARNING) << "Error processing package " << path << ": " << S; } } - - process_masterchain_blocks(); } td::Status ArchiveImporterLocal::process_package(std::string path) { - LOG(DEBUG) << "Processing package " << path; TRY_RESULT(p, Package::open(path, false, false)); auto package = std::make_shared(std::move(p)); @@ -71,8 +99,7 @@ td::Status ArchiveImporterLocal::process_package(std::string path) { package->iterate([&](std::string filename, td::BufferSlice data, td::uint64) -> bool { auto F = FileReference::create(filename); if (F.is_error()) { - S = F.move_as_error(); - return false; + return true; } auto f = F.move_as_ok(); @@ -98,7 +125,7 @@ td::Status ArchiveImporterLocal::process_package(std::string path) { }, [&](const auto &) { ignore = true; })); - if (ignore || block_id.is_masterchain() && block_id.seqno() <= last_masterchain_state_->get_seqno()) { + if (ignore || (block_id.is_masterchain() && block_id.seqno() <= last_masterchain_state_->get_seqno())) { return true; } @@ -138,225 +165,186 @@ td::Status ArchiveImporterLocal::process_package(std::string path) { return S; } -void ArchiveImporterLocal::process_masterchain_blocks() { +td::actor::Task ArchiveImporterLocal::process_masterchain_blocks() { + final_masterchain_state_seqno_ = last_masterchain_state_->get_seqno(); + final_shard_client_seqno_ = shard_client_seqno_; if (masterchain_blocks_.empty()) { LOG(INFO) << "No masterchain blocks in the archive"; - checked_masterchain_proofs(); - return; + co_return td::Unit{}; } if (masterchain_blocks_.begin()->first != last_masterchain_state_->get_seqno() + 1) { - abort_query(td::Status::Error(ErrorCode::notready, PSTRING() << "expected masterchain seqno " - << last_masterchain_state_->get_seqno() + 1 - << ", found " << masterchain_blocks_.begin()->first)); - return; + co_return td::Status::Error(ErrorCode::notready, PSTRING() << "expected masterchain seqno " + << last_masterchain_state_->get_seqno() + 1 << ", found " + << masterchain_blocks_.begin()->first); } { BlockSeqno expected_seqno = last_masterchain_state_->get_seqno() + 1; for (auto &[seqno, _] : masterchain_blocks_) { if (seqno != expected_seqno) { - abort_query(td::Status::Error(ErrorCode::protoviolation, "non-consecutive masterchain blocks in the archive")); - return; + co_return td::Status::Error(ErrorCode::protoviolation, "non-consecutive masterchain blocks in the archive"); } ++expected_seqno; } } BlockInfo &first_block = blocks_[masterchain_blocks_.begin()->second]; if (first_block.proof.is_null()) { - abort_query(td::Status::Error(ErrorCode::protoviolation, "no masterchain block proof")); - return; + co_return td::Status::Error(ErrorCode::protoviolation, "no masterchain block proof"); } if (first_block.block.is_null()) { - abort_query(td::Status::Error(ErrorCode::protoviolation, "no masterchain block data")); - return; + co_return td::Status::Error(ErrorCode::protoviolation, "no masterchain block data"); } block::gen::Block::Record rec; block::gen::BlockInfo::Record info; if (!(block::gen::unpack_cell(first_block.block->root_cell(), rec) && block::gen::unpack_cell(rec.info, info))) { - abort_query(td::Status::Error(ErrorCode::protoviolation, "cannot unpack masterchain block info")); - return; + co_return td::Status::Error(ErrorCode::protoviolation, "cannot unpack masterchain block info"); } if (info.key_block) { - import_first_key_block(); - return; + co_await import_first_key_block(); + if (masterchain_blocks_.empty()) { + LOG(INFO) << "No more masterchain blocks in the archive"; + co_return td::Unit{}; + } } - - process_masterchain_blocks_cont(); + co_await check_masterchain_proofs(); + co_return td::Unit{}; } -void ArchiveImporterLocal::import_first_key_block() { +td::actor::Task ArchiveImporterLocal::import_first_key_block() { BlockIdExt block_id = masterchain_blocks_.begin()->second; BlockInfo &first_block = blocks_[block_id]; LOG(INFO) << "First block in archive is key block : " << block_id.id.to_str(); - - auto P = - td::PromiseCreator::lambda([SelfId = actor_id(this), prev_block_id = last_masterchain_state_->get_block_id()]( - td::Result R) mutable { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - return; - } - auto handle = R.move_as_ok(); - CHECK(!handle->merge_before()); - if (handle->one_prev(true) != prev_block_id) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, - td::Status::Error(ErrorCode::protoviolation, "prev block mismatch")); - return; - } - td::actor::send_closure(SelfId, &ArchiveImporterLocal::checked_key_block_proof, std::move(handle)); - }); - - run_check_proof_query(block_id, first_block.proof, manager_, td::Timestamp::in(600.0), std::move(P), + auto [task, promise] = td::actor::StartedTask::make_bridge(); + run_check_proof_query(block_id, first_block.proof, manager_, td::Timestamp::in(600.0), std::move(promise), last_masterchain_state_, opts_->is_hardfork(block_id)); -} - -void ArchiveImporterLocal::checked_key_block_proof(BlockHandle handle) { - BlockIdExt block_id = masterchain_blocks_.begin()->second; + BlockHandle handle = co_await std::move(task); + CHECK(!handle->merge_before()); CHECK(block_id == handle->id()); - BlockInfo &first_block = blocks_[block_id]; - run_apply_block_query( - handle->id(), first_block.block, handle->id(), manager_, td::Timestamp::in(600.0), - [SelfId = actor_id(this), manager = manager_, handle](td::Result R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - return; - } - td::actor::send_closure( - manager, &ValidatorManager::get_shard_state_from_db, handle, [=](td::Result> R2) { - if (R2.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R2.move_as_error()); - return; - } - td::actor::send_closure(SelfId, &ArchiveImporterLocal::applied_key_block, - td::Ref{R2.move_as_ok()}); - }); - }); -} + if (handle->one_prev(true) != last_masterchain_state_->get_block_id()) { + co_return td::Status::Error(ErrorCode::protoviolation, "prev block mismatch"); + } + + auto [task2, promise2] = td::actor::StartedTask::make_bridge(); + run_apply_block_query(handle->id(), first_block.block, handle->id(), manager_, td::Timestamp::in(600.0), + std::move(promise2)); + co_await std::move(task2); + auto state = + td::Ref{co_await td::actor::ask(manager_, &ValidatorManager::get_shard_state_from_db, handle)}; -void ArchiveImporterLocal::applied_key_block(td::Ref state) { CHECK(state->get_block_id() == masterchain_blocks_.begin()->second); last_masterchain_state_ = state; + final_masterchain_state_seqno_ = state->get_seqno(); imported_any_ = true; masterchain_blocks_.erase(masterchain_blocks_.begin()); blocks_.erase(state->get_block_id()); - LOG(INFO) << "Imported key block " << state->get_block_id().id.to_str(); - if (masterchain_blocks_.empty()) { - LOG(INFO) << "No more masterchain blocks in the archive"; - checked_masterchain_proofs(); - return; - } - process_masterchain_blocks_cont(); + LOG(WARNING) << "Imported key block " << state->get_block_id().id.to_str(); + co_return td::Unit{}; } -void ArchiveImporterLocal::process_masterchain_blocks_cont() { - LOG(INFO) << "Importing masterchain blocks from " << masterchain_blocks_.begin()->first << " to " +td::actor::Task ArchiveImporterLocal::check_masterchain_proofs() { + LOG(INFO) << "Checking masterchain blocks from " << masterchain_blocks_.begin()->first << " to " << masterchain_blocks_.rbegin()->first; - td::MultiPromise mp; - auto ig = mp.init_guard(); + std::vector> tasks; - BlockIdExt prev_block_id = last_masterchain_state_->get_block_id(); for (auto &[_, block_id] : masterchain_blocks_) { auto &info = blocks_[block_id]; - info.import = true; if (info.proof.is_null()) { - abort_query(td::Status::Error(ErrorCode::protoviolation, "no masterchain block proof")); - return; + co_return td::Status::Error(ErrorCode::protoviolation, "no masterchain block proof"); } if (info.block.is_null()) { - abort_query(td::Status::Error(ErrorCode::protoviolation, "no masterchain block data")); - return; + co_return td::Status::Error(ErrorCode::protoviolation, "no masterchain block data"); } - auto P = td::PromiseCreator::lambda( - [SelfId = actor_id(this), prev_block_id, promise = ig.get_promise()](td::Result R) mutable { - TRY_RESULT_PROMISE(promise, handle, std::move(R)); - CHECK(!handle->merge_before()); - if (handle->one_prev(true) != prev_block_id) { - promise.set_error(td::Status::Error(ErrorCode::protoviolation, "prev block mismatch")); - return; - } - promise.set_result(td::Unit()); - }); - run_check_proof_query(block_id, info.proof, manager_, td::Timestamp::in(600.0), std::move(P), + auto [task, promise] = td::actor::StartedTask::make_bridge(); + run_check_proof_query(block_id, info.proof, manager_, td::Timestamp::in(600.0), std::move(promise), last_masterchain_state_, opts_->is_hardfork(block_id)); - prev_block_id = block_id; + tasks.push_back(std::move(task)); } - ig.add_promise([SelfId = actor_id(this)](td::Result R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - } else { - LOG(INFO) << "Checked proofs for masterchain blocks"; - td::actor::send_closure(SelfId, &ArchiveImporterLocal::checked_masterchain_proofs); + + auto handles = co_await td::actor::all(std::move(tasks)); + BlockIdExt prev_block_id = last_masterchain_state_->get_block_id(); + size_t i = 0; + for (auto &[_, block_id] : masterchain_blocks_) { + CHECK(i < handles.size()); + CHECK(!handles[i]->merge_before()); + if (handles[i]->one_prev(true) != prev_block_id) { + co_return td::Status::Error(ErrorCode::protoviolation, "prev block mismatch"); } - }); + blocks_to_apply_mc_.emplace_back(block_id, block_id); + blocks_[block_id].import = true; + CHECK(handles[i]->inited_is_key_block()); + if (handles[i]->is_key_block()) { + co_return td::Status::Error(ErrorCode::protoviolation, + "package contains a key block, and it is not the first block"); + } + prev_block_id = block_id; + ++i; + } + LOG(INFO) << "Checked proofs for masterchain blocks"; + co_return td::Unit{}; } -void ArchiveImporterLocal::checked_masterchain_proofs() { +td::actor::Task ArchiveImporterLocal::process_shard_blocks() { + td::Ref state; if (shard_client_seqno_ == last_masterchain_state_->get_seqno()) { - got_shard_client_state(last_masterchain_state_); + state = last_masterchain_state_; } else { CHECK(shard_client_seqno_ < last_masterchain_state_->get_seqno()); BlockIdExt block_id; if (!last_masterchain_state_->get_old_mc_block_id(shard_client_seqno_, block_id)) { - abort_query(td::Status::Error("failed to get shard client block id")); - return; + co_return td::Status::Error("failed to get shard client block id"); } - td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db_short, block_id, - [SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, - R.move_as_error_prefix("failed to get shard client state: ")); - return; - } - td::actor::send_closure(SelfId, &ArchiveImporterLocal::got_shard_client_state, - td::Ref{R.move_as_ok()}); - }); + state = td::Ref{ + co_await td::actor::ask(manager_, &ValidatorManager::get_shard_state_from_db_short, block_id)}; } -} - -void ArchiveImporterLocal::got_shard_client_state(td::Ref state) { CHECK(state->get_seqno() == shard_client_seqno_); LOG(DEBUG) << "got_shard_client_state " << shard_client_seqno_; shard_client_state_ = state; new_shard_client_seqno_ = shard_client_seqno_; - current_shard_client_seqno_ = shard_client_seqno_; for (auto &shard : state->get_shards()) { visited_shard_blocks_.insert(shard->top_block_id()); } - try_advance_shard_client_seqno(); + + while (co_await try_advance_shard_client_seqno()) { + LOG(DEBUG) << "advanced shard client seqno to " << new_shard_client_seqno_; + } + if (new_shard_client_seqno_ == shard_client_seqno_) { + LOG(INFO) << "No new shard blocks, shard client seqno = " << new_shard_client_seqno_; + } else { + LOG(INFO) << "New shard client seqno = " << new_shard_client_seqno_; + } + + for (const BlockIdExt &block_id : new_zerostates_) { + LOG(INFO) << "Downloading zerostate " << block_id.to_str(); + auto [task, promise] = td::actor::StartedTask>::make_bridge(); + td::actor::create_actor( + "downloadstate", block_id, shard_client_state_->get_block_id(), + shard_client_state_->persistent_state_split_depth(block_id.id.workchain), 2, manager_, td::Timestamp::in(3600), + std::move(promise)) + .release(); + co_await std::move(task); + } + co_return td::Unit{}; } -void ArchiveImporterLocal::try_advance_shard_client_seqno() { +td::actor::Task ArchiveImporterLocal::try_advance_shard_client_seqno() { BlockSeqno seqno = new_shard_client_seqno_ + 1; auto it = masterchain_blocks_.find(seqno); - if (it != masterchain_blocks_.end()) { - try_advance_shard_client_seqno_cont(blocks_[it->second].block); - return; + if (it == masterchain_blocks_.end() && seqno > last_masterchain_state_->get_seqno()) { + co_return false; } - if (seqno > last_masterchain_state_->get_seqno()) { - processed_shard_blocks(); - return; - } - BlockIdExt block_id; - if (!last_masterchain_state_->get_old_mc_block_id(seqno, block_id)) { - abort_query(td::Status::Error("failed to get old mc block id")); - return; + td::Ref mc_block; + if (it != masterchain_blocks_.end()) { + mc_block = blocks_[it->second].block; + } else { + BlockIdExt block_id; + if (!last_masterchain_state_->get_old_mc_block_id(seqno, block_id)) { + co_return td::Status::Error("failed to get old mc block id"); + } + mc_block = co_await td::actor::ask(manager_, &ValidatorManager::get_block_data_from_db_short, block_id); } - td::actor::send_closure(manager_, &ValidatorManager::get_block_data_from_db_short, block_id, - [SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, - R.move_as_error_prefix("failed to get block data: ")); - return; - } - td::actor::send_closure(SelfId, &ArchiveImporterLocal::try_advance_shard_client_seqno_cont, - R.move_as_ok()); - }); -} -void ArchiveImporterLocal::try_advance_shard_client_seqno_cont(td::Ref mc_block) { - CHECK(mc_block.not_null()); - CHECK(mc_block->block_id().seqno() == new_shard_client_seqno_ + 1); + CHECK(mc_block.not_null() && mc_block->block_id().seqno() == new_shard_client_seqno_ + 1); LOG(DEBUG) << "try_advance_shard_client_seqno " << new_shard_client_seqno_ + 1; block::gen::Block::Record rec; @@ -380,7 +368,6 @@ void ArchiveImporterLocal::try_advance_shard_client_seqno_cont(td::Ref prev; BlockIdExt mc_blkid; @@ -389,12 +376,12 @@ void ArchiveImporterLocal::try_advance_shard_client_seqno_cont(td::Ref top_shard_blocks; - shard_config->process_shard_hashes([&](block::McShardHash &shard) { + shard_config->process_shard_hashes([&](const block::McShardHash &shard) { if (!opts_->need_monitor(shard.shard(), shard_client_state_)) { return 0; } @@ -407,231 +394,182 @@ void ArchiveImporterLocal::try_advance_shard_client_seqno_cont(td::Refblock_id().seqno()] = {mc_block->block_id(), std::move(top_shard_blocks)}; ++new_shard_client_seqno_; LOG(DEBUG) << "Advancing shard client seqno to " << new_shard_client_seqno_; for (const BlockIdExt &block_id : blocks_to_import) { + blocks_to_apply_shards_.emplace_back(block_id, mc_block->block_id()); blocks_[block_id].import = true; } - td::actor::send_closure(actor_id(this), &ArchiveImporterLocal::try_advance_shard_client_seqno); + co_return true; } -void ArchiveImporterLocal::processed_shard_blocks() { - if (new_shard_client_seqno_ == shard_client_seqno_) { - LOG(INFO) << "No new shard blocks"; - } else { - LOG(INFO) << "New shard client seqno = " << new_shard_client_seqno_; - } - - td::MultiPromise mp; - auto ig = mp.init_guard(); - for (const BlockIdExt &block_id : new_zerostates_) { - LOG(INFO) << "Downloading zerostate " << block_id.to_str(); - td::actor::create_actor( - "downloadstate", block_id, shard_client_state_->get_block_id(), - shard_client_state_->persistent_state_split_depth(block_id.id.workchain), 2, manager_, td::Timestamp::in(3600), - ig.get_promise().wrap([](td::Ref &&) { return td::Unit(); })) - .release(); - } - ig.add_promise([SelfId = actor_id(this)](td::Result R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - } else { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::store_data); - } - }); -} - -void ArchiveImporterLocal::store_data() { - td::MultiPromise mp; - auto ig = mp.init_guard(); - +td::actor::Task ArchiveImporterLocal::store_data() { + std::vector> tasks; if (opts_->get_permanent_celldb()) { std::vector> blocks; - for (auto &[_, info] : blocks_) { + for (auto &[block_id, info] : blocks_) { if (info.import) { blocks.push_back(info.block); } } - td::actor::send_closure(manager_, &ValidatorManager::set_block_state_from_data_preliminary, std::move(blocks), - ig.get_promise()); + tasks.push_back(td::actor::ask(manager_, &ValidatorManager::set_block_state_from_data_bulk, std::move(blocks))); } for (auto &[block_id, info] : blocks_) { - if (info.import) { - td::actor::send_closure( - manager_, &ValidatorManager::get_block_handle, block_id, true, - [promise = ig.get_promise(), block = info.block, manager = manager_](td::Result R) mutable { - TRY_RESULT_PROMISE(promise, handle, std::move(R)); - td::actor::send_closure(manager, &ValidatorManager::set_block_data, handle, std::move(block), - std::move(promise)); - }); - if (info.proof_link.not_null()) { - run_check_proof_link_query(block_id, info.proof_link, manager_, td::Timestamp::in(600.0), - ig.get_promise().wrap([](BlockHandle &&) { return td::Unit(); })); - } + if (!info.import) { + continue; } - } - - ig.add_promise([SelfId = actor_id(this)](td::Result R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - } else { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::apply_next_masterchain_block); + tasks.push_back(store_block_data(info.block).start()); + if (info.proof_link.not_null()) { + auto [task, promise] = td::actor::StartedTask::make_bridge(); + run_check_proof_link_query(block_id, info.proof_link, manager_, td::Timestamp::in(600.0), std::move(promise)); + tasks.push_back( + std::move(task).then([](BlockHandle &&) -> td::actor::Task { co_return td::Unit{}; }).start()); } - }); -} - -void ArchiveImporterLocal::apply_next_masterchain_block() { - auto it = masterchain_blocks_.find(last_masterchain_state_->get_seqno() + 1); - if (it == masterchain_blocks_.end()) { - LOG(INFO) << "Applied masterchain blocks, last seqno = " << last_masterchain_state_->get_seqno(); - apply_shard_blocks(); - return; } - BlockIdExt block_id = it->second; - LOG(DEBUG) << "Applying masterchain block " << block_id.to_str(); - BlockInfo &info = blocks_[block_id]; - run_apply_block_query(block_id, info.block, block_id, manager_, td::Timestamp::in(600.0), - [=, SelfId = actor_id(this), manager = manager_](td::Result R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - return; - } - td::actor::send_closure( - manager, &ValidatorManager::get_shard_state_from_db_short, block_id, - [=](td::Result> R2) { - if (R2.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, - R2.move_as_error()); - return; - } - td::actor::send_closure(SelfId, &ArchiveImporterLocal::applied_next_masterchain_block, - td::Ref{R2.move_as_ok()}); - }); - }); + co_await td::actor::all(std::move(tasks)); + co_return td::Unit{}; } -void ArchiveImporterLocal::applied_next_masterchain_block(td::Ref state) { - last_masterchain_state_ = state; - imported_any_ = true; - LOG(DEBUG) << "Applied masterchain block " << state->get_block_id().to_str(); - apply_next_masterchain_block(); +td::actor::Task ArchiveImporterLocal::store_block_data(td::Ref block) { + BlockHandle handle = co_await td::actor::ask(manager_, &ValidatorManager::get_block_handle, block->block_id(), true); + co_await td::actor::ask(manager_, &ValidatorManager::set_block_data, handle, block); + co_return td::Unit{}; } -void ArchiveImporterLocal::apply_shard_blocks() { - if (current_shard_client_seqno_ == new_shard_client_seqno_) { - finish_query(); - return; +td::actor::Task ArchiveImporterLocal::apply_blocks() { + LOG(WARNING) << "Applying " << blocks_to_apply_mc_.size() << " mc blocks, " << blocks_to_apply_shards_.size() + << " shard blocks"; + if (opts_->get_permanent_celldb()) { + co_await apply_blocks_async(blocks_to_apply_mc_); + final_masterchain_state_seqno_ = + blocks_to_apply_mc_.empty() ? last_masterchain_state_->get_seqno() : blocks_to_apply_mc_.back().first.seqno(); + if (!blocks_to_apply_mc_.empty()) { + imported_any_ = true; + } + co_await apply_blocks_async(blocks_to_apply_shards_); + if (!blocks_to_apply_shards_.empty()) { + imported_any_ = true; + } + } else { + for (const auto &[block_id, _] : blocks_to_apply_mc_) { + auto it = blocks_.find(block_id); + CHECK(it != blocks_.end()); + td::Ref block = it->second.block; + CHECK(block.not_null()); + auto [task, promise] = td::actor::StartedTask::make_bridge(); + run_apply_block_query(block_id, block, block_id, manager_, td::Timestamp::in(600.0), std::move(promise)); + co_await std::move(task); + imported_any_ = true; + final_masterchain_state_seqno_ = block_id.seqno(); + } + for (const auto &[block_id, mc_block_id] : blocks_to_apply_shards_) { + auto it = blocks_.find(block_id); + CHECK(it != blocks_.end()); + td::Ref block = it->second.block; + CHECK(block.not_null()); + auto [task, promise] = td::actor::StartedTask::make_bridge(); + run_apply_block_query(block_id, block, mc_block_id, manager_, td::Timestamp::in(600.0), std::move(promise)); + co_await std::move(task); + imported_any_ = true; + } } - auto it = shard_configs_.find(current_shard_client_seqno_ + 1); - if (it == shard_configs_.end()) { - abort_query(td::Status::Error("no shard config for the next shard client seqno")); - return; + final_shard_client_seqno_ = new_shard_client_seqno_; + co_return td::Unit{}; +} + +td::actor::Task ArchiveImporterLocal::apply_blocks_async( + const std::vector> &blocks) { + std::vector> tasks_1; + for (const auto &[block_id, mc_block_id] : blocks) { + tasks_1.push_back(apply_block_async_1(block_id, mc_block_id).start()); } + auto handles = co_await td::actor::all(std::move(tasks_1)); - td::MultiPromise mp; - auto ig = mp.init_guard(); - BlockIdExt mc_block_id = it->second.first; - LOG(DEBUG) << "Applying top shard blocks from " << current_shard_client_seqno_ + 1; - for (const BlockIdExt &block_id : it->second.second) { - apply_shard_block(block_id, mc_block_id, ig.get_promise()); + std::vector> tasks_2; + for (const auto &handle : handles) { + tasks_2.push_back(apply_block_async_2(handle).start()); } + co_await td::actor::all(std::move(tasks_2)); - ig.add_promise([SelfId = actor_id(this)](td::Result R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &ArchiveImporterLocal::abort_query, R.move_as_error()); - return; - } - td::actor::send_closure(SelfId, &ArchiveImporterLocal::applied_shard_blocks); - }); -} + std::vector> tasks_3; + for (const auto &handle : handles) { + tasks_3.push_back(apply_block_async_3(handle).start()); + } + co_await td::actor::all(std::move(tasks_3)); -void ArchiveImporterLocal::applied_shard_blocks() { - LOG(DEBUG) << "Applied top shard blocks from " << current_shard_client_seqno_ + 1; - ++current_shard_client_seqno_; - imported_any_ = true; - apply_shard_blocks(); -} + std::vector> tasks_4; + for (const auto &handle : handles) { + tasks_4.push_back(apply_block_async_4(handle).start()); + } + co_await td::actor::all(std::move(tasks_4)); -void ArchiveImporterLocal::apply_shard_block(BlockIdExt block_id, BlockIdExt mc_block_id, - td::Promise promise) { - td::actor::send_closure( - manager_, &ValidatorManager::get_block_handle, block_id, true, - [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { - R.ensure(); - td::actor::send_closure(SelfId, &ArchiveImporterLocal::apply_shard_block_cont1, R.move_as_ok(), mc_block_id, - std::move(promise)); - }); + co_return td::Unit{}; } -void ArchiveImporterLocal::apply_shard_block_cont1(BlockHandle handle, BlockIdExt mc_block_id, - td::Promise promise) { - if (handle->is_applied()) { - promise.set_value(td::Unit()); - return; +td::actor::Task ArchiveImporterLocal::apply_block_async_1(BlockIdExt block_id, BlockIdExt mc_block_id) { + // apply_block_async_1-4 are same as ApplyBlock, but in parallel and without setting "apply" flag + CHECK(block_id.seqno() != 0); + auto it = blocks_.find(block_id); + CHECK(it != blocks_.end()); + td::Ref block = it->second.block; + CHECK(block.not_null()); + LOG(DEBUG) << "Applying block " << block_id.to_str() << ", mc_block_seqno=" << mc_block_id.to_str(); + BlockHandle handle = co_await td::actor::ask(manager_, &ValidatorManager::get_block_handle, block_id, true); + + CHECK(!handle->id().is_masterchain() || handle->inited_proof()); + CHECK(handle->id().is_masterchain() || handle->inited_proof_link()); + CHECK(handle->inited_merge_before()); + CHECK(handle->inited_split_after()); + CHECK(handle->inited_prev()); + CHECK(handle->inited_state_root_hash()); + CHECK(handle->inited_logical_time()); + + td::Ref state = + co_await td::actor::ask(manager_, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(600.0), true); + CHECK(handle->received_state()); + if (!handle->is_applied()) { + co_await td::actor::ask(manager_, &ValidatorManager::set_next_block, handle->one_prev(true), block_id); + if (handle->merge_before()) { + co_await td::actor::ask(manager_, &ValidatorManager::set_next_block, handle->one_prev(false), block_id); + } } - promise = [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { - TRY_STATUS_PROMISE(promise, R.move_as_status()); - td::actor::send_closure(SelfId, &ArchiveImporterLocal::apply_shard_block_cont2, handle, mc_block_id, - std::move(promise)); - }; - - if (!handle->merge_before() && handle->one_prev(true).shard_full() == handle->id().shard_full()) { - apply_shard_block(handle->one_prev(true), mc_block_id, std::move(promise)); + if (!block_id.is_masterchain()) { + handle->set_masterchain_ref_block(mc_block_id.seqno()); } else { - td::MultiPromise mp; - auto ig = mp.init_guard(); - ig.add_promise(std::move(promise)); - check_shard_block_applied(handle->one_prev(true), ig.get_promise()); - if (handle->merge_before()) { - check_shard_block_applied(handle->one_prev(false), ig.get_promise()); - } + td::Ref mc_state{state}; + td::uint32 monitor_min_split = mc_state->monitor_min_split_depth(basechainId); + td::actor::send_closure(db_, &Db::set_archive_current_shard_split_depth, monitor_min_split); } -} -void ArchiveImporterLocal::apply_shard_block_cont2(BlockHandle handle, BlockIdExt mc_block_id, - td::Promise promise) { - td::Ref block = blocks_[handle->id()].block; - CHECK(block.not_null()); - LOG(DEBUG) << "Applying shard block " << handle->id().to_str(); - run_apply_block_query(handle->id(), std::move(block), mc_block_id, manager_, td::Timestamp::in(600.0), - std::move(promise)); + co_return handle; } -void ArchiveImporterLocal::check_shard_block_applied(BlockIdExt block_id, td::Promise promise) { - td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, false, - [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { - TRY_RESULT_PROMISE(promise, handle, std::move(R)); - if (!handle->is_applied()) { - promise.set_error(td::Status::Error(ErrorCode::notready, "not applied")); - } else { - promise.set_value(td::Unit()); - } - }); +td::actor::Task ArchiveImporterLocal::apply_block_async_2(BlockHandle handle) { + // Running add_handle in order is required to properly update ltdb in ArchiveSlice + co_await td::actor::ask(db_, &Db::add_handle_to_archive, handle); + co_return td::Unit{}; } -void ArchiveImporterLocal::abort_query(td::Status error) { - if (!imported_any_) { - LOG(ERROR) << "Archive import: " << error; - promise_.set_error(std::move(error)); - stop(); - } else { - LOG(WARNING) << "Archive import: " << error; - finish_query(); - } +td::actor::Task ArchiveImporterLocal::apply_block_async_3(BlockHandle handle) { + td::Ref state = co_await td::actor::ask(manager_, &ValidatorManager::get_shard_state_from_db, handle); + co_await td::actor::ask(manager_, &ValidatorManager::new_block, handle, state); + co_return td::Unit{}; } -void ArchiveImporterLocal::finish_query() { - LOG(WARNING) << "Imported archive in " << perf_timer_.elapsed() - << "s : mc_seqno=" << last_masterchain_state_->get_seqno() - << " shard_seqno=" << current_shard_client_seqno_; - promise_.set_value({last_masterchain_state_->get_seqno(), - std::min(last_masterchain_state_->get_seqno(), current_shard_client_seqno_)}); - stop(); +td::actor::Task ArchiveImporterLocal::apply_block_async_4(BlockHandle handle) { + handle->set_applied(); + CHECK(handle->handle_moved_to_archive()); + CHECK(handle->moved_to_archive()); + if (handle->need_flush()) { + auto [task, promise] = td::actor::StartedTask::make_bridge(); + handle->flush(manager_, handle, std::move(promise)); + co_await std::move(task); + } + co_return td::Unit{}; } } // namespace validator diff --git a/validator/import-db-slice-local.hpp b/validator/import-db-slice-local.hpp index 335e2a609..a4e97f90b 100644 --- a/validator/import-db-slice-local.hpp +++ b/validator/import-db-slice-local.hpp @@ -17,8 +17,7 @@ #pragma once #include "td/actor/actor.h" -#include "td/utils/port/path.h" -#include "validator/db/package.hpp" +#include "td/actor/coro_task.h" #include "validator/interfaces/validator-manager.h" namespace ton { @@ -29,49 +28,45 @@ class ArchiveImporterLocal : public td::actor::Actor { public: ArchiveImporterLocal(std::string db_root, td::Ref state, BlockSeqno shard_client_seqno, td::Ref opts, td::actor::ActorId manager, - std::vector to_import_files, + td::actor::ActorId db, std::vector to_import_files, td::Promise> promise); void start_up() override; - void abort_query(td::Status error); - void finish_query(); + td::actor::Task run(); + td::actor::Task run_inner(); + void read_files(); td::Status process_package(std::string path); - void process_masterchain_blocks(); - void process_masterchain_blocks_cont(); + td::actor::Task process_masterchain_blocks(); + td::actor::Task import_first_key_block(); + td::actor::Task check_masterchain_proofs(); - void import_first_key_block(); - void checked_key_block_proof(BlockHandle handle); - void applied_key_block(td::Ref state); + td::actor::Task process_shard_blocks(); + td::actor::Task try_advance_shard_client_seqno(); - void checked_masterchain_proofs(); - void got_shard_client_state(td::Ref state); + td::actor::Task store_data(); + td::actor::Task store_block_data(td::Ref block); - void try_advance_shard_client_seqno(); - void try_advance_shard_client_seqno_cont(td::Ref mc_block); + td::actor::Task apply_blocks(); + td::actor::Task apply_blocks_async(const std::vector>& blocks); - void processed_shard_blocks(); - void store_data(); - void apply_next_masterchain_block(); - void applied_next_masterchain_block(td::Ref state); - - void apply_shard_blocks(); - void applied_shard_blocks(); - - void apply_shard_block(BlockIdExt block_id, BlockIdExt mc_block_id, td::Promise promise); - void apply_shard_block_cont1(BlockHandle handle, BlockIdExt mc_block_id, td::Promise promise); - void apply_shard_block_cont2(BlockHandle handle, BlockIdExt mc_block_id, td::Promise promise); - void check_shard_block_applied(BlockIdExt block_id, td::Promise promise); + td::actor::Task apply_block_async_1(BlockIdExt block_id, BlockIdExt mc_block_id); + td::actor::Task apply_block_async_2(BlockHandle handle); + td::actor::Task apply_block_async_3(BlockHandle handle); + td::actor::Task apply_block_async_4(BlockHandle handle); private: std::string db_root_; td::Ref last_masterchain_state_; BlockSeqno shard_client_seqno_; + BlockSeqno final_masterchain_state_seqno_ = 0; + BlockSeqno final_shard_client_seqno_ = 0; td::Ref opts_; td::actor::ActorId manager_; + td::actor::ActorId db_; std::vector to_import_files_; td::Promise> promise_; @@ -87,9 +82,10 @@ class ArchiveImporterLocal : public td::actor::Actor { td::Ref shard_client_state_; BlockSeqno new_shard_client_seqno_; - BlockSeqno current_shard_client_seqno_; std::set visited_shard_blocks_; std::set new_zerostates_; + std::vector> blocks_to_apply_mc_; + std::vector> blocks_to_apply_shards_; std::map>> shard_configs_; diff --git a/validator/interfaces/db.h b/validator/interfaces/db.h index cd3de75de..880b36781 100644 --- a/validator/interfaces/db.h +++ b/validator/interfaces/db.h @@ -53,8 +53,8 @@ class Db : public td::actor::Actor { td::Promise> promise) = 0; virtual void store_block_state_from_data(BlockHandle handle, td::Ref block, td::Promise> promise) = 0; - virtual void store_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) = 0; + virtual void store_block_state_from_data_bulk(std::vector> blocks, + td::Promise promise) = 0; virtual void get_block_state(ConstBlockHandle handle, td::Promise> promise) = 0; virtual void store_block_state_part(BlockId effective_block, td::Ref cell, td::Promise> promise) = 0; @@ -131,6 +131,8 @@ class Db : public td::actor::Actor { virtual void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise) = 0; virtual void set_async_mode(bool mode, td::Promise promise) = 0; + virtual void add_handle_to_archive(BlockHandle handle, td::Promise promise) = 0; + virtual void set_archive_current_shard_split_depth(td::uint32 value) = 0; virtual void run_gc(UnixTime mc_ts, UnixTime gc_ts, double archive_ttl) = 0; diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index d6257f7c8..7f9b7b4c6 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -247,8 +247,8 @@ class ValidatorManager : public ValidatorManagerInterface { td::Promise> promise) = 0; virtual void set_block_state_from_data(BlockHandle handle, td::Ref block, td::Promise> promise) = 0; - virtual void set_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) = 0; + virtual void set_block_state_from_data_bulk(std::vector> blocks, + td::Promise promise) = 0; virtual void get_cell_db_reader(td::Promise> promise) = 0; virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, PersistentStateType type, td::BufferSlice state, diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 0bba54df6..ebd2b9512 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -708,9 +708,9 @@ void ValidatorManagerImpl::set_block_state_from_data(BlockHandle handle, td::Ref td::actor::send_closure(db_, &Db::store_block_state_from_data, handle, block, std::move(promise)); } -void ValidatorManagerImpl::set_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) { - td::actor::send_closure(db_, &Db::store_block_state_from_data_preliminary, std::move(blocks), std::move(promise)); +void ValidatorManagerImpl::set_block_state_from_data_bulk(std::vector> blocks, + td::Promise promise) { + td::actor::send_closure(db_, &Db::store_block_state_from_data_bulk, std::move(blocks), std::move(promise)); } void ValidatorManagerImpl::get_cell_db_reader(td::Promise> promise) { diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 0bef96ee9..1ecc7f1b5 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -155,8 +155,7 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void set_block_state_from_data(BlockHandle handle, td::Ref block, td::Promise> promise) override; - void set_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) override; + void set_block_state_from_data_bulk(std::vector> blocks, td::Promise promise) override; void get_cell_db_reader(td::Promise> promise) override; void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, PersistentStateType type, td::BufferSlice state, td::Promise promise) override; diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 77cef2a1d..3885286d1 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -186,8 +186,7 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override { UNREACHABLE(); } - void set_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) override { + void set_block_state_from_data_bulk(std::vector> blocks, td::Promise promise) override { UNREACHABLE(); } void get_cell_db_reader(td::Promise> promise) override; diff --git a/validator/manager.cpp b/validator/manager.cpp index 101c871c7..6750a5b62 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1420,9 +1420,9 @@ void ValidatorManagerImpl::set_block_state_from_data(BlockHandle handle, td::Ref td::actor::send_closure(db_, &Db::store_block_state_from_data, handle, block, std::move(promise)); } -void ValidatorManagerImpl::set_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) { - td::actor::send_closure(db_, &Db::store_block_state_from_data_preliminary, std::move(blocks), std::move(promise)); +void ValidatorManagerImpl::set_block_state_from_data_bulk(std::vector> blocks, + td::Promise promise) { + td::actor::send_closure(db_, &Db::store_block_state_from_data_bulk, std::move(blocks), std::move(promise)); } void ValidatorManagerImpl::get_cell_db_reader(td::Promise> promise) { @@ -2167,7 +2167,7 @@ void ValidatorManagerImpl::prestart_sync() { R.ensure(); td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive); }); - td::actor::send_closure(db_, &Db::set_async_mode, false, std::move(P)); + td::actor::send_closure(db_, &Db::set_async_mode, true, std::move(P)); } void ValidatorManagerImpl::download_next_archive() { @@ -2194,12 +2194,13 @@ void ValidatorManagerImpl::download_next_archive() { } }); if (to_import_files.empty()) { - td::actor::create_actor("archiveimport", db_root_, last_masterchain_state_, seqno, opts_, - actor_id(this), std::move(to_import_files), std::move(P)) + td::actor::create_actor(PSTRING() << "archiveimport." << seqno, db_root_, last_masterchain_state_, + seqno, opts_, actor_id(this), std::move(to_import_files), std::move(P)) .release(); } else { - td::actor::create_actor("archiveimport", db_root_, last_masterchain_state_, seqno, opts_, - actor_id(this), std::move(to_import_files), std::move(P)) + td::actor::create_actor(PSTRING() << "archiveimport." << seqno, db_root_, + last_masterchain_state_, seqno, opts_, actor_id(this), db_.get(), + std::move(to_import_files), std::move(P)) .release(); } } diff --git a/validator/manager.hpp b/validator/manager.hpp index f5a166871..b788bd538 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -449,8 +449,7 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void set_block_state_from_data(BlockHandle handle, td::Ref block, td::Promise> promise) override; - void set_block_state_from_data_preliminary(std::vector> blocks, - td::Promise promise) override; + void set_block_state_from_data_bulk(std::vector> blocks, td::Promise promise) override; void get_cell_db_reader(td::Promise> promise) override; void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, PersistentStateType type, td::BufferSlice state, td::Promise promise) override;