diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc
index 6859991cc68d..8912365cec4a 100644
--- a/src/core/compact_object.cc
+++ b/src/core/compact_object.cc
@@ -1224,11 +1224,13 @@ CompactObj::ExternalRep CompactObj::GetExternalRep() const {
   return static_cast<ExternalRep>(u_.ext_ptr.representation);
 }
 
-void CompactObj::SetCool(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
+void CompactObj::SetCool(size_t offset, uint32_t sz, ExternalRep rep,
+                         detail::TieredColdRecord* record) {
   // We copy the mask of the "cooled" referenced object because it contains the encoding info.
   SetMeta(EXTERNAL_TAG, record->value.mask_);
   u_.ext_ptr.is_cool = 1;
+  u_.ext_ptr.representation = static_cast<uint8_t>(rep);
   u_.ext_ptr.page_offset = offset % 4096;
   u_.ext_ptr.serialized_size = sz;
   u_.ext_ptr.cool_record = record;
@@ -1244,6 +1246,10 @@ auto CompactObj::GetCool() const -> CoolItem {
   return res;
 }
 
+void CompactObj::Freeze(size_t offset, size_t sz) {
+  SetExternal(offset, sz, GetExternalRep());
+}
+
 std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
   DCHECK_EQ(EXTERNAL_TAG, taglen_);
   auto& ext = u_.ext_ptr;
diff --git a/src/core/compact_object.h b/src/core/compact_object.h
index 50f1797d9c93..92af8ac2414c 100644
--- a/src/core/compact_object.h
+++ b/src/core/compact_object.h
@@ -364,7 +364,8 @@ class CompactObj {
   }
 
   // Assigns a cooling record to the object together with its external slice.
-  void SetCool(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
+  void SetCool(size_t offset, uint32_t serialized_size, ExternalRep rep,
+               detail::TieredColdRecord* record);
 
   struct CoolItem {
     uint16_t page_offset;
@@ -376,6 +377,10 @@ class CompactObj {
   // Returns the external data of the object including its ColdRecord.
   CoolItem GetCool() const;
 
+  // Prerequisite: IsCool() is true.
+  // Keeps the cool record only as an external value and discards the in-memory part.
+  void Freeze(size_t offset, size_t sz);
+
   std::pair<size_t, size_t> GetExternalSlice() const;
 
   // Injects either the raw string (extracted with GetRawString()) or the usual string
diff --git a/src/core/detail/listpack_wrap.cc b/src/core/detail/listpack_wrap.cc
index e08bdfa5caff..7d86c8d73c1d 100644
--- a/src/core/detail/listpack_wrap.cc
+++ b/src/core/detail/listpack_wrap.cc
@@ -33,12 +33,11 @@ void ListpackWrap::Iterator::Read() {
   next_ptr_ = lpNext(lp_, next_ptr_);
 }
 
-ListpackWrap::~ListpackWrap() {
-  DCHECK(!dirty_);
+ListpackWrap ListpackWrap::WithCapacity(size_t capacity) {
+  return ListpackWrap{lpNew(capacity)};
 }
 
 uint8_t* ListpackWrap::GetPointer() {
-  dirty_ = false;
   return lp_;
 }
 
@@ -59,7 +58,6 @@ bool ListpackWrap::Delete(std::string_view key) {
     return false;
 
   lp_ = lpDeleteRangeWithEntry(lp_, &ptr, 2);
-  dirty_ = true;
   return true;
 }
 
@@ -86,7 +84,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski
       lp_ = lpReplace(lp_, &vptr, vsrc, value.size());
 
       DCHECK_EQ(0u, lpLength(lp_) % 2);
-      dirty_ = true;
       updated = true;
     }
   }
@@ -96,7 +93,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski
     // TODO: we should at least allocate once for both elements
     lp_ = lpAppend(lp_, fsrc, key.size());
     lp_ = lpAppend(lp_, vsrc, value.size());
-    dirty_ = true;
   }
 
   return !updated;
@@ -106,6 +102,10 @@ size_t ListpackWrap::size() const {
   return lpLength(lp_) / 2;
 }
 
+size_t ListpackWrap::DataBytes() const {
+  return lpBytes(lp_);
+}
+
 ListpackWrap::Iterator ListpackWrap::begin() const {
   return Iterator{lp_, lpFirst(lp_), intbuf_};
 }
diff --git a/src/core/detail/listpack_wrap.h b/src/core/detail/listpack_wrap.h
index de11dd0874c6..80f5e9484b1b 100644
--- a/src/core/detail/listpack_wrap.h
+++ b/src/core/detail/listpack_wrap.h
@@ -15,8 +15,6 @@ struct ListpackWrap {
   using IntBuf = uint8_t[2][24];
 
  public:
-  ~ListpackWrap();
-
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
     using difference_type = std::ptrdiff_t;
@@ -48,6 +46,9 @@ struct ListpackWrap {
   explicit ListpackWrap(uint8_t* lp) : lp_{lp} {
   }
 
+  // Create a listpack with the given initial capacity
+  static ListpackWrap WithCapacity(size_t capacity);
+
   uint8_t* GetPointer();  // Get new updated pointer
   Iterator Find(std::string_view key) const;  // Linear search
   bool Delete(std::string_view key);
@@ -57,13 +58,14 @@ struct ListpackWrap {
   Iterator end() const;
 
   size_t size() const;  // number of entries
+  size_t DataBytes() const;
+
   // Get view from raw listpack iterator
   static std::string_view GetView(uint8_t* lp_it, uint8_t int_buf[]);
 
  private:
   uint8_t* lp_;            // the listpack itself
   mutable IntBuf intbuf_;  // buffer for integers decoded to strings
-  bool dirty_ = false;  // whether lp_ was updated, but never retrieved with GetPointer
 };
 
 }  // namespace dfly::detail
diff --git a/src/server/common.cc b/src/server/common.cc
index 5613e8c64cd3..2b4c8cf465cc 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -162,7 +162,7 @@ bool ParseDouble(string_view src, double* value) {
 #define ADD(x) (x) += o.x
 
 TieredStats& TieredStats::operator+=(const TieredStats& o) {
-  static_assert(sizeof(TieredStats) == 160);
+  static_assert(sizeof(TieredStats) == 168);
 
   ADD(total_stashes);
   ADD(total_fetches);
@@ -182,6 +182,8 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
   ADD(small_bins_cnt);
   ADD(small_bins_entries_cnt);
   ADD(small_bins_filling_bytes);
+  ADD(small_bins_filling_entries_cnt);
+  ADD(total_stash_overflows);
 
   ADD(cold_storage_bytes);
   ADD(total_offloading_steps);
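Reviewer note: with `dirty_` tracking removed, callers must always re-fetch the listpack pointer through `GetPointer()` after mutating, since `lpAppend`/`lpReplace` may reallocate the buffer. A minimal usage sketch of the reworked wrapper; `WithCapacity` and `DataBytes` come from this patch, while the capacity and entries below are illustrative only:

```cpp
// Sketch: exercising ListpackWrap as changed in this patch.
#include "core/detail/listpack_wrap.h"

void ListpackWrapExample() {
  using dfly::detail::ListpackWrap;
  ListpackWrap lw = ListpackWrap::WithCapacity(128);       // new factory
  lw.Insert("field", "value", /*skip_if_exists=*/false);   // true if a new entry was added
  uint8_t* lp = lw.GetPointer();  // always re-fetch: Insert/Delete may reallocate lp_
  size_t bytes = lw.DataBytes();  // == lpBytes(lp), new accessor
  (void)lp;
  (void)bytes;
}
```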
diff --git a/src/server/common.h b/src/server/common.h
index 34f83183cf36..d226736bad41 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -91,6 +91,7 @@ struct TieredStats {
   uint64_t small_bins_cnt = 0;
   uint64_t small_bins_entries_cnt = 0;
   size_t small_bins_filling_bytes = 0;
+  size_t small_bins_filling_entries_cnt = 0;
   size_t cold_storage_bytes = 0;
 
   uint64_t clients_throttled = 0;  // current number of throttled clients
diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc
index a7b45a263512..500561c8133d 100644
--- a/src/server/hset_family.cc
+++ b/src/server/hset_family.cc
@@ -75,6 +75,7 @@ struct HMapWrap {
   }
 
  public:
+  // Create from a non-external prime value
   HMapWrap(const PrimeValue& pv, DbContext db_cntx) {
     DCHECK(!pv.IsExternal() || pv.IsCool());
     if (pv.Encoding() == kEncodingListPack)
@@ -83,6 +84,9 @@ struct HMapWrap {
       impl_ = GetStringMap(pv, db_cntx);
   }
 
+  explicit HMapWrap(detail::ListpackWrap lw) : impl_{std::move(lw)} {
+  }
+
   explicit HMapWrap(tiering::SerializedMap* sm) : impl_{sm} {
   }
 
@@ -193,7 +197,12 @@ OpResult ExecuteRO(Transaction* tx, F&& f) {
     using D = tiering::SerializedMapDecoder;
     util::fb2::Future<OpResult<T>> fut;
     auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
-      HMapWrap hw{res.value()->Get()};
+      // Create the wrapper from the different decoder value types
+      Overloaded ov{
+          [](tiering::SerializedMap* sm) { return HMapWrap{sm}; },
+          [](detail::ListpackWrap* lw) { return HMapWrap{*lw}; },
+      };
+      auto hw = visit(ov, res.value()->Read());
       fut.Resolve(f(hw));
     };
 
@@ -216,15 +225,34 @@ OpResult ExecuteRO(Transaction* tx, F&& f) {
 }
 
 // Wrap write handler
-template <typename F> auto WrapW(F&& f) {
-  using RT = std::invoke_result_t<F, Transaction*, EngineShard*>;
-  return [f = std::forward<F>(f)](Transaction* t, EngineShard* es) -> RT {
+template <typename F> auto ExecuteW(Transaction* tx, F&& f) {
+  using T = typename std::invoke_result_t<F, HMapWrap&>::Type;
+  auto shard_cb = [f = std::forward<F>(f)](Transaction* t,
+                                           EngineShard* es) -> OpResult<CbVariant<T>> {
+    // Fetch value of hash type
     auto [key, op_args] = KeyAndArgs(t, es);
     auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_HASH);
     RETURN_ON_BAD_STATUS(it_res);
     auto& pv = it_res->it->second;
 
+    // Fully offloaded value: enqueue a read and run the handler in its callback
+    if (pv.IsExternal() && !pv.IsCool()) {
+      using D = tiering::SerializedMapDecoder;
+      util::fb2::Future<OpResult<T>> fut;
+      auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
+        // Create the wrapper around the decoder's mutable listpack
+        HMapWrap hw{*res.value()->Write()};
+        fut.Resolve(f(hw));
+
+        // Store the listpack wrapper back so the decoder keeps the updated value
+        *res.value()->Write() = *hw.Get<detail::ListpackWrap>();
+      };
+
+      es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb));
+      return CbVariant{std::move(fut)};
+    }
+
     // Remove document before modification
     op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);
 
@@ -240,8 +268,11 @@ template auto WrapW(F&& f) {
     else
       op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
 
-    return res;
+    RETURN_ON_BAD_STATUS(res);
+    return CbVariant{std::move(res).value()};
   };
+
+  return Unwrap(tx->ScheduleSingleHopT(std::move(shard_cb)));
 }
 
 size_t EstimateListpackMinBytes(CmdArgList members) {
@@ -391,14 +422,20 @@ OpResult<std::vector<OptStr>> OpHMGet(const HMapWrap& hw, CmdArgList fields) {
   DCHECK(!fields.empty());
 
   std::vector<OptStr> result(fields.size());
 
-  if (auto lw = hw.Get<detail::ListpackWrap>(); lw) {
+  if (auto sm = hw.Get<tiering::SerializedMap*>(); sm) {
+    for (size_t i = 0; i < fields.size(); ++i) {
+      if (auto it = (*sm)->Find(fields[i]); it != (*sm)->end()) {
+        result[i].emplace(it->second, sdslen(it->second));
+      }
+    }
+  } else {
     absl::flat_hash_map<std::string_view, absl::InlinedVector<size_t, 4>> reverse;
     reverse.reserve(fields.size() + 1);
     for (size_t i = 0; i < fields.size(); ++i) {
       reverse[ArgS(fields, i)].push_back(i);  // map fields to their index.
     }
 
-    for (const auto [key, value] : *lw) {
+    for (const auto [key, value] : hw.Range()) {
       if (auto it = reverse.find(key); it != reverse.end()) {
         for (size_t index : it->second) {
           DCHECK_LT(index, result.size());
@@ -406,13 +443,6 @@ OpResult<std::vector<OptStr>> OpHMGet(const HMapWrap& hw, CmdArgList fields) {
         }
       }
     }
-  } else {
-    StringMap* sm = *hw.Get<StringMap*>();
-    for (size_t i = 0; i < fields.size(); ++i) {
-      if (auto it = sm->Find(fields[i]); it != sm->end()) {
-        result[i].emplace(it->second, sdslen(it->second));
-      }
-    }
   }
 
   return result;
@@ -424,8 +454,9 @@ struct OpSetParams {
   bool keepttl = false;
 };
 
-OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
-                         const OpSetParams& op_sp = OpSetParams{}) {
+OpResult<CbVariant<uint32_t>> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
+                                    const OpSetParams& op_sp = OpSetParams{},
+                                    optional<util::fb2::Future<bool>>* bp_anker = nullptr) {
   DCHECK(!values.empty() && 0 == values.size() % 2);
   VLOG(2) << "OpSet(" << key << ")";
@@ -438,6 +469,26 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
   auto& it = add_res.it;
   PrimeValue& pv = it->second;
 
+  // If the value is external, enqueue a read and modify it there
+  if (pv.IsExternal() && !pv.IsCool()) {
+    CHECK(op_sp.ttl == UINT32_MAX);  // TODO: remove
+    using D = tiering::SerializedMapDecoder;
+    util::fb2::Future<OpResult<uint32_t>> fut;
+    auto read_cb = [fut, values, &op_sp](io::Result<D*> res) mutable {
+      // Apply all inserts to the decoder's mutable listpack
+      auto& lw = *res.value()->Write();
+      uint32_t created = 0;
+      for (size_t i = 0; i < values.size(); i += 2) {
+        created += lw.Insert(values[i], values[i + 1], op_sp.skip_if_exists);
+      }
+      fut.Resolve(created);
+    };
+
+    op_args.shard->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{},
+                                          std::move(read_cb));
+    return CbVariant{std::move(fut)};
+  }
+
   if (add_res.is_new) {
     if (op_sp.ttl == UINT32_MAX) {
       lp = lpNew(0);
@@ -492,7 +543,13 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
 
   op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
 
-  return created;
+  if (auto* ts = op_args.shard->tiered_storage(); ts) {
+    auto bp = ts->TryStash(op_args.db_cntx.db_index, key, &pv, true);
+    if (bp && bp_anker)
+      *bp_anker = std::move(*bp);
+  }
+
+  return CbVariant{created};
 }
 
 void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) {
@@ -584,7 +641,8 @@ void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) {
     return OpSet(t->GetOpArgs(shard), key, fields, op_sp);
   };
 
-  OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
+  auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
+  OpResult<uint32_t> result = Unwrap(std::move(delayed_result));
   if (result) {
     cmd_cntx.rb->SendLong(*result);
   } else {
@@ -615,7 +673,7 @@ void HSetFamily::HDel(CmdArgList args, const CommandContext& cmd_cntx) {
     deleted += hw.Erase(s);
     return deleted;
   };
-  HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapW(cb)));
+  HSetReplies{cmd_cntx.rb}.Send(ExecuteW(cmd_cntx.tx, std::move(cb)));
 }
 
 void HSetFamily::HExpire(CmdArgList args, const CommandContext& cmd_cntx) {
@@ -853,12 +911,18 @@ void HSetFamily::HSet(CmdArgList args, const CommandContext& cmd_cntx) {
     return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);
   }
 
+  optional<util::fb2::Future<bool>> tiered_backpressure;
+
   args.remove_prefix(1);
   auto cb = [&](Transaction* t, EngineShard* shard) {
-    return OpSet(t->GetOpArgs(shard), key, args);
+    return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{}, &tiered_backpressure);
   };
 
-  OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
+  auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
+  OpResult<uint32_t> result = Unwrap(std::move(delayed_result));
+
+  if (tiered_backpressure)
+    tiered_backpressure->GetFor(10ms);
 
   if (result && cmd == "HSET") {
     cmd_cntx.rb->SendLong(*result);
@@ -873,7 +937,7 @@ void HSetFamily::HSetNx(CmdArgList args, const CommandContext& cmd_cntx) {
   auto cb = [&](Transaction* t, EngineShard* shard) {
     return OpSet(t->GetOpArgs(shard), key, args.subspan(1), OpSetParams{.skip_if_exists = true});
   };
-  HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(cb));
+  HSetReplies{cmd_cntx.rb}.Send(Unwrap(cmd_cntx.tx->ScheduleSingleHopT(cb)));
 }
 
 void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {
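Reviewer note: write handlers now return either an immediately computed value or a future that resolves once the offloaded hash has been read back and modified, and `Unwrap()` collapses both cases. Neither `CbVariant` nor `Unwrap` is defined in this diff, so the sketch below is an assumption about their contract inferred from the call sites, not the actual definitions:

```cpp
// Plausible shape of the CbVariant/Unwrap helpers used by ExecuteW/OpSet above.
#include <variant>

template <typename T>
using CbVariant = std::variant<T, util::fb2::Future<OpResult<T>>>;

template <typename T> OpResult<T> Unwrap(OpResult<CbVariant<T>> res) {
  if (!res)
    return res.status();
  if (auto* val = std::get_if<T>(&*res))  // value computed in memory: ready now
    return std::move(*val);
  // Value was offloaded: block until the tiered read callback resolves the future.
  return std::get<util::fb2::Future<OpResult<T>>>(*res).Get();
}
```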
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index b46efc99d688..ae1b09adeeff 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -17,6 +17,7 @@
 #include "base/flag_utils.h"
 #include "base/flags.h"
 #include "base/logging.h"
+#include "core/detail/listpack_wrap.h"
 #include "server/common.h"
 #include "server/db_slice.h"
 #include "server/engine_shard_set.h"
@@ -24,9 +25,14 @@
 #include "server/table.h"
 #include "server/tiering/common.h"
 #include "server/tiering/op_manager.h"
+#include "server/tiering/serialized_map.h"
 #include "server/tiering/small_bins.h"
 #include "server/tx_base.h"
 
+extern "C" {
+#include "redis/listpack.h"
+}
+
 using namespace facade;
 
 using AtLeast64 = base::ConstrainedNumericFlagValue<size_t, 64>;  // ABSL_FLAG breaks with commas
@@ -46,6 +52,8 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5,
 ABSL_FLAG(float, tiered_upload_threshold, 0.1,
           "Ratio of free memory (free/max memory) below which uploading stops");
 
+ABSL_FLAG(bool, tiered_experimental_hash_support, false, "Experimental hash datatype offloading");
+
 namespace dfly {
 
 using namespace std;
@@ -73,6 +81,58 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
   return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
 }
 
+// Determine the required byte size and encoding type based on the value.
+// TODO(vlad): Maybe split into different accessors?
+// Do NOT enforce rules that depend on dynamic runtime values: this is called both when
+// scheduling a stash and just before it succeeds, and must return the same result both times.
+optional<pair<size_t, CompactObj::ExternalRep>> EstimateSerializedSize(
+    const PrimeValue& pv) {
+  switch (pv.ObjType()) {
+    case OBJ_STRING:
+      return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
+    case OBJ_HASH:
+      if (pv.Encoding() == kEncodingListPack) {
+        auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
+        size_t bytes = lpBytes(lp);
+        bytes += lpLength(lp) * 2 * 4;
+        return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
+      }
+      return {};
+    default:
+      return {};
+  };
+}
+
+size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableBytes buffer) {
+  DCHECK_LE(EstimateSerializedSize(pv)->first, buffer.size());
+  switch (rep) {
+    case CompactObj::ExternalRep::STRING: {
+      auto sv = pv.GetRawString();
+      memcpy(buffer.data(), sv.view().data(), sv.view().size());
+      return sv.view().size();
+    }
+    case CompactObj::ExternalRep::SERIALIZED_MAP: {
+      DCHECK_EQ(pv.Encoding(), kEncodingListPack);
+
+      // TODO(vlad): Optimize copy for serialization
+      detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
+      vector<pair<string_view, string_view>> entries(lw.begin(), lw.end());
+      return tiering::SerializedMap::Serialize(
+          entries, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
+    }
+  };
+  return 0;
+}
+
+string SerializeString(const PrimeValue& pv) {
+  auto estimate = EstimateSerializedSize(pv);
+  string s(estimate->first, 0);
+  size_t written =
+      Serialize(estimate->second, pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
+  s.resize(written);
+  return s;
+}
+
 }  // anonymous namespace
 
 class TieredStorage::ShardOpManager : public tiering::OpManager {
@@ -133,12 +193,20 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   void RetireColdEntries(size_t additional_memory);
 
   // Set value to be an in-memory type again. Update memory stats.
-  void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
+  void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
     DCHECK(!value.empty());
-    DCHECK_EQ(uint8_t(pv->GetExternalRep()), uint8_t(CompactObj::ExternalRep::STRING));
+    switch (pv->GetExternalRep()) {
+      case CompactObj::ExternalRep::STRING:
+        pv->Materialize(value, true);
+        break;
+      case CompactObj::ExternalRep::SERIALIZED_MAP:
+        tiering::SerializedMapDecoder decoder{};
+        decoder.Initialize(value);
+        decoder.Upload(pv);
+        break;
+    };
 
-    pv->Materialize(value, is_raw);
-    RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
+    RecordDeleted(*pv, value.size(), GetDbTableStats(dbid));
   }
 
   // Find entry by key in db_slice and store external segment in place of original value.
@@ -153,12 +221,13 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
       stats->tiered_used_bytes += segment.length;
       stats_.total_stashes++;
 
+      CompactObj::ExternalRep rep = EstimateSerializedSize(*pv)->second;
       if (ts_->config_.experimental_cooling) {
         RetireColdEntries(pv->MallocUsed());
-        ts_->CoolDown(key.first, key.second, segment, pv);
+        ts_->CoolDown(key.first, key.second, segment, rep, pv);
       } else {
         stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
-        pv->SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
+        pv->SetExternal(segment.offset, segment.length, rep);
       }
     } else {
       LOG(DFATAL) << "Should not reach here";
@@ -215,7 +284,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
     } else {
       // Cut out relevant part of value and restore it to memory
       string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
-      Upload(dbid, value, true, item_segment.length, &pv);
+      Upload(dbid, value, &pv);
     }
   }
 }
@@ -386,36 +455,40 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
     return {};
   }
 
-  StringOrView raw_string = value->GetRawString();
-  value->SetStashPending(true);
+  auto estimated = EstimateSerializedSize(*value);
+  DCHECK(estimated);
 
   tiering::OpManager::EntryId id;
   error_code ec;
-  // TODO(vlad): Replace with encoders for different types
-  auto stash_string = [&](std::string_view str) {
-    if (auto prepared = op_manager_->PrepareStash(str.size()); prepared) {
+  value->SetStashPending(true);
+  if (OccupiesWholePages(estimated->first)) {  // large enough for own page
+    id = KeyRef(dbid, key);
+    if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
       auto [offset, buf] = *prepared;
-      memcpy(buf.bytes.data(), str.data(), str.size());
-      tiering::DiskSegment segment{offset, str.size()};
+      size_t written = Serialize(estimated->second, *value, buf.bytes);
+      tiering::DiskSegment segment{offset, written};
       op_manager_->Stash(id, segment, buf);
     } else {
       ec = prepared.error();
     }
-  };
-
-  if (OccupiesWholePages(value->Size())) {  // large enough for own page
-    id = KeyRef(dbid, key);
-    stash_string(raw_string.view());
-  } else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
+  } else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
     id = bin->first;
     // TODO(vlad): Write bin to prepared buffer instead of allocating one
-    stash_string(bin->second);
+    if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
+      auto [offset, buf] = *prepared;
+      memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
+      tiering::DiskSegment segment{offset, bin->second.size()};
+      op_manager_->Stash(id, segment, buf);
+    } else {
+      ec = prepared.error();
+    }
   } else {
     return {};  // silently added to bin
  }
 
   if (ec) {
+    value->SetStashPending(false);
     LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
     return {};
@@ -454,7 +527,8 @@ void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue*
   if (auto node = stash_backpressure_.extract(make_pair(dbid, key)); !node.empty())
     std::move(node.mapped()).Resolve(false);
 
-  if (OccupiesWholePages(value->Size())) {
+  auto estimated = EstimateSerializedSize(*value);
+  if (OccupiesWholePages(estimated->first)) {
     op_manager_->Delete(KeyRef(dbid, key));
   } else if (auto bin = bins_->Delete(dbid, key); bin) {
     op_manager_->Delete(*bin);
@@ -489,6 +563,7 @@ TieredStats TieredStorage::GetStats() const {
     stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
     stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
     stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
+    stats.small_bins_filling_entries_cnt = bins_stats.current_entries_cnt;
   }
 
   {  // Own stats
@@ -513,13 +588,14 @@ void TieredStorage::UpdateFromFlags() {
       .write_depth_limit = absl::GetFlag(FLAGS_tiered_storage_write_depth),
       .offload_threshold = absl::GetFlag(FLAGS_tiered_offload_threshold),
       .upload_threshold = absl::GetFlag(FLAGS_tiered_upload_threshold),
+      .experimental_hash_offload = absl::GetFlag(FLAGS_tiered_experimental_hash_support),
   };
 }
 
 std::vector<std::string> TieredStorage::GetMutableFlagNames() {
   return base::GetFlagNames(FLAGS_tiered_min_value_size, FLAGS_tiered_experimental_cooling,
                             FLAGS_tiered_storage_write_depth, FLAGS_tiered_offload_threshold,
-                            FLAGS_tiered_upload_threshold);
+                            FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_support);
 }
 
 bool TieredStorage::ShouldOffload() const {
@@ -596,10 +672,10 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
                    ->prime.FindFirst(record->key_hash, predicate);
     CHECK(IsValid(it));
     PrimeValue& pv = it->second;
-    tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
 
     // Now the item is only in storage.
-    pv.SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
+    tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
+    pv.Freeze(segment.offset, segment.length);
 
     auto* stats = op_manager_->GetDbTableStats(record->db_index);
     stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed());
@@ -610,14 +686,28 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
 }
 
 bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
+  // Check value state
+  if (pv.IsExternal() || pv.HasStashPending())
+    return false;
+
+  // Estimate the serialized value size
+  auto estimation = EstimateSerializedSize(pv);
+  if (!estimation)
+    return false;
+
+  // For now, hash offloading is gated behind an experimental flag
+  if (pv.ObjType() == OBJ_HASH && !config_.experimental_hash_offload)
+    return false;
+
   const auto& disk_stats = op_manager_->GetStats().disk_stats;
-  return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
-         pv.Size() >= config_.min_value_size &&
-         disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
+  return estimation->first >= config_.min_value_size &&
+         disk_stats.allocated_bytes + tiering::kPageSize + estimation->first <
+             disk_stats.max_file_size;
 }
 
 void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
-                             const tiering::DiskSegment& segment, PrimeValue* pv) {
+                             const tiering::DiskSegment& segment, CompactObj::ExternalRep rep,
+                             PrimeValue* pv) {
   detail::TieredColdRecord* record = CompactObj::AllocateMR<detail::TieredColdRecord>();
   cool_queue_.push_front(*record);
   stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
@@ -627,7 +717,7 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
   record->page_index = segment.offset / tiering::kPageSize;
   record->value = std::move(*pv);
 
-  pv->SetCool(segment.offset, segment.length, record);
+  pv->SetCool(segment.offset, segment.length, rep, record);
 }
 
 PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
@@ -636,10 +726,6 @@ PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
 
   // We remove it from both cool storage and the offline storage.
   PrimeValue hot = DeleteCool(item.record);
   op_manager_->DeleteOffloaded(dbid, segment);
-
-  // Bring it back to the PrimeTable.
-  DCHECK(hot.ObjType() == OBJ_STRING);
-
   return hot;
 }
diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h
index 1d7f2e455d50..49483727273e 100644
--- a/src/server/tiered_storage.h
+++ b/src/server/tiered_storage.h
@@ -116,7 +116,7 @@ class TieredStorage {
   // Moves pv contents to the cool storage and updates pv to point to it.
   void CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment,
-                PrimeValue* pv);
+                CompactObj::ExternalRep rep, PrimeValue* pv);
 
   PrimeValue DeleteCool(detail::TieredColdRecord* record);
   detail::TieredColdRecord* PopCool();
@@ -138,6 +138,7 @@ class TieredStorage {
     unsigned write_depth_limit;
     float offload_threshold;
     float upload_threshold;
+    bool experimental_hash_offload;
   } config_;
 
   struct {
     uint64_t stash_overflow_cnt = 0;
diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc
index 87d5d87f56f1..9121c0d0b02a 100644
--- a/src/server/tiered_storage_test.cc
+++ b/src/server/tiered_storage_test.cc
@@ -28,6 +28,7 @@ ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
 ABSL_DECLARE_FLAG(float, tiered_upload_threshold);
 ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);
 ABSL_DECLARE_FLAG(bool, tiered_experimental_cooling);
+ABSL_DECLARE_FLAG(bool, tiered_experimental_hash_support);
 
 namespace dfly {
 
@@ -489,4 +490,63 @@ TEST_F(PureDiskTSTest, Dump) {
   EXPECT_EQ(resp, "OK");
 }
 
+TEST_P(LatentCoolingTSTest, SimpleHash) {
+  absl::FlagSaver saver;
+  absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true);
+  // For now, never upload, as uploading is not implemented yet
+  absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0);
+  UpdateFromFlags();
+
+  const size_t kNUM = 100;
+
+  auto build_command = [](string_view key) {
+    vector<string> cmd = {"HSET", string{key}};
+    for (char c = 'a'; c <= 'z'; c++) {
+      cmd.push_back(string(1, c));
+      cmd.push_back(string(31, 'x') + c);
+    }
+    return cmd;
+  };
+
+  // Create some hashes
+  for (size_t i = 0; i < kNUM; i++) {
+    Run(build_command(absl::StrCat("k", i)));
+  }
+
+  // Wait for all entries to be stashed or to end up in filling bins
+  ExpectConditionWithinTimeout([=] {
+    auto metrics = GetMetrics();
+    VLOG(0) << metrics.tiered_stats.total_stashes << " "
+            << metrics.tiered_stats.small_bins_entries_cnt;
+    return metrics.tiered_stats.total_stashes +
+               metrics.tiered_stats.small_bins_filling_entries_cnt ==
+           kNUM;
+  });
+
+  // Verify correctness
+  for (size_t i = 0; i < kNUM; i++) {
+    string key = absl::StrCat("k", i);
+    EXPECT_THAT(Run({"HLEN", key}), IntArg(26));
+
+    auto resp = Run({"HGET", key, string(1, 'f')});
+    auto v = string(31, 'x') + 'f';
+    EXPECT_EQ(resp, v);
+  }
+
+  // Wait for all offloads again
+  ExpectConditionWithinTimeout([=] {
+    auto metrics = GetMetrics();
+    return metrics.db_stats[0].tiered_entries +
+               metrics.tiered_stats.small_bins_filling_entries_cnt ==
+           kNUM;
+  });
+
+  // HDEL a single field from every hash
+  for (size_t i = 0; i < kNUM; i++) {
+    string key = absl::StrCat("k", i);
+    EXPECT_THAT(Run({"HDEL", key, string(1, 'c')}), IntArg(1));
+    EXPECT_THAT(Run({"HLEN", key}), IntArg(25));
+  }
+}
+
 }  // namespace dfly
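Reviewer note: the test above exercises the stash path end to end, and the representation now travels through the whole cooling lifecycle. A condensed view of that flow, based solely on the tiered_storage.cc changes above; the function name and parameters here are illustrative (and `EstimateSerializedSize` is file-local in tiered_storage.cc), so this is a sketch rather than linkable code:

```cpp
// Condensed cooling lifecycle per this patch (illustrative only).
void OnStashCompleted(dfly::TieredStorage* ts, dfly::DbIndex db, std::string_view key,
                      dfly::tiering::DiskSegment segment, dfly::PrimeValue* pv,
                      bool cooling_enabled) {
  // Stash completion: recompute the representation and record it with the slice.
  auto rep = EstimateSerializedSize(*pv)->second;
  if (cooling_enabled)
    ts->CoolDown(db, key, segment, rep, pv);  // pv becomes EXTERNAL + is_cool, rep stored
  else
    pv->SetExternal(segment.offset, segment.length, rep);

  // Later, under memory pressure, ReclaimMemory() drops the in-memory copy but keeps
  // the representation bits, so a subsequent read still knows how to decode the page:
  //   tiering::DiskSegment seg = FromCoolItem(pv->GetCool());
  //   pv->Freeze(seg.offset, seg.length);  // SetExternal() with GetExternalRep()
}
```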
"server/tiering/serialized_map.h" +extern "C" { +#include "redis/redis_aux.h" // for OBJ_HASH +} + namespace dfly::tiering { std::unique_ptr BareDecoder::Clone() const { @@ -72,16 +79,44 @@ void SerializedMapDecoder::Initialize(std::string_view slice) { } Decoder::UploadMetrics SerializedMapDecoder::GetMetrics() const { - return UploadMetrics{.modified = false, - .estimated_mem_usage = map_->DataBytes() + map_->size() * 2 * 8}; + Overloaded ov{ + [](const SerializedMap& sm) { return sm.DataBytes() + sm.size() * 8; }, + [](const detail::ListpackWrap& lw) { return lw.DataBytes(); }, + }; + size_t bytes = visit(Overloaded{ov, [&](const auto& ptr) { return ov(*ptr); }}, map_); + return UploadMetrics{.modified = modified_, .estimated_mem_usage = bytes}; } void SerializedMapDecoder::Upload(CompactObj* obj) { - ABSL_UNREACHABLE(); + if (std::holds_alternative>(map_)) + MakeOwned(); + + obj->InitRobj(OBJ_HASH, kEncodingListPack, Write()->GetPointer()); +} + +std::variant SerializedMapDecoder::Read() const { + using RT = std::variant; + return std::visit([](auto& ptr) -> RT { return ptr.get(); }, map_); } -SerializedMap* SerializedMapDecoder::Get() const { - return map_.get(); +detail::ListpackWrap* SerializedMapDecoder::Write() { + if (std::holds_alternative>(map_)) + return std::get>(map_).get(); + + // Convert SerializedMap to listpack + MakeOwned(); + modified_ = true; + return Write(); +} + +void SerializedMapDecoder::MakeOwned() { + auto& map = std::get>(map_); + + auto lw = detail::ListpackWrap::WithCapacity(GetMetrics().estimated_mem_usage); + for (const auto& [key, value] : *map) + lw.Insert(key, value, true); + + map_ = std::make_unique(lw); } } // namespace dfly::tiering diff --git a/src/server/tiering/decoders.h b/src/server/tiering/decoders.h index 66149f93aade..d42c7d46a047 100644 --- a/src/server/tiering/decoders.h +++ b/src/server/tiering/decoders.h @@ -12,6 +12,10 @@ #include "core/compact_object.h" #include "core/string_or_view.h" +namespace dfly::detail { +struct ListpackWrap; +} + namespace dfly::tiering { struct SerializedMap; @@ -77,10 +81,14 @@ struct SerializedMapDecoder : public Decoder { UploadMetrics GetMetrics() const override; void Upload(CompactObj* obj) override; - SerializedMap* Get() const; + std::variant Read() const; + dfly::detail::ListpackWrap* Write(); private: - std::unique_ptr map_; + void MakeOwned(); // Convert to listpack + + bool modified_; + std::variant, std::unique_ptr> map_; }; } // namespace dfly::tiering diff --git a/src/server/tiering/serialized_map.h b/src/server/tiering/serialized_map.h index 1f9dd6a57193..2c17c2ec75b3 100644 --- a/src/server/tiering/serialized_map.h +++ b/src/server/tiering/serialized_map.h @@ -50,7 +50,7 @@ struct SerializedMap { size_t DataBytes() const; // Input for serialization - using Input = const absl::Span>; + using Input = const absl::Span>; // Buffer size required for serialization static size_t SerializeSize(Input); diff --git a/src/server/tiering/serialized_map_test.cc b/src/server/tiering/serialized_map_test.cc index adb745565005..e6be96f853b0 100644 --- a/src/server/tiering/serialized_map_test.cc +++ b/src/server/tiering/serialized_map_test.cc @@ -12,11 +12,11 @@ using namespace std; class SerializedMapTest : public ::testing::Test {}; TEST_F(SerializedMapTest, TestBasic) { - const vector> kBase = {{"first key", "first value"}, - {"second key", "second value"}, - {"third key", "third value"}, - {"fourth key", "fourth value"}, - {"fifth key", "fifth value"}}; + const vector> kBase = {{"first key", "first value"}, + 
{"second key", "second value"}, + {"third key", "third value"}, + {"fourth key", "fourth value"}, + {"fifth key", "fifth value"}}; // Serialize kBase to buffer std::string buffer; @@ -31,7 +31,8 @@ TEST_F(SerializedMapTest, TestBasic) { // Check entries size_t idx = 0; for (auto it = map.begin(); it != map.end(); ++it, ++idx) { - EXPECT_EQ(*it, kBase[idx]); + EXPECT_EQ((*it).first, kBase[idx].first); + EXPECT_EQ((*it).second, kBase[idx].second); } } diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index df1dd8a33c64..1ef6e90ae513 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -164,7 +164,8 @@ SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { SmallBins::Stats SmallBins::GetStats() const { return Stats{.stashed_bins_cnt = stashed_bins_.size(), .stashed_entries_cnt = stats_.stashed_entries_cnt, - .current_bin_bytes = current_bin_bytes_}; + .current_bin_bytes = current_bin_bytes_, + .current_entries_cnt = current_bin_.size()}; } SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) { diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index 849f1291c9d3..44920d544a2f 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -25,6 +25,7 @@ class SmallBins { size_t stashed_bins_cnt = 0; size_t stashed_entries_cnt = 0; size_t current_bin_bytes = 0; + size_t current_entries_cnt = 0; }; using BinId = unsigned;