Skip to content

Commit 64ac01b

Browse files
committed
feat(tiering): Decoders
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 3471fc8 commit 64ac01b

File tree

8 files changed

+180
-75
lines changed

8 files changed

+180
-75
lines changed

src/core/string_or_view.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,17 @@ class StringOrView {
6565
val_ = std::string{std::get<std::string_view>(val_)};
6666
}
6767

68+
// Move out of value as string
6869
std::string Take() && {
6970
MakeOwned();
7071
return std::move(std::get<std::string>(val_));
7172
}
7273

74+
std::string* BorrowMut() {
75+
MakeOwned();
76+
return &std::get<std::string>(val_);
77+
}
78+
7379
bool empty() const {
7480
return visit([](const auto& s) { return s.empty(); }, val_);
7581
}

src/server/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS
2626

2727
if (WITH_TIERING)
2828
SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
29-
tiering/external_alloc.cc)
29+
tiering/external_alloc.cc tiering/decoders.cc)
3030

3131
add_executable(dfly_bench dfly_bench.cc)
3232
cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random redis_lib)

src/server/tiered_storage.cc

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
123123
}
124124
}
125125

126-
bool NotifyFetched(EntryId id, string_view value, tiering::DiskSegment segment,
127-
bool modified) override;
126+
bool NotifyFetched(EntryId id, tiering::DiskSegment segment, tiering::Decoder* decoder) override;
128127

129128
bool NotifyDelete(tiering::DiskSegment segment) override;
130129

@@ -210,12 +209,12 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
210209
}
211210
}
212211

213-
bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
214-
tiering::DiskSegment segment, bool modified) {
212+
bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegment segment,
213+
tiering::Decoder* decoder) {
215214
++stats_.total_fetches;
216215

217216
if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag
218-
Defragment(segment, value);
217+
// Defragment(segment, value);
219218
return true; // delete
220219
}
221220

@@ -224,20 +223,20 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
224223
// Currently, our heuristic is not very smart, because we stop uploading any reads during
225224
// the snapshotting.
226225
// TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.
227-
bool should_upload = modified;
228-
should_upload |=
229-
(ts_->UploadBudget() > int64_t(value.length())) && !SliceSnapshot::IsSnaphotInProgress();
226+
bool should_upload = decoder->modified;
227+
should_upload |= (ts_->UploadBudget() > int64_t(decoder->estimated_mem_usage)) &&
228+
!SliceSnapshot::IsSnaphotInProgress();
230229

231230
if (!should_upload)
232231
return false;
233232

234233
auto key = get<OpManager::KeyRef>(id);
235234
auto* pv = Find(key);
236235
if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
237-
if (modified || pv->WasTouched()) {
238-
bool is_raw = !modified;
236+
if (decoder->modified || pv->WasTouched()) {
239237
++stats_.total_uploads;
240-
Upload(key.first, value, is_raw, segment.length, pv);
238+
decoder->Upload(pv);
239+
// Upload(key.first, value, is_raw, segment.length, pv);
241240
return true;
242241
}
243242
pv->SetTouched(true);
@@ -262,11 +261,11 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
262261
if (bin.fragmented) {
263262
// Trigger read to signal need for defragmentation. NotifyFetched will handle it.
264263
DVLOG(2) << "Enqueueing bin defragmentation for: " << bin.segment.offset;
265-
auto cb = [dummy = 5](auto res) -> bool {
266-
(void)dummy; // a hack to make cb non constexpr that confuses some old) compilers.
267-
return false;
268-
};
269-
Enqueue(kFragmentedBin, bin.segment, std::move(cb));
264+
// auto cb = [dummy = 5](auto res) -> bool {
265+
// (void)dummy; // a hack to make cb non constexpr that confuses some old compilers.
266+
// return false;
267+
// };
268+
// Enqueue(kFragmentedBin, bin.segment, std::move(cb));
270269
}
271270

272271
return false;
@@ -324,14 +323,11 @@ void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& v
324323
std::function<void(io::Result<std::string>)> readf) {
325324
DCHECK(value.IsExternal());
326325
DCHECK(!value.IsCool());
327-
auto cb = [readf = std::move(readf), enc = value.GetStrEncoding()](auto res) mutable {
328-
readf(res.transform([enc](tiering::OpManager::FetchedEntry entry) {
329-
auto [ptr, raw] = entry;
330-
return raw ? enc.Decode(*ptr).Take() : *ptr; // TODO(vlad): optimize last value copy
331-
}));
332-
return false;
326+
auto cb = [readf = std::move(readf)](io::Result<tiering::StringDecoder*> res) mutable {
327+
readf(res.transform([](auto* d) { return string{d->Read()}; }));
333328
};
334-
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
329+
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
330+
std::move(cb));
335331
}
336332

337333
template <typename T>
@@ -341,21 +337,11 @@ TieredStorage::TResult<T> TieredStorage::Modify(DbIndex dbid, std::string_view k
341337
DCHECK(value.IsExternal());
342338

343339
util::fb2::Future<io::Result<T>> future;
344-
auto cb = [future, modf = std::move(modf), enc = value.GetStrEncoding()](auto res) mutable {
345-
if (!res.has_value()) {
346-
future.Resolve(res.get_unexpected());
347-
return false;
348-
}
349-
350-
auto [raw_val, is_raw] = *res;
351-
if (is_raw) {
352-
raw_val->resize(enc.DecodedSize(*raw_val));
353-
enc.Decode(*raw_val, raw_val->data());
354-
}
355-
future.Resolve(modf(raw_val));
356-
return true;
340+
auto cb = [future, modf = std::move(modf)](io::Result<tiering::StringDecoder*> res) mutable {
341+
future.Resolve(res.transform([&modf](auto* d) { return modf(d->Write()); }));
357342
};
358-
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
343+
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
344+
std::move(cb));
359345
return future;
360346
}
361347

src/server/tiering/decoders.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "server/tiering/decoders.h"
6+
7+
namespace dfly::tiering {
8+
9+
StringDecoder::StringDecoder(const CompactObj& obj) : StringDecoder{obj.GetStrEncoding()} {
10+
}
11+
12+
StringDecoder::StringDecoder(CompactObj::StrEncoding encoding) : encoding_{encoding} {
13+
}
14+
15+
std::unique_ptr<Decoder> StringDecoder::Clone() const {
16+
return std::unique_ptr<StringDecoder>{new StringDecoder(encoding_)};
17+
}
18+
19+
void StringDecoder::Initialize(std::string_view slice) {
20+
value_ = encoding_.Decode(slice);
21+
estimated_mem_usage = value_.view().length();
22+
}
23+
24+
void StringDecoder::Upload(CompactObj* obj) {
25+
obj->Materialize(value_.view(), false);
26+
}
27+
28+
std::string_view StringDecoder::Read() const {
29+
return value_.view();
30+
}
31+
32+
std::string* StringDecoder::Write() {
33+
modified = true;
34+
return value_.BorrowMut();
35+
}
36+
37+
} // namespace dfly::tiering

src/server/tiering/decoders.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <memory>
8+
#include <optional>
9+
#include <string>
10+
#include <string_view>
11+
12+
#include "core/compact_object.h"
13+
#include "core/string_or_view.h"
14+
15+
namespace dfly::tiering {
16+
17+
// Decodes serialized value and provides it to callbacks.
18+
// Acts as generic interface to callback driver (OpManager)
19+
struct Decoder {
20+
virtual ~Decoder() = default;
21+
22+
// Poor man's type-erasure copy
23+
virtual std::unique_ptr<Decoder> Clone() const = 0;
24+
25+
// Initialize decoder from slice
26+
virtual void Initialize(std::string_view slice) = 0;
27+
28+
// Store value in compact object
29+
virtual void Upload(CompactObj* obj) = 0;
30+
31+
bool modified = false; // Must be set if modified (not equal to original slice)
32+
size_t estimated_mem_usage = 0; // Estimated usage if uploaded
33+
};
34+
35+
struct StringDecoder : public Decoder {
36+
explicit StringDecoder(const CompactObj& obj);
37+
38+
std::unique_ptr<Decoder> Clone() const override;
39+
void Initialize(std::string_view slice) override;
40+
void Upload(CompactObj* obj) override;
41+
42+
std::string_view Read() const;
43+
std::string* Write();
44+
45+
private:
46+
explicit StringDecoder(CompactObj::StrEncoding encoding);
47+
48+
CompactObj::StrEncoding encoding_;
49+
dfly::StringOrView value_;
50+
};
51+
52+
} // namespace dfly::tiering

src/server/tiering/op_manager.cc

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ void OpManager::Close() {
5858
DCHECK(pending_reads_.empty());
5959
}
6060

61-
void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
61+
void OpManager::EnqueueInternal(EntryId id, DiskSegment segment, const Decoder& decoder,
62+
ReadCallback cb) {
6263
// Fill pages for prepared read as it has no penalty and potentially covers more small segments
6364
PrepareRead(segment.ContainingPages())
64-
.ForSegment(segment, id)
65+
.ForSegment(segment, id, decoder)
6566
.callbacks.emplace_back(std::move(cb));
6667
}
6768

@@ -147,16 +148,15 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
147148
}
148149

149150
bool deleting_full = false;
150-
std::string key_value;
151151

152152
// Notify functions in the loop may append items to info->key_ops during the traversal
153153
for (size_t i = 0; i < info->key_ops.size(); i++) {
154-
bool modified = false;
155154
auto& ko = info->key_ops[i];
156155
if (page) {
157-
key_value = page->substr(ko.segment.offset - info->segment.offset, ko.segment.length);
156+
size_t offset = ko.segment.offset - info->segment.offset;
157+
ko.decoder->Initialize(page->substr(offset, ko.segment.length));
158158
for (auto& cb : ko.callbacks)
159-
modified |= cb(std::pair{&key_value, !modified});
159+
cb(&*ko.decoder);
160160
} else {
161161
for (auto& cb : ko.callbacks)
162162
cb(page.get_unexpected());
@@ -167,7 +167,7 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
167167
// If the item is not being deleted, report it as fetched so that it can potentially be cached.
168168
// In case it's cached, we might need to delete it.
169169
if (page.has_value() && !delete_from_storage)
170-
delete_from_storage |= NotifyFetched(Borrowed(ko.id), key_value, ko.segment, modified);
170+
delete_from_storage |= NotifyFetched(Borrowed(ko.id), ko.segment, &*ko.decoder);
171171

172172
// If the item is being deleted, check if the full page needs to be deleted.
173173
if (delete_from_storage)
@@ -181,15 +181,22 @@ void OpManager::ProcessRead(size_t offset, io::Result<std::string_view> page) {
181181
pending_reads_.erase(offset);
182182
}
183183

184-
OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id) {
184+
OpManager::EntryOps::EntryOps(OwnedEntryId id, DiskSegment segment, const Decoder& decoder)
185+
: id{std::move(id)}, segment{segment}, decoder{decoder.Clone()} {
186+
}
187+
188+
OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id,
189+
const Decoder& decoder) {
185190
DCHECK_GE(key_segment.offset, segment.offset);
186191
DCHECK_LE(key_segment.length, segment.length);
187192

188193
for (auto& ops : key_ops) {
189-
if (ops.segment.offset == key_segment.offset)
194+
if (ops.segment.offset == key_segment.offset) {
195+
DCHECK(typeid(*ops.decoder) == typeid(decoder));
190196
return ops;
197+
}
191198
}
192-
return key_ops.emplace_back(ToOwned(id), key_segment);
199+
return key_ops.emplace_back(ToOwned(id), key_segment, decoder);
193200
}
194201

195202
OpManager::EntryOps* OpManager::ReadOp::Find(DiskSegment key_segment) {

src/server/tiering/op_manager.h

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <variant>
1111

1212
#include "server/tiering/common.h"
13+
#include "server/tiering/decoders.h"
1314
#include "server/tiering/disk_storage.h"
1415
#include "server/tx_base.h"
1516
#include "util/fibers/future.h"
@@ -36,19 +37,6 @@ class OpManager {
3637
using EntryId = std::variant<unsigned, KeyRef>;
3738
using OwnedEntryId = std::variant<unsigned, std::pair<DbIndex, std::string>>;
3839

39-
using FetchedEntry = std::pair<std::string* /* ptr */, bool /* raw */>;
40-
41-
// Callback for post-read completion. Returns whether the value was modified.
42-
// We use fu2 function to allow moveable semantics. The arguments are:
43-
// bool - true if the string is raw as it was extracted from the prime value.
44-
// string* - the string that may potentially be modified by the callbacks that subsribed to this
45-
// read. The callback run in the same order as the order of invocation, guaranteeing
46-
// consistent read after modifications.
47-
using ReadCallback =
48-
fu2::function_base<true /*owns*/, false /*moveable*/, fu2::capacity_fixed<40, 8>,
49-
false /* non-throwing*/, false /* strong exceptions guarantees*/,
50-
bool(io::Result<FetchedEntry>)>;
51-
5240
explicit OpManager(size_t max_size);
5341
virtual ~OpManager();
5442

@@ -60,7 +48,14 @@ class OpManager {
6048
// Enqueue callback to be executed once value is read. Trigger read if none is pending yet for
6149
// this segment. Multiple entries can be obtained from a single segment, but every distinct id
6250
// will have its own independent callback loop that can safely modify the underlying value
63-
void Enqueue(EntryId id, DiskSegment segment, ReadCallback cb);
51+
template <typename D, typename F>
52+
void Enqueue(EntryId id, DiskSegment segment, const D& decoder, F&& cb) {
53+
static_assert(std::is_base_of_v<Decoder, D>);
54+
auto erased_cb = [cb = std::forward<F>(cb)](io::Result<Decoder*> res) mutable {
55+
cb(res.transform([](Decoder* ptr) { return static_cast<D*>(ptr); }));
56+
};
57+
EnqueueInternal(id, segment, decoder, std::move(erased_cb));
58+
}
6459

6560
// Delete entry with pending io
6661
void Delete(EntryId id);
@@ -74,27 +69,34 @@ class OpManager {
7469
Stats GetStats() const;
7570

7671
protected:
72+
using ReadCallback =
73+
fu2::function_base<true /*owns*/, false /*moveable*/, fu2::capacity_fixed<40, 8>,
74+
false /* non-throwing*/, false /* strong exceptions guarantees*/,
75+
void(io::Result<Decoder*>)>;
76+
77+
// Type erased continuation of Enqueue
78+
void EnqueueInternal(EntryId id, DiskSegment segment, const Decoder& decoder, ReadCallback cb);
79+
7780
// Notify that a stash succeeded and the entry was stored at the provided segment or failed with
7881
// given error
7982
virtual void NotifyStashed(EntryId id, const io::Result<DiskSegment>& segment) = 0;
8083

8184
// Notify that an entry was successfully fetched. The decoder was initialized from the fetched
// slice and carries the `modified` flag set by the callbacks.
8285
// Returns true if value needs to be deleted from the storage.
83-
virtual bool NotifyFetched(EntryId id, std::string_view value, DiskSegment segment,
84-
bool modified) = 0;
86+
virtual bool NotifyFetched(EntryId id, DiskSegment segment, Decoder*) = 0;
8587

8688
// Notify delete. Return true if the filled segment needs to be marked as free.
8789
virtual bool NotifyDelete(DiskSegment segment) = 0;
8890

8991
protected:
9092
// Describes pending futures for a single entry
9193
struct EntryOps {
92-
EntryOps(OwnedEntryId id, DiskSegment segment) : id(std::move(id)), segment(segment) {
93-
}
94+
EntryOps(OwnedEntryId id, DiskSegment segment, const Decoder& decoder);
9495

9596
OwnedEntryId id;
9697
DiskSegment segment;
9798
absl::InlinedVector<ReadCallback, 1> callbacks;
99+
std::unique_ptr<Decoder> decoder;
98100
bool deleting = false;
99101
};
100102

@@ -104,7 +106,7 @@ class OpManager {
104106
}
105107

106108
// Get ops for id or create new
107-
EntryOps& ForSegment(DiskSegment segment, EntryId id);
109+
EntryOps& ForSegment(DiskSegment segment, EntryId id, const Decoder& decoder);
108110

109111
// Find if there are operations for the given segment, return nullptr otherwise
110112
EntryOps* Find(DiskSegment segment);

0 commit comments

Comments
 (0)