diff --git a/src/core/string_or_view.h b/src/core/string_or_view.h index 67ce86028264..f09eed0071e5 100644 --- a/src/core/string_or_view.h +++ b/src/core/string_or_view.h @@ -65,11 +65,17 @@ class StringOrView { val_ = std::string{std::get(val_)}; } + // Move out of value as string std::string Take() && { MakeOwned(); return std::move(std::get(val_)); } + std::string* BorrowMut() { + MakeOwned(); + return &std::get(val_); + } + bool empty() const { return visit([](const auto& s) { return s.empty(); }, val_); } diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 192498defb70..bdcd7ac33f69 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -26,7 +26,7 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS if (WITH_TIERING) SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc - tiering/external_alloc.cc) + tiering/external_alloc.cc tiering/decoders.cc) add_executable(dfly_bench dfly_bench.cc) cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random redis_lib) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index d13b708c38e6..2a8cdf49ec77 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -123,8 +123,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { } } - bool NotifyFetched(EntryId id, string_view value, tiering::DiskSegment segment, - bool modified) override; + bool NotifyFetched(EntryId id, tiering::DiskSegment segment, tiering::Decoder* decoder) override; bool NotifyDelete(tiering::DiskSegment segment) override; @@ -191,6 +190,8 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str if (!IsValid(it)) continue; + // TODO: Handle upload and cooling via type dependent decoders + stats_.total_defrags++; PrimeValue& pv = it->second; if (pv.IsCool()) { @@ -210,12 +211,13 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str } } -bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value, - tiering::DiskSegment segment, bool modified) { +bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegment segment, + tiering::Decoder* decoder) { ++stats_.total_fetches; if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag - Defragment(segment, value); + auto* bdecoder = static_cast(decoder); + Defragment(segment, bdecoder->slice); return true; // delete } @@ -224,9 +226,9 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value, // Currently, our heuristic is not very smart, because we stop uploading any reads during // the snapshotting. // TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm. - bool should_upload = modified; - should_upload |= - (ts_->UploadBudget() > int64_t(value.length())) && !SliceSnapshot::IsSnaphotInProgress(); + bool should_upload = decoder->modified; + should_upload |= (ts_->UploadBudget() > int64_t(decoder->estimated_mem_usage)) && + !SliceSnapshot::IsSnaphotInProgress(); if (!should_upload) return false; @@ -234,10 +236,10 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value, auto key = get(id); auto* pv = Find(key); if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) { - if (modified || pv->WasTouched()) { - bool is_raw = !modified; + if (decoder->modified || pv->WasTouched()) { ++stats_.total_uploads; - Upload(key.first, value, is_raw, segment.length, pv); + decoder->Upload(pv); + RecordDeleted(*pv, segment.length, GetDbTableStats(key.first)); return true; } pv->SetTouched(true); @@ -262,11 +264,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) { if (bin.fragmented) { // Trigger read to signal need for defragmentation. NotifyFetched will handle it. DVLOG(2) << "Enqueueing bin defragmentation for: " << bin.segment.offset; - auto cb = [dummy = 5](auto res) -> bool { - (void)dummy; // a hack to make cb non constexpr that confuses some old) compilers. - return false; - }; - Enqueue(kFragmentedBin, bin.segment, std::move(cb)); + Enqueue(kFragmentedBin, bin.segment, tiering::BareDecoder{}, [](auto res) {}); } return false; @@ -324,14 +322,11 @@ void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& v std::function)> readf) { DCHECK(value.IsExternal()); DCHECK(!value.IsCool()); - auto cb = [readf = std::move(readf), enc = value.GetStrEncoding()](auto res) mutable { - readf(res.transform([enc](tiering::OpManager::FetchedEntry entry) { - auto [ptr, raw] = entry; - return raw ? enc.Decode(*ptr).Take() : *ptr; // TODO(vlad): optimize last value copy - })); - return false; + auto cb = [readf = std::move(readf)](io::Result res) mutable { + readf(res.transform([](auto* d) { return string{d->Read()}; })); }; - op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); + op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value}, + std::move(cb)); } template @@ -341,21 +336,11 @@ TieredStorage::TResult TieredStorage::Modify(DbIndex dbid, std::string_view k DCHECK(value.IsExternal()); util::fb2::Future> future; - auto cb = [future, modf = std::move(modf), enc = value.GetStrEncoding()](auto res) mutable { - if (!res.has_value()) { - future.Resolve(res.get_unexpected()); - return false; - } - - auto [raw_val, is_raw] = *res; - if (is_raw) { - raw_val->resize(enc.DecodedSize(*raw_val)); - enc.Decode(*raw_val, raw_val->data()); - } - future.Resolve(modf(raw_val)); - return true; + auto cb = [future, modf = std::move(modf)](io::Result res) mutable { + future.Resolve(res.transform([&modf](auto* d) { return modf(d->Write()); })); }; - op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); + op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value}, + std::move(cb)); return future; } diff --git a/src/server/tiering/decoders.cc b/src/server/tiering/decoders.cc new file mode 100644 index 000000000000..5b677eaec2e9 --- /dev/null +++ b/src/server/tiering/decoders.cc @@ -0,0 +1,53 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/tiering/decoders.h" + +namespace dfly::tiering { + +std::unique_ptr BareDecoder::Clone() const { + return std::make_unique(); +} + +void BareDecoder::Initialize(std::string_view slice) { + this->slice = slice; +} + +void BareDecoder::Upload(CompactObj* obj) { + ABSL_UNREACHABLE(); +} + +StringDecoder::StringDecoder(const CompactObj& obj) : StringDecoder{obj.GetStrEncoding()} { +} + +StringDecoder::StringDecoder(CompactObj::StrEncoding encoding) : encoding_{encoding} { +} + +std::unique_ptr StringDecoder::Clone() const { + return std::unique_ptr{new StringDecoder(encoding_)}; +} + +void StringDecoder::Initialize(std::string_view slice) { + slice_ = slice; + value_ = encoding_.Decode(slice); + estimated_mem_usage = slice.length(); // will be encoded back +} + +void StringDecoder::Upload(CompactObj* obj) { + if (modified) + obj->Materialize(value_.view(), false); + else + obj->Materialize(slice_, true); +} + +std::string_view StringDecoder::Read() const { + return value_.view(); +} + +std::string* StringDecoder::Write() { + modified = true; + return value_.BorrowMut(); +} + +} // namespace dfly::tiering diff --git a/src/server/tiering/decoders.h b/src/server/tiering/decoders.h new file mode 100644 index 000000000000..e85fe7e19145 --- /dev/null +++ b/src/server/tiering/decoders.h @@ -0,0 +1,61 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include +#include +#include + +#include "core/compact_object.h" +#include "core/string_or_view.h" + +namespace dfly::tiering { + +// Decodes serialized value and provides it to callbacks. +// Acts as generic interface to callback driver (OpManager) +struct Decoder { + virtual ~Decoder() = default; + + // Poor man's type-erasure copy + virtual std::unique_ptr Clone() const = 0; + + // Initialize decoder from slice + virtual void Initialize(std::string_view slice) = 0; + + // Store value in compact object + virtual void Upload(CompactObj* obj) = 0; + + bool modified = false; // Must be set if modified (not equal to original slice) + size_t estimated_mem_usage = 0; // Estimated usage if uploaded +}; + +struct BareDecoder : public Decoder { + std::unique_ptr Clone() const override; + void Initialize(std::string_view slice) override; + void Upload(CompactObj* obj) override; + + std::string_view slice; +}; + +struct StringDecoder : public Decoder { + explicit StringDecoder(const CompactObj& obj); + + std::unique_ptr Clone() const override; + void Initialize(std::string_view slice) override; + void Upload(CompactObj* obj) override; + + std::string_view Read() const; + std::string* Write(); + + private: + explicit StringDecoder(CompactObj::StrEncoding encoding); + + std::string_view slice_; + CompactObj::StrEncoding encoding_; + dfly::StringOrView value_; +}; + +} // namespace dfly::tiering diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index f59477ec24d6..f13e6c11d01d 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -58,10 +58,11 @@ void OpManager::Close() { DCHECK(pending_reads_.empty()); } -void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) { +void OpManager::EnqueueInternal(EntryId id, DiskSegment segment, const Decoder& decoder, + ReadCallback cb) { // Fill pages for prepared read as it has no penalty and potentially covers more small segments PrepareRead(segment.ContainingPages()) - .ForSegment(segment, id) + .ForSegment(segment, id, decoder) .callbacks.emplace_back(std::move(cb)); } @@ -147,16 +148,15 @@ void OpManager::ProcessRead(size_t offset, io::Result page) { } bool deleting_full = false; - std::string key_value; // Notify functions in the loop may append items to info->key_ops during the traversal for (size_t i = 0; i < info->key_ops.size(); i++) { - bool modified = false; auto& ko = info->key_ops[i]; if (page) { - key_value = page->substr(ko.segment.offset - info->segment.offset, ko.segment.length); + size_t offset = ko.segment.offset - info->segment.offset; + ko.decoder->Initialize(page->substr(offset, ko.segment.length)); for (auto& cb : ko.callbacks) - modified |= cb(std::pair{&key_value, !modified}); + cb(&*ko.decoder); } else { for (auto& cb : ko.callbacks) cb(page.get_unexpected()); @@ -167,7 +167,7 @@ void OpManager::ProcessRead(size_t offset, io::Result page) { // If the item is not being deleted, report is as fetched to be cached potentially. // In case it's cached, we might need to delete it. if (page.has_value() && !delete_from_storage) - delete_from_storage |= NotifyFetched(Borrowed(ko.id), key_value, ko.segment, modified); + delete_from_storage |= NotifyFetched(Borrowed(ko.id), ko.segment, &*ko.decoder); // If the item is being deleted, check if the full page needs to be deleted. if (delete_from_storage) @@ -181,15 +181,22 @@ void OpManager::ProcessRead(size_t offset, io::Result page) { pending_reads_.erase(offset); } -OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id) { +OpManager::EntryOps::EntryOps(OwnedEntryId id, DiskSegment segment, const Decoder& decoder) + : id{std::move(id)}, segment{segment}, decoder{decoder.Clone()} { +} + +OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id, + const Decoder& decoder) { DCHECK_GE(key_segment.offset, segment.offset); DCHECK_LE(key_segment.length, segment.length); for (auto& ops : key_ops) { - if (ops.segment.offset == key_segment.offset) + if (ops.segment.offset == key_segment.offset) { + DCHECK(typeid(*ops.decoder) == typeid(decoder)); return ops; + } } - return key_ops.emplace_back(ToOwned(id), key_segment); + return key_ops.emplace_back(ToOwned(id), key_segment, decoder); } OpManager::EntryOps* OpManager::ReadOp::Find(DiskSegment key_segment) { diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h index 322794efcea8..4509e02852c7 100644 --- a/src/server/tiering/op_manager.h +++ b/src/server/tiering/op_manager.h @@ -10,6 +10,7 @@ #include #include "server/tiering/common.h" +#include "server/tiering/decoders.h" #include "server/tiering/disk_storage.h" #include "server/tx_base.h" #include "util/fibers/future.h" @@ -36,19 +37,6 @@ class OpManager { using EntryId = std::variant; using OwnedEntryId = std::variant>; - using FetchedEntry = std::pair; - - // Callback for post-read completion. Returns whether the value was modified. - // We use fu2 function to allow moveable semantics. The arguments are: - // bool - true if the string is raw as it was extracted from the prime value. - // string* - the string that may potentially be modified by the callbacks that subsribed to this - // read. The callback run in the same order as the order of invocation, guaranteeing - // consistent read after modifications. - using ReadCallback = - fu2::function_base, - false /* non-throwing*/, false /* strong exceptions guarantees*/, - bool(io::Result)>; - explicit OpManager(size_t max_size); virtual ~OpManager(); @@ -60,7 +48,14 @@ class OpManager { // Enqueue callback to be executed once value is read. Trigger read if none is pending yet for // this segment. Multiple entries can be obtained from a single segment, but every distinct id // will have it's own independent callback loop that can safely modify the underlying value - void Enqueue(EntryId id, DiskSegment segment, ReadCallback cb); + template + void Enqueue(EntryId id, DiskSegment segment, const D& decoder, F&& cb) { + static_assert(std::is_base_of_v); + auto erased_cb = [cb = std::forward(cb)](io::Result res) mutable { + cb(res.transform([](Decoder* ptr) { return static_cast(ptr); })); + }; + EnqueueInternal(id, segment, decoder, std::move(erased_cb)); + } // Delete entry with pending io void Delete(EntryId id); @@ -74,14 +69,21 @@ class OpManager { Stats GetStats() const; protected: + using ReadCallback = + fu2::function_base, + false /* non-throwing*/, false /* strong exceptions guarantees*/, + void(io::Result)>; + + // Type erased continuation of Enqueue + void EnqueueInternal(EntryId id, DiskSegment segment, const Decoder& decoder, ReadCallback cb); + // Notify that a stash succeeded and the entry was stored at the provided segment or failed with // given error virtual void NotifyStashed(EntryId id, const io::Result& segment) = 0; // Notify that an entry was successfully fetched. Includes whether entry was modified. // Returns true if value needs to be deleted from the storage. - virtual bool NotifyFetched(EntryId id, std::string_view value, DiskSegment segment, - bool modified) = 0; + virtual bool NotifyFetched(EntryId id, DiskSegment segment, Decoder*) = 0; // Notify delete. Return true if the filled segment needs to be marked as free. virtual bool NotifyDelete(DiskSegment segment) = 0; @@ -89,12 +91,12 @@ class OpManager { protected: // Describes pending futures for a single entry struct EntryOps { - EntryOps(OwnedEntryId id, DiskSegment segment) : id(std::move(id)), segment(segment) { - } + EntryOps(OwnedEntryId id, DiskSegment segment, const Decoder& decoder); OwnedEntryId id; DiskSegment segment; absl::InlinedVector callbacks; + std::unique_ptr decoder; bool deleting = false; }; @@ -104,7 +106,7 @@ class OpManager { } // Get ops for id or create new - EntryOps& ForSegment(DiskSegment segment, EntryId id); + EntryOps& ForSegment(DiskSegment segment, EntryId id, const Decoder& decoder); // Find if there are operations for the given segment, return nullptr otherwise EntryOps* Find(DiskSegment segment); diff --git a/src/server/tiering/op_manager_test.cc b/src/server/tiering/op_manager_test.cc index 28ea3f124d40..db68dc6be52f 100644 --- a/src/server/tiering/op_manager_test.cc +++ b/src/server/tiering/op_manager_test.cc @@ -20,6 +20,19 @@ namespace dfly::tiering { using namespace std; using namespace std::string_literals; +struct TestDecoder : tiering::BareDecoder { + std::unique_ptr Clone() const override { + return std::make_unique(); + } + + void Initialize(std::string_view slice) override { + tiering::BareDecoder::Initialize(slice); + value = slice; + } + + string value; +}; + ostream& operator<<(ostream& os, const OpManager::Stats& stats) { return os << "pending_read_cnt: " << stats.pending_read_cnt << ", pending_stash_cnt: " << stats.pending_stash_cnt @@ -46,9 +59,8 @@ struct OpManagerTest : PoolTestBase, OpManager { util::fb2::Future Read(EntryId id, DiskSegment segment) { util::fb2::Future future; - Enqueue(id, segment, [future](io::Result res) mutable { - auto [value, _] = *res; - future.Resolve(*value); + Enqueue(id, segment, TestDecoder{}, [future](io::Result res) mutable { + future.Resolve((*res)->value); return false; }); return future; @@ -61,9 +73,9 @@ struct OpManagerTest : PoolTestBase, OpManager { ASSERT_TRUE(inserted); } - bool NotifyFetched(EntryId id, std::string_view value, DiskSegment segment, - bool modified) override { - fetched_[id] = value; + bool NotifyFetched(EntryId id, DiskSegment segment, Decoder* decoder) override { + auto* tdecoder = static_cast(decoder); + fetched_[id] = std::move(tdecoder->value); return false; } @@ -165,8 +177,8 @@ TEST_F(OpManagerTest, Modify) { // Atomically issue sequence of modify-read operations std::vector> futures; for (size_t i = 0; i < 10; i++) { - Enqueue(0u, stashed_[0u], [i](auto res) { - absl::StrAppend(res->first, i); + Enqueue(0u, stashed_[0u], TestDecoder{}, [i](io::Result res) { + absl::StrAppend(&(*res)->value, i); return true; }); futures.emplace_back(Read(0u, stashed_[0u]));