Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/core/string_or_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,17 @@ class StringOrView {
val_ = std::string{std::get<std::string_view>(val_)};
}

// Consumes this object and hands its contents back as an owned std::string.
// Only callable on rvalues; the value is converted to an owned string first.
std::string Take() && {
  MakeOwned();
  std::string& owned = std::get<std::string>(val_);
  return std::move(owned);
}

// Converts the value to an owned string (if it is not one already) and returns a
// mutable pointer to it for in-place modification.
std::string* BorrowMut() {
  MakeOwned();
  std::string& owned = std::get<std::string>(val_);
  return &owned;
}

// True when the currently held alternative (string or view) has no characters.
bool empty() const {
  const auto is_empty = [](const auto& contents) { return contents.empty(); };
  return visit(is_empty, val_);
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS

if (WITH_TIERING)
SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
tiering/external_alloc.cc)
tiering/external_alloc.cc tiering/decoders.cc)

add_executable(dfly_bench dfly_bench.cc)
cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random redis_lib)
Expand Down
59 changes: 22 additions & 37 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}
}

bool NotifyFetched(EntryId id, string_view value, tiering::DiskSegment segment,
bool modified) override;
bool NotifyFetched(EntryId id, tiering::DiskSegment segment, tiering::Decoder* decoder) override;

bool NotifyDelete(tiering::DiskSegment segment) override;

Expand Down Expand Up @@ -191,6 +190,8 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
if (!IsValid(it))
continue;

// TODO: Handle upload and cooling via type dependent decoders

stats_.total_defrags++;
PrimeValue& pv = it->second;
if (pv.IsCool()) {
Expand All @@ -210,12 +211,13 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
}
}

bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
tiering::DiskSegment segment, bool modified) {
bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegment segment,
tiering::Decoder* decoder) {
++stats_.total_fetches;

if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag
Defragment(segment, value);
auto* bdecoder = static_cast<tiering::BareDecoder*>(decoder);
Defragment(segment, bdecoder->slice);
return true; // delete
}

Expand All @@ -224,20 +226,20 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
// Currently, our heuristic is not very smart, because we stop uploading any reads during
// the snapshotting.
// TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.
bool should_upload = modified;
should_upload |=
(ts_->UploadBudget() > int64_t(value.length())) && !SliceSnapshot::IsSnaphotInProgress();
bool should_upload = decoder->modified;
should_upload |= (ts_->UploadBudget() > int64_t(decoder->estimated_mem_usage)) &&
!SliceSnapshot::IsSnaphotInProgress();

if (!should_upload)
return false;

auto key = get<OpManager::KeyRef>(id);
auto* pv = Find(key);
if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
if (modified || pv->WasTouched()) {
bool is_raw = !modified;
if (decoder->modified || pv->WasTouched()) {
++stats_.total_uploads;
Upload(key.first, value, is_raw, segment.length, pv);
decoder->Upload(pv);
RecordDeleted(*pv, segment.length, GetDbTableStats(key.first));
return true;
}
pv->SetTouched(true);
Expand All @@ -262,11 +264,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
if (bin.fragmented) {
// Trigger read to signal need for defragmentation. NotifyFetched will handle it.
DVLOG(2) << "Enqueueing bin defragmentation for: " << bin.segment.offset;
auto cb = [dummy = 5](auto res) -> bool {
(void)dummy; // a hack to make cb non constexpr that confuses some old) compilers.
return false;
};
Enqueue(kFragmentedBin, bin.segment, std::move(cb));
Enqueue(kFragmentedBin, bin.segment, tiering::BareDecoder{}, [](auto res) {});
}

return false;
Expand Down Expand Up @@ -324,14 +322,11 @@ void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& v
std::function<void(io::Result<std::string>)> readf) {
DCHECK(value.IsExternal());
DCHECK(!value.IsCool());
auto cb = [readf = std::move(readf), enc = value.GetStrEncoding()](auto res) mutable {
readf(res.transform([enc](tiering::OpManager::FetchedEntry entry) {
auto [ptr, raw] = entry;
return raw ? enc.Decode(*ptr).Take() : *ptr; // TODO(vlad): optimize last value copy
}));
return false;
auto cb = [readf = std::move(readf)](io::Result<tiering::StringDecoder*> res) mutable {
readf(res.transform([](auto* d) { return string{d->Read()}; }));
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
std::move(cb));
}

template <typename T>
Expand All @@ -341,21 +336,11 @@ TieredStorage::TResult<T> TieredStorage::Modify(DbIndex dbid, std::string_view k
DCHECK(value.IsExternal());

util::fb2::Future<io::Result<T>> future;
auto cb = [future, modf = std::move(modf), enc = value.GetStrEncoding()](auto res) mutable {
if (!res.has_value()) {
future.Resolve(res.get_unexpected());
return false;
}

auto [raw_val, is_raw] = *res;
if (is_raw) {
raw_val->resize(enc.DecodedSize(*raw_val));
enc.Decode(*raw_val, raw_val->data());
}
future.Resolve(modf(raw_val));
return true;
auto cb = [future, modf = std::move(modf)](io::Result<tiering::StringDecoder*> res) mutable {
future.Resolve(res.transform([&modf](auto* d) { return modf(d->Write()); }));
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
std::move(cb));
return future;
}

Expand Down
53 changes: 53 additions & 0 deletions src/server/tiering/decoders.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/tiering/decoders.h"

namespace dfly::tiering {

// Returns a fresh, default-constructed BareDecoder. No state (slice, modified flag,
// estimated usage) is carried over from this instance.
std::unique_ptr<Decoder> BareDecoder::Clone() const {
  auto copy = std::make_unique<BareDecoder>();
  return copy;
}

// Records the raw slice without any decoding; the view is stored as-is (non-owning).
void BareDecoder::Initialize(std::string_view slice) {
  BareDecoder::slice = slice;  // qualified name selects the member, not the parameter
}

// Uploading is not supported for a bare decoder: `obj` is ignored and reaching this
// call is marked as unreachable.
void BareDecoder::Upload(CompactObj* obj) {
ABSL_UNREACHABLE();
}

// Public entry point: captures the string encoding of `obj` and delegates to the
// encoding-based constructor.
StringDecoder::StringDecoder(const CompactObj& obj) : StringDecoder(obj.GetStrEncoding()) {
}

// Stores the encoding that Initialize() later uses to decode the slice.
StringDecoder::StringDecoder(CompactObj::StrEncoding encoding) : encoding_(encoding) {
}

std::unique_ptr<Decoder> StringDecoder::Clone() const {
return std::unique_ptr<StringDecoder>{new StringDecoder(encoding_)};
}

// Binds the decoder to `slice`: keeps the original bytes, decodes them into value_,
// and records the estimated memory cost of an upload.
void StringDecoder::Initialize(std::string_view slice) {
  // Estimate uses the stored length because the value will be encoded back.
  estimated_mem_usage = slice.size();
  slice_ = slice;
  value_ = encoding_.Decode(slice);
}

// Stores the value into `obj`: the original slice when untouched, the decoded value
// once it has been modified.
// NOTE(review): the boolean passed to Materialize presumably flags the bytes as still
// raw/encoded - confirm against CompactObj::Materialize.
void StringDecoder::Upload(CompactObj* obj) {
  if (!modified) {
    obj->Materialize(slice_, true);
    return;
  }
  obj->Materialize(value_.view(), false);
}

// Read-only access to the decoded value; does not mark the decoder as modified.
std::string_view StringDecoder::Read() const {
  const std::string_view decoded = value_.view();
  return decoded;
}

// Mutable access to the decoded value. Flags the decoder as modified, so a later
// Upload() stores the decoded value instead of the original slice.
std::string* StringDecoder::Write() {
  std::string* owned = value_.BorrowMut();
  modified = true;
  return owned;
}

} // namespace dfly::tiering
61 changes: 61 additions & 0 deletions src/server/tiering/decoders.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include "core/compact_object.h"
#include "core/string_or_view.h"

namespace dfly::tiering {

// Decodes a serialized value and provides it to callbacks.
// Acts as the generic interface to the callback driver (OpManager).
struct Decoder {
  virtual ~Decoder() = default;

  // Poor man's type-erasure copy.
  // Design note from the change's author:
  //   1. Either this, so the caller can pass a decoder by value, or the caller has to
  //      pass a function creating a decoder.
  //   2. The unique_ptr is not meant to stay: the intent is to later initialize
  //      decoders in place with proper alignment to avoid allocations.
  virtual std::unique_ptr<Decoder> Clone() const = 0;

  // Initialize decoder from slice
  virtual void Initialize(std::string_view slice) = 0;

  // Store value in compact object
  virtual void Upload(CompactObj* obj) = 0;

  bool modified = false;           // Must be set if modified (not equal to original slice)
  size_t estimated_mem_usage = 0;  // Estimated usage if uploaded
};

// Decoder that performs no decoding: it only records the raw slice it was initialized
// with and exposes it through the public `slice` member.
struct BareDecoder : public Decoder {
// Returns a new default-constructed BareDecoder (the slice is not copied).
std::unique_ptr<Decoder> Clone() const override;
// Stores `slice` as-is in the `slice` member.
void Initialize(std::string_view slice) override;
// Not supported: the implementation marks this call as unreachable.
void Upload(CompactObj* obj) override;

std::string_view slice;  // Raw bytes passed to Initialize(); non-owning view
};

// Decoder for string values: decodes the stored slice using the string encoding
// captured at construction and offers read-only and mutable access to the result.
struct StringDecoder : public Decoder {
// Captures the string encoding of `obj` (obtained via GetStrEncoding()).
explicit StringDecoder(const CompactObj& obj);

// Returns a new StringDecoder with the same encoding; decoded state is not copied.
std::unique_ptr<Decoder> Clone() const override;
// Remembers `slice`, decodes it into the held value and sets estimated_mem_usage
// to the slice length.
void Initialize(std::string_view slice) override;
// Materializes `obj` from the decoded value if it was modified, otherwise from the
// original slice.
void Upload(CompactObj* obj) override;

// Read-only view of the decoded value.
std::string_view Read() const;
// Sets `modified` and returns a mutable pointer to the decoded string.
std::string* Write();

private:
// Used by Clone() to copy the encoding without needing a CompactObj.
explicit StringDecoder(CompactObj::StrEncoding encoding);

std::string_view slice_;  // Original slice passed to Initialize()
CompactObj::StrEncoding encoding_;  // Encoding used to decode the slice
dfly::StringOrView value_;  // Decoded value; made owned when Write() is called
};

} // namespace dfly::tiering
27 changes: 17 additions & 10 deletions src/server/tiering/op_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ void OpManager::Close() {
DCHECK(pending_reads_.empty());
}

void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
void OpManager::EnqueueInternal(EntryId id, DiskSegment segment, const Decoder& decoder,
ReadCallback cb) {
// Fill pages for prepared read as it has no penalty and potentially covers more small segments
PrepareRead(segment.ContainingPages())
.ForSegment(segment, id)
.ForSegment(segment, id, decoder)
.callbacks.emplace_back(std::move(cb));
}

Expand Down Expand Up @@ -147,16 +148,15 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
}

bool deleting_full = false;
std::string key_value;

// Notify functions in the loop may append items to info->key_ops during the traversal
for (size_t i = 0; i < info->key_ops.size(); i++) {
bool modified = false;
auto& ko = info->key_ops[i];
if (page) {
key_value = page->substr(ko.segment.offset - info->segment.offset, ko.segment.length);
size_t offset = ko.segment.offset - info->segment.offset;
ko.decoder->Initialize(page->substr(offset, ko.segment.length));
for (auto& cb : ko.callbacks)
modified |= cb(std::pair{&key_value, !modified});
cb(&*ko.decoder);
} else {
for (auto& cb : ko.callbacks)
cb(page.get_unexpected());
Expand All @@ -167,7 +167,7 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
// If the item is not being deleted, report it as fetched to be cached potentially.
// In case it's cached, we might need to delete it.
if (page.has_value() && !delete_from_storage)
delete_from_storage |= NotifyFetched(Borrowed(ko.id), key_value, ko.segment, modified);
delete_from_storage |= NotifyFetched(Borrowed(ko.id), ko.segment, &*ko.decoder);

// If the item is being deleted, check if the full page needs to be deleted.
if (delete_from_storage)
Expand All @@ -181,15 +181,22 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
pending_reads_.erase(offset);
}

OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id) {
// Builds the per-entry operation record. The entry keeps its own clone of `decoder`
// instead of referencing the caller's instance.
OpManager::EntryOps::EntryOps(OwnedEntryId id, DiskSegment segment, const Decoder& decoder)
    : id(std::move(id)), segment(segment), decoder(decoder.Clone()) {
}

OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id,
const Decoder& decoder) {
DCHECK_GE(key_segment.offset, segment.offset);
DCHECK_LE(key_segment.length, segment.length);

for (auto& ops : key_ops) {
if (ops.segment.offset == key_segment.offset)
if (ops.segment.offset == key_segment.offset) {
DCHECK(typeid(*ops.decoder) == typeid(decoder));
return ops;
}
}
return key_ops.emplace_back(ToOwned(id), key_segment);
return key_ops.emplace_back(ToOwned(id), key_segment, decoder);
}

OpManager::EntryOps* OpManager::ReadOp::Find(DiskSegment key_segment) {
Expand Down
Loading
Loading