Skip to content

Commit 1839c60

Browse files
committed
feat(tiering): Serialize hashes
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 0ceda9f commit 1839c60

File tree

1 file changed

+83
-17
lines changed

1 file changed

+83
-17
lines changed

src/server/tiered_storage.cc

Lines changed: 83 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -17,16 +17,22 @@
1717
#include "base/flag_utils.h"
1818
#include "base/flags.h"
1919
#include "base/logging.h"
20+
#include "core/detail/listpack_wrap.h"
2021
#include "server/common.h"
2122
#include "server/db_slice.h"
2223
#include "server/engine_shard_set.h"
2324
#include "server/snapshot.h"
2425
#include "server/table.h"
2526
#include "server/tiering/common.h"
2627
#include "server/tiering/op_manager.h"
28+
#include "server/tiering/serialized_map.h"
2729
#include "server/tiering/small_bins.h"
2830
#include "server/tx_base.h"
2931

32+
extern "C" {
33+
#include "redis/listpack.h"
34+
}
35+
3036
using namespace facade;
3137

3238
using AtLeast64 = base::ConstrainedNumericFlagValue<size_t, 64>; // ABSL_FLAG breaks with commas
@@ -73,6 +79,53 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
7379
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
7480
}
7581

82+
// Estimates how many bytes `pv` occupies once serialized.
// Returns an empty optional for object types or encodings that are not handled here
// (anything other than strings and listpack-encoded hashes).
optional<size_t> EstimateSerializedSize(const PrimeValue& pv) {
  const auto obj_type = pv.ObjType();

  // Strings: the estimate is the length of the raw string representation.
  if (obj_type == OBJ_STRING)
    return pv.GetRawString().view().size();

  // Hashes: only the listpack encoding is supported.
  if (obj_type == OBJ_HASH && pv.Encoding() == kEncodingListPack) {
    auto* listpack = static_cast<uint8_t*>(pv.RObjPtr());
    // Size reported by lpBytes() plus 2 * 4 bytes for every element counted by lpLength().
    // NOTE(review): presumably headroom for per-entry length prefixes of the serialized
    // map format - confirm against tiering::SerializedMap.
    const size_t total = lpBytes(listpack) + lpLength(listpack) * 2 * 4;
    return total;
  }

  return {};
}
98+
99+
size_t Serialize(const PrimeValue& pv, io::MutableBytes buffer) {
100+
DCHECK_LE(EstimateSerializedSize(pv), buffer.size());
101+
switch (pv.ObjType()) {
102+
case OBJ_STRING: {
103+
auto sv = pv.GetRawString();
104+
memcpy(buffer.data(), sv.view().data(), sv.view().size());
105+
return sv.view().size();
106+
}
107+
case OBJ_HASH: {
108+
DCHECK_EQ(pv.Encoding(), kEncodingListPack);
109+
110+
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
111+
vector<pair<string, string>> entries(lw.begin(), lw.end());
112+
vector<pair<string_view, string_view>> entries_sv(entries.begin(), entries.end());
113+
return tiering::SerializedMap::Serialize(
114+
entries_sv, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
115+
}
116+
default:
117+
DCHECK(false);
118+
return 0;
119+
}
120+
}
121+
122+
// Serializes `pv` into a newly allocated string.
// The string is first sized to EstimateSerializedSize(pv), filled by Serialize(), and then
// shrunk to the number of bytes Serialize() reports as written.
// Precondition: EstimateSerializedSize(pv) must yield a value, i.e. `pv` has a type and
// encoding that can be serialized.
string SerializeString(const PrimeValue& pv) {
  const optional<size_t> estimated = EstimateSerializedSize(pv);
  // Dereferencing an empty optional is undefined behavior; assert the precondition the same
  // way TryStash does before using the estimate.
  DCHECK(estimated);

  string s(*estimated, '\0');
  size_t written = Serialize(pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
  s.resize(written);
  return s;
}
128+
76129
} // anonymous namespace
77130

78131
class TieredStorage::ShardOpManager : public tiering::OpManager {
@@ -375,36 +428,41 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
375428
return {};
376429
}
377430

378-
StringOrView raw_string = value->GetRawString();
379-
value->SetStashPending(true);
431+
optional<size_t> estimated = EstimateSerializedSize(*value);
432+
DCHECK(estimated);
380433

381434
tiering::OpManager::EntryId id;
382435
error_code ec;
383436

384-
// TODO(vlad): Replace with encoders for different types
385-
auto stash_string = [&](std::string_view str) {
386-
if (auto prepared = op_manager_->PrepareStash(str.size()); prepared) {
437+
value->SetStashPending(true);
438+
if (OccupiesWholePages(*estimated)) { // large enough for own page
439+
id = KeyRef(dbid, key);
440+
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
387441
auto [offset, buf] = *prepared;
388-
memcpy(buf.bytes.data(), str.data(), str.size());
389-
tiering::DiskSegment segment{offset, str.size()};
442+
size_t written = Serialize(*value, buf.bytes);
443+
tiering::DiskSegment segment{offset, written};
390444
op_manager_->Stash(id, segment, buf);
391445
} else {
392446
ec = prepared.error();
393447
}
394-
};
395-
396-
if (OccupiesWholePages(value->Size())) { // large enough for own page
397-
id = KeyRef(dbid, key);
398-
stash_string(raw_string.view());
399-
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
448+
} else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
400449
id = bin->first;
401450
// TODO(vlad): Write bin to prepared buffer instead of allocating one
402-
stash_string(bin->second);
451+
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
452+
auto [offset, buf] = *prepared;
453+
memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
454+
tiering::DiskSegment segment{offset, bin->second.size()};
455+
op_manager_->Stash(id, segment, buf);
456+
} else {
457+
CHECK(false);
458+
ec = prepared.error();
459+
}
403460
} else {
404461
return {}; // silently added to bin
405462
}
406463

407464
if (ec) {
465+
value->SetStashPending(false);
408466
LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
409467
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
410468
return {};
@@ -595,10 +653,18 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
595653
}
596654

597655
bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
656+
// Check value state
657+
if (pv.IsExternal() || pv.HasStashPending())
658+
return false;
659+
660+
// Estimate value size
661+
optional<size_t> size = EstimateSerializedSize(pv);
662+
if (!size)
663+
return false;
664+
598665
const auto& disk_stats = op_manager_->GetStats().disk_stats;
599-
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
600-
pv.Size() >= config_.min_value_size &&
601-
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
666+
return *size >= config_.min_value_size &&
667+
disk_stats.allocated_bytes + tiering::kPageSize + *size < disk_stats.max_file_size;
602668
}
603669

604670
void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,

0 commit comments

Comments
 (0)