Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crypto/common/refcnt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,14 @@ class Ref {
Ref& operator=(Ref<S>&& r);
const typename RefValue<T>::Type* operator->() const {
if (!ptr) {
CHECK(ptr && "deferencing null Ref");
LOG_CHECK(ptr) << "dereferencing null Ref<" << typeid(T).name() << ">";
throw NullRef{};
}
return RefValue<T>::make_const_ptr(ptr);
}
const typename RefValue<T>::Type& operator*() const {
if (!ptr) {
CHECK(ptr && "deferencing null Ref");
LOG_CHECK(ptr) << "dereferencing null Ref<" << typeid(T).name() << ">";
throw NullRef{};
}
return RefValue<T>::make_const_ref(ptr);
Expand Down
2 changes: 1 addition & 1 deletion validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,7 @@ td::Status ValidatorEngine::load_global_config() {
}
validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_);
validator_options_.write().set_celldb_in_memory(celldb_in_memory_);
validator_options_.write().set_celldb_v2(!celldb_in_memory_);
validator_options_.write().set_celldb_v2(!celldb_in_memory_ && !permanent_celldb_);
validator_options_.write().set_celldb_disable_bloom_filter(celldb_disable_bloom_filter_);
validator_options_.write().set_max_open_archive_files(max_open_archive_files_);
validator_options_.write().set_archive_preload_period(archive_preload_period_);
Expand Down
33 changes: 13 additions & 20 deletions validator/db/archive-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,23 @@ void ArchiveManager::load_package(PackageId id) {
}
}

desc.file = td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, 0, db_root_,
archive_lru_.get(), statistics_);
desc.file = create_archive_slice(id, 0);

m.emplace(id, std::move(desc));
update_permanent_slices();
}

td::actor::ActorOwn<ArchiveSlice> ArchiveManager::create_archive_slice(const PackageId &id,
td::uint32 shard_split_depth) {
auto actor = td::actor::create_actor<ArchiveSlice>(
PSTRING() << "slice." << (id.temp ? "temp." : (id.key ? "key." : "")) << id.id, id.id, id.key, id.temp, false,
shard_split_depth, db_root_, archive_lru_.get(), statistics_);
if (async_mode_) {
td::actor::send_closure(actor, &ArchiveSlice::set_async_mode, true, [](td::Result<td::Unit>) {});
}
return actor;
}

td::Result<const ArchiveManager::FileDescription *> ArchiveManager::get_file_desc(ShardIdFull shard, PackageId id,
BlockSeqno seqno, UnixTime ts,
LogicalTime lt, bool force) {
Expand Down Expand Up @@ -698,9 +708,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull
FileDescription new_desc{id, false};
td::mkdir(db_root_ + id.path()).ensure();
std::string prefix = PSTRING() << db_root_ << id.path() << id.name();
new_desc.file = td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false,
id.key || id.temp ? 0 : cur_shard_split_depth_, db_root_,
archive_lru_.get(), statistics_);
new_desc.file = create_archive_slice(id, id.key || id.temp ? 0 : cur_shard_split_depth_);
const FileDescription &desc = f.emplace(id, std::move(new_desc));
if (!id.temp) {
update_desc(f, desc, shard, seqno, ts, lt);
Expand Down Expand Up @@ -1205,23 +1213,8 @@ void ArchiveManager::get_archive_slice(td::uint64 archive_id, td::uint64 offset,
td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_slice, archive_id, offset, limit, std::move(promise));
}

void ArchiveManager::commit_transaction() {
if (!async_mode_ || huge_transaction_size_++ >= 100) {
index_->commit_transaction().ensure();
if (async_mode_) {
huge_transaction_size_ = 0;
huge_transaction_started_ = false;
}
}
}

void ArchiveManager::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
async_mode_ = mode;
if (!async_mode_ && huge_transaction_started_) {
index_->commit_transaction().ensure();
huge_transaction_size_ = 0;
huge_transaction_started_ = false;
}

td::MultiPromise mp;
auto ig = mp.init_guard();
Expand Down
4 changes: 1 addition & 3 deletions validator/db/archive-manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class ArchiveManager : public td::actor::Actor {
void start_up() override;
void alarm() override;

void commit_transaction();
void set_async_mode(bool mode, td::Promise<td::Unit> promise);

void set_current_shard_split_depth(td::uint32 value) {
Expand Down Expand Up @@ -184,8 +183,6 @@ class ArchiveManager : public td::actor::Actor {
td::actor::ActorOwn<ArchiveLru> archive_lru_;
BlockSeqno finalized_up_to_{0};
bool async_mode_ = false;
bool huge_transaction_started_ = false;
td::uint32 huge_transaction_size_ = 0;
td::uint32 cur_shard_split_depth_ = 0;

DbStatistics statistics_;
Expand All @@ -201,6 +198,7 @@ class ArchiveManager : public td::actor::Actor {
std::map<std::pair<BlockSeqno, FileHash>, PermState> perm_states_; // Mc block seqno, hash -> state

void load_package(PackageId seqno);
td::actor::ActorOwn<ArchiveSlice> create_archive_slice(const PackageId &id, td::uint32 shard_split_depth);
void delete_package(PackageId seqno, td::Promise<td::Unit> promise);
void deleted_package(PackageId seqno, td::Promise<td::Unit> promise);
void get_handle_cont(BlockIdExt block_id, PackageId id, td::Promise<BlockHandle> promise);
Expand Down
144 changes: 105 additions & 39 deletions validator/db/archive-slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

Copyright 2019-2020 Telegram Systems LLP
*/
#include <csignal>

#include "common/delay.h"
#include "td/actor/MultiPromise.h"
#include "td/db/RocksDb.h"
#include "td/utils/PathView.h"
#include "td/utils/Random.h"
#include "td/utils/port/path.h"
#include "validator/fabric.h"

Expand Down Expand Up @@ -106,6 +110,10 @@ std::string DbStatistics::to_string_and_reset() {
return ss.str();
}

void PackageWriter::tear_down() {
sync_now();
}

void PackageWriter::append(std::string filename, td::BufferSlice data,
td::Promise<std::pair<td::uint64, td::uint64>> promise) {
td::uint64 offset, size;
Expand All @@ -118,14 +126,45 @@ void PackageWriter::append(std::string filename, td::BufferSlice data,
return;
}
start = td::Timestamp::now();
offset = p->append(std::move(filename), std::move(data), !async_mode_);
offset = p->append(std::move(filename), std::move(data), false);
end = td::Timestamp::now();
size = p->size();
}
if (statistics_) {
statistics_->record_write((end.at() - start.at()) * 1e6, data_size);
}
promise.set_value(std::pair<td::uint64, td::uint64>{offset, size});
sync([=, promise = std::move(promise)](td::Result<td::Unit>) mutable {
promise.set_value(std::pair<td::uint64, td::uint64>{offset, size});
});
}

void PackageWriter::sync(td::Promise<td::Unit> promise) {
if (!async_mode_) {
sync_now();
promise.set_value(td::Unit{});
return;
}
sync_waiters_.push_back(std::move(promise));
if (!sync_pending_) {
sync_pending_ = true;
td::actor::send_closure_later(actor_id(this), &PackageWriter::sync_now);
}
}

void PackageWriter::sync_now() {
auto p = package_.lock();
if (p) {
p->sync();
}
sync_pending_ = false;
auto waiters = std::move(sync_waiters_);
sync_waiters_.clear();
if (!waiters.empty()) {
LOG(DEBUG) << "Writer huge transaction size = " << waiters.size();
}
for (auto &promise : waiters) {
promise.set_value(td::Unit{});
}
}

class PackageReader : public td::actor::Actor {
Expand Down Expand Up @@ -171,6 +210,10 @@ static std::string package_info_to_str(BlockSeqno seqno, ShardIdFull shard_prefi
return PSTRING() << seqno << "." << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard);
}

void ArchiveSlice::tear_down() {
commit_transaction_now();
}

void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
if (destroyed_) {
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
Expand Down Expand Up @@ -242,16 +285,17 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise)
kv_->set(shard_key.as_slice(), shard_value.as_slice()).ensure();
}
kv_->set(get_db_key_block_info(handle->id()), handle->serialize().as_slice()).ensure();
commit_transaction();

handle->flushed_upto(version);
handle->set_handle_moved_to_archive();
commit_transaction([=, this, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
R.ensure();
handle->flushed_upto(version);
handle->set_handle_moved_to_archive();

if (handle->need_flush()) {
update_handle(std::move(handle), std::move(promise));
} else {
promise.set_value(td::Unit());
}
if (handle->need_flush()) {
update_handle(std::move(handle), std::move(promise));
} else {
promise.set_value(td::Unit());
}
});
}

void ArchiveSlice::update_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
Expand All @@ -272,12 +316,13 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise<td::Unit> promi
kv_->set(get_db_key_block_info(handle->id()), handle->serialize().as_slice()).ensure();
handle->flushed_upto(version);
} while (handle->need_flush());
commit_transaction();
if (!temp_) {
handle->set_handle_moved_to_archive();
}

promise.set_value(td::Unit());
commit_transaction([=, this, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
R.ensure();
if (!temp_) {
handle->set_handle_moved_to_archive();
}
promise.set_value(td::Unit());
});
}

void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::BufferSlice data,
Expand Down Expand Up @@ -329,8 +374,7 @@ void ArchiveSlice::add_file_cont(size_t idx, FileReference ref_id, td::uint64 of
kv_->set("status", td::to_string(size)).ensure();
kv_->set(ref_id.hash().to_hex(), td::to_string(offset)).ensure();
}
commit_transaction();
promise.set_value(td::Unit());
commit_transaction(std::move(promise));
}

void ArchiveSlice::get_handle(BlockIdExt block_id, td::Promise<BlockHandle> promise) {
Expand Down Expand Up @@ -408,7 +452,8 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise.set_value(std::move(R.move_as_ok().second));
}
});
td::actor::create_actor<PackageReader>("reader", p->package, offset, std::move(P), statistics_.pack_statistics)
td::actor::create_actor<PackageReader>(PSTRING() << "reader." << td::PathView(p->path).file_name(), p->package,
offset, std::move(P), statistics_.pack_statistics)
.release();
}

Expand Down Expand Up @@ -715,6 +760,7 @@ void ArchiveSlice::do_close() {
if (destroyed_) {
return;
}
commit_transaction_now();
CHECK(status_ != st_closed && active_queries_ == 0);
LOG(DEBUG) << "Closing archive slice " << db_path_;
status_ = st_closed;
Expand Down Expand Up @@ -744,30 +790,47 @@ void ArchiveSlice::end_async_query() {
}

void ArchiveSlice::begin_transaction() {
if (!async_mode_ || !huge_transaction_started_) {
kv_->begin_transaction().ensure();
if (async_mode_) {
huge_transaction_started_ = true;
}
if (transaction_started_) {
return;
}
transaction_started_ = true;
kv_->begin_transaction().ensure();
}

void ArchiveSlice::commit_transaction() {
if (!async_mode_ || huge_transaction_size_++ >= 100) {
kv_->commit_transaction().ensure();
if (async_mode_) {
huge_transaction_size_ = 0;
huge_transaction_started_ = false;
}
void ArchiveSlice::commit_transaction(td::Promise<td::Unit> promise) {
if (!transaction_started_ || !async_mode_) {
commit_transaction_now();
promise.set_value(td::Unit{});
return;
}
transaction_commit_waiters_.push_back(std::move(promise));
if (!transaction_commit_pending_) {
transaction_commit_pending_ = true;
td::actor::send_closure_later(actor_id(this), &ArchiveSlice::commit_transaction_now);
}
}

void ArchiveSlice::commit_transaction_now() {
if (!transaction_started_) {
return;
}
kv_->commit_transaction().ensure();
transaction_started_ = false;
transaction_commit_pending_ = false;
auto waiters = std::move(transaction_commit_waiters_);
transaction_commit_waiters_.clear();
if (!waiters.empty()) {
LOG(DEBUG) << "Huge transaction size = " << waiters.size();
}
for (auto &promise : waiters) {
promise.set_value(td::Unit{});
}
}

void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
async_mode_ = mode;
if (!async_mode_ && huge_transaction_started_ && kv_) {
kv_->commit_transaction().ensure();
huge_transaction_size_ = 0;
huge_transaction_started_ = false;
if (!async_mode_) {
commit_transaction_now();
}

td::MultiPromise mp;
Expand Down Expand Up @@ -823,7 +886,7 @@ td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno
if (shard_separated_) {
kv_->set(PSTRING() << "info." << v, package_info_to_str(masterchain_seqno, shard_prefix)).ensure();
}
commit_transaction();
commit_transaction_now();
add_package(masterchain_seqno, shard_prefix, 0, default_package_version());
return &packages_[v];
} else {
Expand Down Expand Up @@ -858,7 +921,8 @@ void ArchiveSlice::add_package(td::uint32 seqno, ShardIdFull shard_prefix, td::u
if (version >= 1) {
pack->truncate(size).ensure();
}
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_, statistics_.pack_statistics);
auto writer = td::actor::create_actor<PackageWriter>(PSTRING() << "writer." << td::PathView(path).file_name(), pack,
async_mode_, statistics_.pack_statistics);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, shard_prefix, path, idx, version);
}

Expand All @@ -882,6 +946,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise<td::Unit> prom
} // namespace

void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
commit_transaction_now();
before_query();
destroyed_ = true;

Expand Down Expand Up @@ -1089,7 +1154,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle, td::
package->writer.reset();
td::unlink(package->path).ensure();
td::rename(package->path + ".new", package->path).ensure();
package->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
package->writer = td::actor::create_actor<PackageWriter>(
PSTRING() << "writer." << td::PathView(package->path).file_name(), new_package, async_mode_);
}

std::vector<PackageInfo> new_packages_info;
Expand Down
Loading
Loading