Skip to content

Commit 4442b13

Browse files
authored
feat(hset): Load tiered values in read-only (#5996)
Load tiered hash values with readonly operations
1 parent 400a6cf commit 4442b13

File tree

11 files changed

+177
-56
lines changed

11 files changed

+177
-56
lines changed

src/facade/op_status.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ class OpResultBase {
6666

6767
template <typename V> class OpResult : public OpResultBase {
6868
public:
69+
using Type = V;
70+
6971
OpResult(V&& v) : v_(std::move(v)) {
7072
}
7173

src/server/hset_family.cc

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
#include "server/hset_family.h"
66

77
#include "server/family_utils.h"
8+
#include "server/tiered_storage.h"
9+
#include "server/tiering/decoders.h"
10+
#include "server/tiering/serialized_map.h"
811

912
extern "C" {
1013
#include "redis/listpack.h"
@@ -52,31 +55,49 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
5255

5356
using container_utils::GetStringMap;
5457

58+
// Generic wrapper for multiple underlying map <string, string> types
59+
// holding a variant of:
60+
// 1. Listpack
61+
// 2. StringMap
62+
// 3. SerializedMap (tiered)
5563
struct HMapWrap {
5664
private:
57-
template <typename F> decltype(auto) visit2(F f) const { // Cast T* to T&
58-
return std::visit(Overloaded{[&f](StringMap* s) { return f(*s); }, f}, impl_);
65+
template <typename F> decltype(auto) VisitRef(F f) const { // Cast T* to T&
66+
return std::visit(Overloaded{[&f](auto* s) { return f(*s); }, f}, impl_);
67+
}
68+
69+
template <typename F> decltype(auto) VisitMut(F& f) {
70+
auto serialized_bust = [&](tiering::SerializedMap* s) {
71+
ABSL_UNREACHABLE(); // Serialized maps should never be mutable
72+
return f(static_cast<StringMap*>(nullptr)); // purely for same return type
73+
};
74+
return std::visit(Overloaded{f, serialized_bust}, impl_);
5975
}
6076

6177
public:
6278
HMapWrap(const PrimeValue& pv, DbContext db_cntx) {
79+
DCHECK(!pv.IsExternal() || pv.IsCool());
6380
if (pv.Encoding() == kEncodingListPack)
6481
impl_ = detail::ListpackWrap{static_cast<uint8_t*>(pv.RObjPtr())};
6582
else
6683
impl_ = GetStringMap(pv, db_cntx);
6784
}
6885

86+
explicit HMapWrap(tiering::SerializedMap* sm) : impl_{sm} {
87+
}
88+
6989
size_t Length() const {
7090
Overloaded ov{
7191
[](StringMap* s) { return s->UpperBoundSize(); },
7292
[](const detail::ListpackWrap& lw) { return lw.size(); },
93+
[](tiering::SerializedMap* s) { return s->size(); },
7394
};
7495
return visit(ov, impl_);
7596
}
7697

7798
auto Find(std::string_view key) const {
7899
using RT = optional<pair<string_view, string_view>>;
79-
return visit2([key](auto& h) -> RT {
100+
return VisitRef([key](auto& h) -> RT {
80101
if (auto it = h.Find(key); it != h.end())
81102
return *it;
82103
return std::nullopt;
@@ -86,31 +107,31 @@ struct HMapWrap {
86107
auto Range() const {
87108
auto f = [](auto p) -> pair<string_view, string_view> { return p; }; // implicit conversion
88109
using IT = base::it::CompoundIterator<decltype(f), detail::ListpackWrap::Iterator,
89-
StringMap::iterator>;
110+
StringMap::iterator, tiering::SerializedMap::Iterator>;
90111
auto cb = [f](auto& h) -> std::pair<IT, IT> {
91112
return {{f, h.begin()}, {std::nullopt, h.end()}};
92113
};
93-
return base::it::Range(visit2(cb));
114+
return base::it::Range(VisitRef(cb));
94115
}
95116

96117
bool Erase(std::string_view key) {
97118
Overloaded ov{[key](StringMap* s) { return s->Erase(key); },
98119
[key](detail::ListpackWrap& lw) { return lw.Delete(key); }};
99-
return visit(ov, impl_);
120+
return VisitMut(ov);
100121
}
101122

102123
void AddOrUpdate(std::string_view key, std::string_view value) {
103124
Overloaded ov{[&](StringMap* sm) { sm->AddOrUpdate(key, value); },
104125
[&](detail::ListpackWrap& lw) { lw.Insert(key, value, false); }};
105-
visit(ov, impl_);
126+
VisitMut(ov);
106127
}
107128

108129
void Launder(PrimeValue& pv) {
109130
Overloaded ov{
110131
[](StringMap* s) {},
111132
[&](detail::ListpackWrap& lw) { pv.SetRObjPtr(lw.GetPointer()); },
112133
};
113-
visit(ov, impl_);
134+
VisitMut(ov);
114135
}
115136

116137
template <typename T> optional<T> Get() const {
@@ -120,8 +141,8 @@ struct HMapWrap {
120141
}
121142

122143
private:
123-
variant<StringMap*, detail::ListpackWrap> impl_;
124-
};
144+
variant<StringMap*, tiering::SerializedMap*, detail::ListpackWrap> impl_;
145+
}; // struct HMapWrap
125146

126147
// Delete if length is zero
127148
void DeleteHw(HMapWrap& hw, const OpArgs& op_args, std::string_view key) {
@@ -139,21 +160,59 @@ auto KeyAndArgs(Transaction* t, EngineShard* es) {
139160
return std::make_pair(t->GetShardArgs(es->shard_id()).Front(), t->GetOpArgs(es));
140161
}
141162

142-
// Wrap read-only handler
143-
template <typename F> auto WrapRO(F&& f) {
144-
using RT = std::invoke_result_t<F, HMapWrap>;
145-
return [f = std::forward<F>(f)](Transaction* t, EngineShard* es) -> RT {
163+
// A wrappable callback returns an OpResult<T> or the future version of it for tiered values.
164+
// Because the top-level value needs to be an OpResult, the variant is wrapped as an OpResult again.
165+
// However, we can take the "result" out of the bare value and keep it only on the top-level.
166+
template <typename T> using CbVariant = std::variant<T, ::util::fb2::Future<OpResult<T>>>;
167+
168+
// Unwrap possibly future result to a regular one
169+
template <typename T> OpResult<T> Unwrap(OpResult<CbVariant<T>> result) {
170+
if (!result.ok())
171+
return result.status();
172+
173+
Overloaded ov{
174+
[](T res) -> OpResult<T> { return res; },
175+
[](util::fb2::Future<OpResult<T>> fut) -> OpResult<T> { return fut.Get(); },
176+
};
177+
return visit(ov, std::move(result).value());
178+
}
179+
180+
// Execute callback on generic HMapWrap, possibly on offloaded value and waiting for result
181+
template <typename F, typename T = typename std::invoke_result_t<F, HMapWrap>::Type>
182+
OpResult<T> ExecuteRO(Transaction* tx, F&& f) {
183+
auto shard_cb = [f = std::forward<F>(f)](Transaction* t,
184+
EngineShard* es) -> OpResult<CbVariant<T>> {
185+
// Fetch value of hash type
146186
auto [key, op_args] = KeyAndArgs(t, es);
147187
auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH);
148188
RETURN_ON_BAD_STATUS(it_res);
189+
auto& pv = (*it_res)->second;
190+
191+
// Enqueue read for future values
192+
if (pv.IsExternal() && !pv.IsCool()) {
193+
using D = tiering::SerializedMapDecoder;
194+
util::fb2::Future<OpResult<T>> fut;
195+
auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
196+
HMapWrap hw{res.value()->Get()};
197+
fut.Resolve(f(hw));
198+
};
199+
200+
es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb));
201+
return CbVariant<T>{std::move(fut)};
202+
}
149203

150-
HMapWrap hw{(*it_res)->second, op_args.db_cntx};
204+
HMapWrap hw{pv, op_args.db_cntx};
151205
auto res = f(hw);
152206

153207
if (hw.Length() == 0) // Expirations might have emptied it
154208
DeleteHw(hw, op_args, key);
155-
return res;
209+
210+
// Move result into variant or keep error status
211+
RETURN_ON_BAD_STATUS(res);
212+
return CbVariant<T>{std::move(res).value()};
156213
};
214+
215+
return Unwrap(tx->ScheduleSingleHopT(std::move(shard_cb)));
157216
}
158217

159218
// Wrap write handler
@@ -452,7 +511,7 @@ void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkRepl
452511
return res;
453512
};
454513

455-
OpResult<vector<string>> result = tx->ScheduleSingleHopT(WrapRO(cb));
514+
OpResult<vector<string>> result = ExecuteRO(tx, cb);
456515
auto* rb = static_cast<RedisReplyBuilder*>(builder);
457516
switch (result.status()) {
458517
case OpStatus::OK:
@@ -611,7 +670,7 @@ void HSetFamily::HGet(CmdArgList args, const CommandContext& cmd_cntx) {
611670
return OpStatus::KEY_NOTFOUND;
612671
};
613672

614-
OpResult<string> result = cmd_cntx.tx->ScheduleSingleHopT(WrapRO(cb));
673+
OpResult<string> result = ExecuteRO(cmd_cntx.tx, cb);
615674
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
616675
switch (result.status()) {
617676
case OpStatus::OK:
@@ -627,7 +686,7 @@ void HSetFamily::HMGet(CmdArgList args, const CommandContext& cmd_cntx) {
627686
auto fields = args.subspan(1);
628687
auto cb = [fields](const HMapWrap& hw) { return OpHMGet(hw, fields); };
629688

630-
OpResult<vector<OptStr>> result = cmd_cntx.tx->ScheduleSingleHopT(WrapRO(cb));
689+
OpResult<vector<OptStr>> result = ExecuteRO(cmd_cntx.tx, cb);
631690
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
632691
switch (result.status()) {
633692
case OpStatus::OK:
@@ -651,19 +710,19 @@ void HSetFamily::HStrLen(CmdArgList args, const CommandContext& cmd_cntx) {
651710
return it->second.length();
652711
return OpStatus::KEY_NOTFOUND;
653712
};
654-
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapRO(cb)));
713+
HSetReplies{cmd_cntx.rb}.Send(ExecuteRO(cmd_cntx.tx, cb));
655714
}
656715

657716
void HSetFamily::HLen(CmdArgList args, const CommandContext& cmd_cntx) {
658717
auto cb = [](const HMapWrap& hw) -> OpResult<uint32_t> { return hw.Length(); };
659-
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapRO(cb)));
718+
HSetReplies{cmd_cntx.rb}.Send(ExecuteRO(cmd_cntx.tx, cb));
660719
}
661720

662721
void HSetFamily::HExists(CmdArgList args, const CommandContext& cmd_cntx) {
663722
auto cb = [field = args[1]](const HMapWrap& hw) -> OpResult<uint32_t> {
664723
return hw.Find(field) ? 1 : 0;
665724
};
666-
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapRO(cb)));
725+
HSetReplies{cmd_cntx.rb}.Send(ExecuteRO(cmd_cntx.tx, cb));
667726
}
668727

669728
void HSetFamily::HIncrBy(CmdArgList args, const CommandContext& cmd_cntx) {
@@ -768,7 +827,7 @@ void HSetFamily::HScan(CmdArgList args, const CommandContext& cmd_cntx) {
768827
const ScanOpts& scan_op = ops.value();
769828
auto cb = [&](const HMapWrap& hw) { return OpScan(hw, &cursor, scan_op); };
770829

771-
OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(WrapRO(cb));
830+
OpResult<StringVec> result = ExecuteRO(cmd_cntx.tx, cb);
772831
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
773832
switch (result.status()) {
774833
case OpStatus::KEY_NOTFOUND:

src/server/tiered_storage.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,15 @@ void TieredStorage::Close() {
324324
op_manager_->Close();
325325
}
326326

327+
void TieredStorage::ReadInternal(DbIndex dbid, std::string_view key, const PrimeValue& value,
328+
const tiering::Decoder& decoder,
329+
std::function<void(io::Result<tiering::Decoder*>)> cb) {
330+
DCHECK(value.IsExternal());
331+
DCHECK(!value.IsCool());
332+
// TODO: improve performance by avoiding one more function wrap
333+
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), decoder, std::move(cb));
334+
}
335+
327336
TieredStorage::TResult<string> TieredStorage::Read(DbIndex dbid, string_view key,
328337
const PrimeValue& value) {
329338
util::fb2::Future<io::Result<string>> fut;
@@ -333,24 +342,24 @@ TieredStorage::TResult<string> TieredStorage::Read(DbIndex dbid, string_view key
333342

334343
void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
335344
std::function<void(io::Result<std::string>)> readf) {
336-
DCHECK(value.IsExternal());
337-
DCHECK(!value.IsCool());
338345
auto cb = [readf = std::move(readf)](io::Result<tiering::StringDecoder*> res) mutable {
339346
readf(res.transform([](auto* d) { return string{d->Read()}; }));
340347
};
341-
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
342-
std::move(cb));
348+
Read(dbid, key, value, tiering::StringDecoder{value}, std::move(cb));
343349
}
344350

345351
template <typename T>
346352
TieredStorage::TResult<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
347353
const PrimeValue& value,
348354
std::function<T(std::string*)> modf) {
349355
DCHECK(value.IsExternal());
356+
DCHECK_EQ(value.ObjType(), OBJ_STRING);
350357

351358
util::fb2::Future<io::Result<T>> future;
352-
auto cb = [future, modf = std::move(modf)](io::Result<tiering::StringDecoder*> res) mutable {
353-
future.Resolve(res.transform([&modf](auto* d) { return modf(d->Write()); }));
359+
auto cb = [future, modf = std::move(modf)](io::Result<tiering::Decoder*> res) mutable {
360+
future.Resolve(
361+
res.transform([](auto* d) { return static_cast<tiering::StringDecoder*>(d); }) //
362+
.transform([&modf](auto* d) { return modf(d->Write()); }));
354363
};
355364
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), tiering::StringDecoder{value},
356365
std::move(cb));

src/server/tiered_storage.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class DbSlice;
2525

2626
namespace tiering {
2727
class SmallBins;
28-
};
28+
struct Decoder;
29+
}; // namespace tiering
2930

3031
// Manages offloaded values
3132
class TieredStorage {
@@ -46,10 +47,21 @@ class TieredStorage {
4647
std::error_code Open(std::string_view path);
4748
void Close();
4849

49-
// Read offloaded value. It must be of external type
50+
// Enqueue a read of an external value using a generic decoder.
51+
template <typename D, typename F>
52+
void Read(DbIndex dbid, std::string_view key, const PrimeValue& value, const D& decoder, F&& f) {
53+
// TODO(vlad): untangle endless callback wrapping!
54+
// Templates don't consider implicit conversions, so explicitly convert to std::function
55+
auto wrapped_cb = [f = std::forward<F>(f)](io::Result<tiering::Decoder*> res) mutable {
56+
f(res.transform([](auto* d) { return static_cast<D*>(d); }));
57+
};
58+
ReadInternal(dbid, key, value, decoder, wrapped_cb);
59+
}
60+
61+
// Read offloaded value. It must be of external string type
5062
TResult<std::string> Read(DbIndex dbid, std::string_view key, const PrimeValue& value);
5163

52-
// Read offloaded value. It must be of external type
64+
// Read offloaded value. It must be of external string type
5365
void Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
5466
std::function<void(io::Result<std::string>)> readf);
5567

@@ -95,6 +107,10 @@ class TieredStorage {
95107
}
96108

97109
private:
110+
void ReadInternal(DbIndex dbid, std::string_view key, const PrimeValue& value,
111+
const tiering::Decoder& decoder,
112+
std::function<void(io::Result<tiering::Decoder*>)> cb);
113+
98114
// Returns if a value should be stashed
99115
bool ShouldStash(const PrimeValue& pv) const;
100116

src/server/tiering/decoders.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include "server/tiering/decoders.h"
66

7+
#include "server/tiering/serialized_map.h"
8+
79
namespace dfly::tiering {
810

911
std::unique_ptr<Decoder> BareDecoder::Clone() const {
@@ -61,4 +63,25 @@ std::string* StringDecoder::Write() {
6163
return value_.GetMutable();
6264
}
6365

66+
std::unique_ptr<Decoder> SerializedMapDecoder::Clone() const {
67+
return std::make_unique<SerializedMapDecoder>();
68+
}
69+
70+
void SerializedMapDecoder::Initialize(std::string_view slice) {
71+
map_ = std::make_unique<SerializedMap>(slice);
72+
}
73+
74+
Decoder::UploadMetrics SerializedMapDecoder::GetMetrics() const {
75+
return UploadMetrics{.modified = false,
76+
.estimated_mem_usage = map_->DataBytes() + map_->size() * 2 * 8};
77+
}
78+
79+
void SerializedMapDecoder::Upload(CompactObj* obj) {
80+
ABSL_UNREACHABLE();
81+
}
82+
83+
SerializedMap* SerializedMapDecoder::Get() const {
84+
return map_.get();
85+
}
86+
6487
} // namespace dfly::tiering

0 commit comments

Comments
 (0)