Skip to content

Commit 1905a87

Browse files
committed
feat(tiering): Decoders
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 3471fc8 commit 1905a87

File tree

8 files changed

+190
-75
lines changed

8 files changed

+190
-75
lines changed

src/core/string_or_view.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,17 @@ class StringOrView {
6565
val_ = std::string{std::get<std::string_view>(val_)};
6666
}
6767

68+
// Move out of value as string
6869
std::string Take() && {
6970
MakeOwned();
7071
return std::move(std::get<std::string>(val_));
7172
}
7273

74+
std::string* BorrowMut() {
75+
MakeOwned();
76+
return &std::get<std::string>(val_);
77+
}
78+
7379
bool empty() const {
7480
return visit([](const auto& s) { return s.empty(); }, val_);
7581
}

src/server/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS
2626

2727
if (WITH_TIERING)
2828
SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
29-
tiering/external_alloc.cc)
29+
tiering/external_alloc.cc tiering/decoders.cc)
3030

3131
add_executable(dfly_bench dfly_bench.cc)
3232
cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random redis_lib)

src/server/tiered_storage.cc

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
123123
}
124124
}
125125

126-
bool NotifyFetched(EntryId id, string_view value, tiering::DiskSegment segment,
127-
bool modified) override;
126+
bool NotifyFetched(EntryId id, tiering::DiskSegment segment, tiering::Decoder* decoder) override;
128127

129128
bool NotifyDelete(tiering::DiskSegment segment) override;
130129

@@ -191,6 +190,8 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
191190
if (!IsValid(it))
192191
continue;
193192

193+
// TODO: Handle upload and cooling via type-dependent decoders
194+
194195
stats_.total_defrags++;
195196
PrimeValue& pv = it->second;
196197
if (pv.IsCool()) {
@@ -210,12 +211,13 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
210211
}
211212
}
212213

213-
bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
214-
tiering::DiskSegment segment, bool modified) {
214+
bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegment segment,
215+
tiering::Decoder* decoder) {
215216
++stats_.total_fetches;
216217

217218
if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag
218-
Defragment(segment, value);
219+
auto* bdecoder = static_cast<tiering::BareDecoder*>(decoder);
220+
Defragment(segment, bdecoder->slice);
219221
return true; // delete
220222
}
221223

@@ -224,20 +226,20 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
224226
// Currently, our heuristic is not very smart, because we stop uploading any reads during
225227
// the snapshotting.
226228
// TODO: revisit this when we rewrite it with a more efficient snapshotting algorithm.
227-
bool should_upload = modified;
228-
should_upload |=
229-
(ts_->UploadBudget() > int64_t(value.length())) && !SliceSnapshot::IsSnaphotInProgress();
229+
bool should_upload = decoder->modified;
230+
should_upload |= (ts_->UploadBudget() > int64_t(decoder->estimated_mem_usage)) &&
231+
!SliceSnapshot::IsSnaphotInProgress();
230232

231233
if (!should_upload)
232234
return false;
233235

234236
auto key = get<OpManager::KeyRef>(id);
235237
auto* pv = Find(key);
236238
if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
237-
if (modified || pv->WasTouched()) {
238-
bool is_raw = !modified;
239+
if (decoder->modified || pv->WasTouched()) {
239240
++stats_.total_uploads;
240-
Upload(key.first, value, is_raw, segment.length, pv);
241+
decoder->Upload(pv);
242+
RecordDeleted(*pv, segment.length, GetDbTableStats(key.first));
241243
return true;
242244
}
243245
pv->SetTouched(true);
@@ -262,11 +264,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
262264
if (bin.fragmented) {
263265
// Trigger read to signal need for defragmentation. NotifyFetched will handle it.
264266
DVLOG(2) << "Enqueueing bin defragmentation for: " << bin.segment.offset;
265-
auto cb = [dummy = 5](auto res) -> bool {
266-
(void)dummy; // a hack to make cb non constexpr that confuses some old) compilers.
267-
return false;
268-
};
269-
Enqueue(kFragmentedBin, bin.segment, std::move(cb));
267+
Enqueue(kFragmentedBin, bin.segment, tiering::BareDecoder{}, [](auto res) {});
270268
}
271269

272270
return false;
@@ -324,14 +322,11 @@ void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& v
324322
std::function<void(io::Result<std::string>)> readf) {
325323
DCHECK(value.IsExternal());
326324
DCHECK(!value.IsCool());
327-
auto cb = [readf = std::move(readf), enc = value.GetStrEncoding()](auto res) mutable {
328-
readf(res.transform([enc](tiering::OpManager::FetchedEntry entry) {
329-
auto [ptr, raw] = entry;
330-
return raw ? enc.Decode(*ptr).Take() : *ptr; // TODO(vlad): optimize last value copy
331-
}));
332-
return false;
325+
auto cb = [readf = std::move(readf)](io::Result<tiering::StringDecoder*> res) mutable {
326+
readf(res.transform([](auto* d) { return string{d->Read()}; }));
333327
};
334-
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
328+
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
329+
std::move(cb));
335330
}
336331

337332
template <typename T>
@@ -341,21 +336,11 @@ TieredStorage::TResult<T> TieredStorage::Modify(DbIndex dbid, std::string_view k
341336
DCHECK(value.IsExternal());
342337

343338
util::fb2::Future<io::Result<T>> future;
344-
auto cb = [future, modf = std::move(modf), enc = value.GetStrEncoding()](auto res) mutable {
345-
if (!res.has_value()) {
346-
future.Resolve(res.get_unexpected());
347-
return false;
348-
}
349-
350-
auto [raw_val, is_raw] = *res;
351-
if (is_raw) {
352-
raw_val->resize(enc.DecodedSize(*raw_val));
353-
enc.Decode(*raw_val, raw_val->data());
354-
}
355-
future.Resolve(modf(raw_val));
356-
return true;
339+
auto cb = [future, modf = std::move(modf)](io::Result<tiering::StringDecoder*> res) mutable {
340+
future.Resolve(res.transform([&modf](auto* d) { return modf(d->Write()); }));
357341
};
358-
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
342+
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
343+
std::move(cb));
359344
return future;
360345
}
361346

src/server/tiering/decoders.cc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "server/tiering/decoders.h"
6+
7+
namespace dfly::tiering {
8+
9+
std::unique_ptr<Decoder> BareDecoder::Clone() const {
10+
return std::make_unique<BareDecoder>();
11+
}
12+
13+
void BareDecoder::Initialize(std::string_view slice) {
14+
this->slice = slice;
15+
}
16+
17+
void BareDecoder::Upload(CompactObj* obj) {
18+
ABSL_UNREACHABLE();
19+
}
20+
21+
StringDecoder::StringDecoder(const CompactObj& obj) : StringDecoder{obj.GetStrEncoding()} {
22+
}
23+
24+
StringDecoder::StringDecoder(CompactObj::StrEncoding encoding) : encoding_{encoding} {
25+
}
26+
27+
std::unique_ptr<Decoder> StringDecoder::Clone() const {
28+
return std::unique_ptr<StringDecoder>{new StringDecoder(encoding_)};
29+
}
30+
31+
void StringDecoder::Initialize(std::string_view slice) {
32+
slice_ = slice;
33+
value_ = encoding_.Decode(slice);
34+
estimated_mem_usage = slice.length(); // will be encoded back
35+
}
36+
37+
void StringDecoder::Upload(CompactObj* obj) {
38+
if (modified)
39+
obj->Materialize(value_.view(), false);
40+
else
41+
obj->Materialize(slice_, true);
42+
}
43+
44+
std::string_view StringDecoder::Read() const {
45+
return value_.view();
46+
}
47+
48+
std::string* StringDecoder::Write() {
49+
modified = true;
50+
return value_.BorrowMut();
51+
}
52+
53+
} // namespace dfly::tiering

src/server/tiering/decoders.h

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <memory>
8+
#include <optional>
9+
#include <string>
10+
#include <string_view>
11+
12+
#include "core/compact_object.h"
13+
#include "core/string_or_view.h"
14+
15+
namespace dfly::tiering {
16+
17+
// Decodes serialized value and provides it to callbacks.
18+
// Acts as generic interface to callback driver (OpManager)
19+
struct Decoder {
20+
virtual ~Decoder() = default;
21+
22+
// Poor man's type-erasure copy
23+
virtual std::unique_ptr<Decoder> Clone() const = 0;
24+
25+
// Initialize decoder from slice
26+
virtual void Initialize(std::string_view slice) = 0;
27+
28+
// Store value in compact object
29+
virtual void Upload(CompactObj* obj) = 0;
30+
31+
bool modified = false; // Must be set if modified (not equal to original slice)
32+
size_t estimated_mem_usage = 0; // Estimated usage if uploaded
33+
};
34+
35+
struct BareDecoder : public Decoder {
36+
std::unique_ptr<Decoder> Clone() const override;
37+
void Initialize(std::string_view slice) override;
38+
void Upload(CompactObj* obj) override;
39+
40+
std::string_view slice;
41+
};
42+
43+
struct StringDecoder : public Decoder {
44+
explicit StringDecoder(const CompactObj& obj);
45+
46+
std::unique_ptr<Decoder> Clone() const override;
47+
void Initialize(std::string_view slice) override;
48+
void Upload(CompactObj* obj) override;
49+
50+
std::string_view Read() const;
51+
std::string* Write();
52+
53+
private:
54+
explicit StringDecoder(CompactObj::StrEncoding encoding);
55+
56+
std::string_view slice_;
57+
CompactObj::StrEncoding encoding_;
58+
dfly::StringOrView value_;
59+
};
60+
61+
} // namespace dfly::tiering

src/server/tiering/op_manager.cc

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ void OpManager::Close() {
5858
DCHECK(pending_reads_.empty());
5959
}
6060

61-
void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
61+
void OpManager::EnqueueInternal(EntryId id, DiskSegment segment, const Decoder& decoder,
62+
ReadCallback cb) {
6263
// Fill pages for prepared read as it has no penalty and potentially covers more small segments
6364
PrepareRead(segment.ContainingPages())
64-
.ForSegment(segment, id)
65+
.ForSegment(segment, id, decoder)
6566
.callbacks.emplace_back(std::move(cb));
6667
}
6768

@@ -147,16 +148,15 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
147148
}
148149

149150
bool deleting_full = false;
150-
std::string key_value;
151151

152152
// Notify functions in the loop may append items to info->key_ops during the traversal
153153
for (size_t i = 0; i < info->key_ops.size(); i++) {
154-
bool modified = false;
155154
auto& ko = info->key_ops[i];
156155
if (page) {
157-
key_value = page->substr(ko.segment.offset - info->segment.offset, ko.segment.length);
156+
size_t offset = ko.segment.offset - info->segment.offset;
157+
ko.decoder->Initialize(page->substr(offset, ko.segment.length));
158158
for (auto& cb : ko.callbacks)
159-
modified |= cb(std::pair{&key_value, !modified});
159+
cb(&*ko.decoder);
160160
} else {
161161
for (auto& cb : ko.callbacks)
162162
cb(page.get_unexpected());
@@ -167,7 +167,7 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
167167
// If the item is not being deleted, report it as fetched so it can potentially be cached.
168168
// In case it's cached, we might need to delete it.
169169
if (page.has_value() && !delete_from_storage)
170-
delete_from_storage |= NotifyFetched(Borrowed(ko.id), key_value, ko.segment, modified);
170+
delete_from_storage |= NotifyFetched(Borrowed(ko.id), ko.segment, &*ko.decoder);
171171

172172
// If the item is being deleted, check if the full page needs to be deleted.
173173
if (delete_from_storage)
@@ -181,15 +181,22 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
181181
pending_reads_.erase(offset);
182182
}
183183

184-
OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id) {
184+
OpManager::EntryOps::EntryOps(OwnedEntryId id, DiskSegment segment, const Decoder& decoder)
185+
: id{std::move(id)}, segment{segment}, decoder{decoder.Clone()} {
186+
}
187+
188+
OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id,
189+
const Decoder& decoder) {
185190
DCHECK_GE(key_segment.offset, segment.offset);
186191
DCHECK_LE(key_segment.length, segment.length);
187192

188193
for (auto& ops : key_ops) {
189-
if (ops.segment.offset == key_segment.offset)
194+
if (ops.segment.offset == key_segment.offset) {
195+
DCHECK(typeid(*ops.decoder) == typeid(decoder));
190196
return ops;
197+
}
191198
}
192-
return key_ops.emplace_back(ToOwned(id), key_segment);
199+
return key_ops.emplace_back(ToOwned(id), key_segment, decoder);
193200
}
194201

195202
OpManager::EntryOps* OpManager::ReadOp::Find(DiskSegment key_segment) {

0 commit comments

Comments
 (0)