Changes from 7 commits
9 changes: 8 additions & 1 deletion src/core/compact_object.cc
@@ -694,6 +694,7 @@ void RobjWrapper::ReallocateString(MemoryResource* mr) {
void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) {
type_ = type;
encoding_ = encoding;
CHECK(inner != nullptr);
Set(inner, 0);
}

@@ -1224,11 +1225,13 @@ CompactObj::ExternalRep CompactObj::GetExternalRep() const {
return static_cast<CompactObj::ExternalRep>(u_.ext_ptr.representation);
}

void CompactObj::SetCool(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
void CompactObj::SetCool(size_t offset, uint32_t sz, ExternalRep rep,
detail::TieredColdRecord* record) {
// We copy the mask of the "cooled" referenced object because it contains the encoding info.
SetMeta(EXTERNAL_TAG, record->value.mask_);

u_.ext_ptr.is_cool = 1;
u_.ext_ptr.representation = static_cast<uint8_t>(rep);
u_.ext_ptr.page_offset = offset % 4096;
u_.ext_ptr.serialized_size = sz;
u_.ext_ptr.cool_record = record;
@@ -1244,6 +1247,10 @@ auto CompactObj::GetCool() const -> CoolItem {
return res;
}

void CompactObj::KeepExternal(size_t offset, size_t sz) {
SetExternal(offset, sz, GetExternalRep());
}

std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
DCHECK_EQ(EXTERNAL_TAG, taglen_);
auto& ext = u_.ext_ptr;
7 changes: 6 additions & 1 deletion src/core/compact_object.h
@@ -364,7 +364,8 @@ class CompactObj {
}

// Assigns a cooling record to the object together with its external slice.
void SetCool(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
void SetCool(size_t offset, uint32_t serialized_size, ExternalRep rep,
detail::TieredColdRecord* record);

struct CoolItem {
uint16_t page_offset;
@@ -376,6 +377,10 @@
// Returns the external data of the object including its ColdRecord.
CoolItem GetCool() const;

// Prerequisite: IsCool() is true.
// Keeps the cool record only as an external value and discards the in-memory part.
void KeepExternal(size_t offset, size_t sz);
Collaborator:
KeepExternal is not clear imho.

Freeze is a more creative option, and maybe DropCool is more direct in explaining what it does?

Contributor Author:
Ok.. The names just become more and more cryptic 😃

Contributor Author:
Renamed


std::pair<size_t, size_t> GetExternalSlice() const;

// Injects either the raw string (extracted with GetRawString()) or the usual string
4 changes: 4 additions & 0 deletions src/core/detail/listpack_wrap.cc
@@ -37,6 +37,10 @@ ListpackWrap::~ListpackWrap() {
DCHECK(!dirty_);
}

ListpackWrap ListpackWrap::WithCapacity(size_t capacity) {
return ListpackWrap{lpNew(capacity)};
}

uint8_t* ListpackWrap::GetPointer() {
dirty_ = false;
return lp_;
3 changes: 3 additions & 0 deletions src/core/detail/listpack_wrap.h
@@ -48,6 +48,9 @@ struct ListpackWrap {
explicit ListpackWrap(uint8_t* lp) : lp_{lp} {
}

// Creates a listpack with the given capacity
static ListpackWrap WithCapacity(size_t capacity);

uint8_t* GetPointer(); // Get new updated pointer
Iterator Find(std::string_view key) const; // Linear search
bool Delete(std::string_view key);
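
For context, a hypothetical usage sketch of the new factory; it relies only on the members shown above, and the capacity value is illustrative:

auto lw = detail::ListpackWrap::WithCapacity(128);  // pre-sized empty listpack
// ... populate / query via the existing ListpackWrap API ...
uint8_t* lp = lw.GetPointer();  // fetch the (possibly reallocated) pointer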
4 changes: 3 additions & 1 deletion src/server/common.cc
@@ -162,7 +162,7 @@ bool ParseDouble(string_view src, double* value) {
#define ADD(x) (x) += o.x

TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 160);
static_assert(sizeof(TieredStats) == 168);

ADD(total_stashes);
ADD(total_fetches);
@@ -182,6 +182,8 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
ADD(small_bins_cnt);
ADD(small_bins_entries_cnt);
ADD(small_bins_filling_bytes);
ADD(small_bins_filling_entries_cnt);

ADD(total_stash_overflows);
ADD(cold_storage_bytes);
ADD(total_offloading_steps);
1 change: 1 addition & 0 deletions src/server/common.h
@@ -91,6 +91,7 @@ struct TieredStats {
uint64_t small_bins_cnt = 0;
uint64_t small_bins_entries_cnt = 0;
size_t small_bins_filling_bytes = 0;
size_t small_bins_filling_entries_cnt = 0;
size_t cold_storage_bytes = 0;

uint64_t clients_throttled = 0; // current number of throttled clients
3 changes: 3 additions & 0 deletions src/server/hset_family.cc
@@ -492,6 +492,9 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu

op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);

if (auto* ts = op_args.shard->tiered_storage(); ts)
ts->TryStash(op_args.db_cntx.db_index, key, &pv);

return created;
}

152 changes: 119 additions & 33 deletions src/server/tiered_storage.cc
@@ -17,16 +17,22 @@
#include "base/flag_utils.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/detail/listpack_wrap.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/snapshot.h"
#include "server/table.h"
#include "server/tiering/common.h"
#include "server/tiering/op_manager.h"
#include "server/tiering/serialized_map.h"
#include "server/tiering/small_bins.h"
#include "server/tx_base.h"

extern "C" {
#include "redis/listpack.h"
}

using namespace facade;

using AtLeast64 = base::ConstrainedNumericFlagValue<size_t, 64>; // ABSL_FLAG breaks with commas
@@ -46,6 +52,8 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5,
ABSL_FLAG(float, tiered_upload_threshold, 0.1,
"Ratio of free memory (free/max memory) below which uploading stops");

ABSL_FLAG(bool, tiered_experimental_hash_support, false, "Experimental hash datatype offloading");

namespace dfly {

using namespace std;
@@ -73,6 +81,58 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
}

// Determine required byte size and encoding type based on value.
// TODO(vlad): Maybe split into different accessors?
// Do NOT enforce rules depending on dynamic runtime values, as this is called
// both when scheduling a stash and just before it succeeds, and is expected to return the same results.
optional<pair<size_t /*size*/, CompactObj::ExternalRep>> EstimateSerializedSize(
Collaborator:
It does more than EstimateSize, and with an optional wrapping a precise ExternalRep the behavior is confusing. Maybe call it GetSerializationDescriptor, which would return

struct SerializationDescriptor {
  size_t estimated_size;
  CompactObj::ExternalRep repr;
};

and add NONE to the ExternalRep enum? Or alternatively add is_valid() { return size > 0; } and return size=0 for unfit objects.

Collaborator:
Personally, I am not a big fan of chaining optionals (or other types like std::expected) one within another, especially when we control the wrapped class and it can describe the "undef" state itself. For example, in our codebase, we have using Result = std::optional<ResultType>; where ResultType is another optional, or std::optional<facade::ErrorReply> where ErrorReply can hold an empty state. These levels of indirection decrease readability, imho.

Contributor Author:
I agree here, but in a properly structured code base type composition is almost always preferable
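
For illustration, a minimal sketch of the descriptor alternative discussed in this thread; the field names, the default of 0 marking unfit objects, and is_valid() are assumptions taken from the suggestion above, not the final API:

struct SerializationDescriptor {
  size_t estimated_size = 0;      // 0 marks an object unfit for offloading
  CompactObj::ExternalRep rep;    // alternatively, add a NONE member to ExternalRep

  bool is_valid() const { return estimated_size > 0; }
};

// Callers would then branch on validity instead of unwrapping an optional:
//   auto desc = GetSerializationDescriptor(pv);
//   if (!desc.is_valid()) return false;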

const PrimeValue& pv) {
switch (pv.ObjType()) {
case OBJ_STRING:
return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
case OBJ_HASH:
if (pv.Encoding() == kEncodingListPack) {
auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
size_t bytes = lpBytes(lp);
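// Assumed headroom for SerializedMap framing: a few extra bytes per listpack
// element (lpLength() counts elements; each hash entry spans two of them).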
bytes += lpLength(lp) * 2 * 4;
Collaborator:
can you add a comment explaining the formula?

return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
}
return {};
default:
return {};
};
}

size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableBytes buffer) {
DCHECK_LE(EstimateSerializedSize(pv)->first, buffer.size());
switch (rep) {
case CompactObj::ExternalRep::STRING: {
auto sv = pv.GetRawString();
memcpy(buffer.data(), sv.view().data(), sv.view().size());
return sv.view().size();
}
case CompactObj::ExternalRep::SERIALIZED_MAP: {
DCHECK_EQ(pv.Encoding(), kEncodingListPack);

// TODO(vlad): Optimize copy for serialization
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
vector<pair<string, string>> entries(lw.begin(), lw.end());
return tiering::SerializedMap::Serialize(
entries, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
}
};
return 0;
}

string SerializeString(const PrimeValue& pv) {
auto estimate = EstimateSerializedSize(pv);
string s(estimate->first, 0);
size_t written =
Serialize(estimate->second, pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
s.resize(written);
return s;
}

} // anonymous namespace

class TieredStorage::ShardOpManager : public tiering::OpManager {
@@ -133,12 +193,21 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
void RetireColdEntries(size_t additional_memory);

// Set value to be an in-memory type again. Update memory stats.
void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
DCHECK(!value.empty());
DCHECK_EQ(uint8_t(pv->GetExternalRep()), uint8_t(CompactObj::ExternalRep::STRING));

pv->Materialize(value, is_raw);
RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
switch (pv->GetExternalRep()) {
case CompactObj::ExternalRep::STRING:
pv->Materialize(value, true);
break;
case CompactObj::ExternalRep::SERIALIZED_MAP:
tiering::SerializedMapDecoder decoder{};
decoder.Initialize(value);
decoder.Upload(pv);
break;
};

RecordDeleted(*pv, value.size(), GetDbTableStats(dbid));
}

// Find entry by key in db_slice and store external segment in place of original value.
@@ -153,12 +222,13 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
stats->tiered_used_bytes += segment.length;
stats_.total_stashes++;

CompactObj::ExternalRep rep = EstimateSerializedSize(*pv)->second;
Collaborator:
ADD a DCHECK before dereferencing.

Contributor Author:
The nullopt access will trigger an assert either way

if (ts_->config_.experimental_cooling) {
RetireColdEntries(pv->MallocUsed());
ts_->CoolDown(key.first, key.second, segment, pv);
ts_->CoolDown(key.first, key.second, segment, rep, pv);
} else {
stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
pv->SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
pv->SetExternal(segment.offset, segment.length, rep);
}
} else {
LOG(DFATAL) << "Should not reach here";
@@ -215,7 +285,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
} else {
// Cut out relevant part of value and restore it to memory
string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
Upload(dbid, value, true, item_segment.length, &pv);
Upload(dbid, value, &pv);
}
}
}
@@ -386,36 +456,40 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
return {};
}

StringOrView raw_string = value->GetRawString();
value->SetStashPending(true);
auto estimated = EstimateSerializedSize(*value);
DCHECK(estimated);

tiering::OpManager::EntryId id;
error_code ec;

// TODO(vlad): Replace with encoders for different types
auto stash_string = [&](std::string_view str) {
if (auto prepared = op_manager_->PrepareStash(str.size()); prepared) {
value->SetStashPending(true);
if (OccupiesWholePages(estimated->first)) { // large enough for own page
id = KeyRef(dbid, key);
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
auto [offset, buf] = *prepared;
memcpy(buf.bytes.data(), str.data(), str.size());
tiering::DiskSegment segment{offset, str.size()};
size_t written = Serialize(estimated->second, *value, buf.bytes);
tiering::DiskSegment segment{offset, written};
op_manager_->Stash(id, segment, buf);
} else {
ec = prepared.error();
}
};

if (OccupiesWholePages(value->Size())) { // large enough for own page
id = KeyRef(dbid, key);
stash_string(raw_string.view());
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
} else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
id = bin->first;
// TODO(vlad): Write bin to prepared buffer instead of allocating one
stash_string(bin->second);
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
auto [offset, buf] = *prepared;
memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
tiering::DiskSegment segment{offset, bin->second.size()};
op_manager_->Stash(id, segment, buf);
} else {
ec = prepared.error();
}
} else {
return {}; // silently added to bin
}

if (ec) {
value->SetStashPending(false);
LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately: " << ec.message();
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
return {};
@@ -489,6 +563,7 @@ TieredStats TieredStorage::GetStats() const {
stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
stats.small_bins_filling_entries_cnt = bins_stats.current_entries_cnt;
}

{ // Own stats
@@ -513,13 +588,14 @@ void TieredStorage::UpdateFromFlags() {
.write_depth_limit = absl::GetFlag(FLAGS_tiered_storage_write_depth),
.offload_threshold = absl::GetFlag(FLAGS_tiered_offload_threshold),
.upload_threshold = absl::GetFlag(FLAGS_tiered_upload_threshold),
.experimental_hash_offload = absl::GetFlag(FLAGS_tiered_experimental_hash_support),
};
}

std::vector<std::string> TieredStorage::GetMutableFlagNames() {
return base::GetFlagNames(FLAGS_tiered_min_value_size, FLAGS_tiered_experimental_cooling,
FLAGS_tiered_storage_write_depth, FLAGS_tiered_offload_threshold,
FLAGS_tiered_upload_threshold);
FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_support);
}

bool TieredStorage::ShouldOffload() const {
@@ -596,10 +672,10 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
->prime.FindFirst(record->key_hash, predicate);
CHECK(IsValid(it));
PrimeValue& pv = it->second;
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());

// Now the item is only in storage.
pv.SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
pv.KeepExternal(segment.offset, segment.length);

auto* stats = op_manager_->GetDbTableStats(record->db_index);
stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed());
@@ -610,14 +686,28 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
}

bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
// Check value state
if (pv.IsExternal() || pv.HasStashPending())
return false;

// Estimate value size
auto estimation = EstimateSerializedSize(pv);
if (!estimation)
return false;

// For now, hash offloading is conditional
if (pv.ObjType() == OBJ_HASH && !config_.experimental_hash_offload)
return false;

const auto& disk_stats = op_manager_->GetStats().disk_stats;
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
pv.Size() >= config_.min_value_size &&
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
return estimation->first >= config_.min_value_size &&
disk_stats.allocated_bytes + tiering::kPageSize + estimation->first <
disk_stats.max_file_size;
}

void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
const tiering::DiskSegment& segment, PrimeValue* pv) {
const tiering::DiskSegment& segment, CompactObj::ExternalRep rep,
PrimeValue* pv) {
detail::TieredColdRecord* record = CompactObj::AllocateMR<detail::TieredColdRecord>();
cool_queue_.push_front(*record);
stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
Expand All @@ -627,7 +717,7 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
record->page_index = segment.offset / tiering::kPageSize;
record->value = std::move(*pv);

pv->SetCool(segment.offset, segment.length, record);
pv->SetCool(segment.offset, segment.length, rep, record);
}

PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
@@ -636,10 +726,6 @@ PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
// We remove it from both cool storage and the offline storage.
PrimeValue hot = DeleteCool(item.record);
op_manager_->DeleteOffloaded(dbid, segment);

// Bring it back to the PrimeTable.
DCHECK(hot.ObjType() == OBJ_STRING);

return hot;
}
