Skip to content

Commit 9dce7a9

Browse files
committed
fixes and small tests
1 parent 4205592 commit 9dce7a9

File tree

12 files changed

+77
-31
lines changed

12 files changed

+77
-31
lines changed

src/core/compact_object.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1224,11 +1224,13 @@ CompactObj::ExternalRep CompactObj::GetExternalRep() const {
12241224
return static_cast<CompactObj::ExternalRep>(u_.ext_ptr.representation);
12251225
}
12261226

1227-
void CompactObj::SetCool(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
1227+
void CompactObj::SetCool(size_t offset, uint32_t sz, ExternalRep rep,
1228+
detail::TieredColdRecord* record) {
12281229
// We copy the mask of the "cooled" referenced object because it contains the encoding info.
12291230
SetMeta(EXTERNAL_TAG, record->value.mask_);
12301231

12311232
u_.ext_ptr.is_cool = 1;
1233+
u_.ext_ptr.representation = static_cast<uint8_t>(rep);
12321234
u_.ext_ptr.page_offset = offset % 4096;
12331235
u_.ext_ptr.serialized_size = sz;
12341236
u_.ext_ptr.cool_record = record;

src/core/compact_object.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ class CompactObj {
364364
}
365365

366366
// Assigns a cooling record to the object together with its external slice.
367-
void SetCool(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
367+
void SetCool(size_t offset, uint32_t serialized_size, ExternalRep rep,
368+
detail::TieredColdRecord* record);
368369

369370
struct CoolItem {
370371
uint16_t page_offset;
@@ -377,7 +378,7 @@ class CompactObj {
377378
CoolItem GetCool() const;
378379

379380
// Prerequisite: IsCool() is true.
380-
// Keeps cool record only as external value.
381+
// Keeps cool record only as external value and discards the in-memory part.
381382
void KeepExternal(size_t offset, size_t sz);
382383

383384
std::pair<size_t, size_t> GetExternalSlice() const;

src/server/common.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ bool ParseDouble(string_view src, double* value) {
162162
#define ADD(x) (x) += o.x
163163

164164
TieredStats& TieredStats::operator+=(const TieredStats& o) {
165-
static_assert(sizeof(TieredStats) == 160);
165+
static_assert(sizeof(TieredStats) == 168);
166166

167167
ADD(total_stashes);
168168
ADD(total_fetches);
@@ -182,6 +182,8 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
182182
ADD(small_bins_cnt);
183183
ADD(small_bins_entries_cnt);
184184
ADD(small_bins_filling_bytes);
185+
ADD(small_bins_filling_entries_cnt);
186+
185187
ADD(total_stash_overflows);
186188
ADD(cold_storage_bytes);
187189
ADD(total_offloading_steps);

src/server/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ struct TieredStats {
9191
uint64_t small_bins_cnt = 0;
9292
uint64_t small_bins_entries_cnt = 0;
9393
size_t small_bins_filling_bytes = 0;
94+
size_t small_bins_filling_entries_cnt = 0;
9495
size_t cold_storage_bytes = 0;
9596

9697
uint64_t clients_throttled = 0; // current number of throttled clients

src/server/hset_family.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -492,9 +492,8 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
492492

493493
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
494494

495-
// TODO: Enable in final PR under flag
496-
// if (auto* ts = op_args.shard->tiered_storage(); ts)
497-
// ts->TryStash(op_args.db_cntx.db_index, key, &pv);
495+
if (auto* ts = op_args.shard->tiered_storage(); ts)
496+
ts->TryStash(op_args.db_cntx.db_index, key, &pv);
498497

499498
return created;
500499
}

src/server/tiered_storage.cc

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5,
5252
ABSL_FLAG(float, tiered_upload_threshold, 0.1,
5353
"Ratio of free memory (free/max memory) below which uploading stops");
5454

55-
ABSL_FLAG(bool, tiered_experimental_hash_offload, false, "Experimental hash datatype offloading");
55+
ABSL_FLAG(bool, tiered_experimental_hash_support, false, "Experimental hash datatype offloading");
5656

5757
namespace dfly {
5858

@@ -81,9 +81,12 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
8181
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
8282
}
8383

84+
// Determine required byte size and encoding type based on value.
85+
// TODO(vlad): Maybe split into different accessors?
8486
// Do NOT enforce rules depending on dynamic runtime values as this is called
8587
when scheduling stash and just before succeeding and is expected to return the same results
86-
optional<std::pair<size_t, CompactObj::ExternalRep>> EstimateSerializedSize(const PrimeValue& pv) {
88+
optional<pair<size_t /*size*/, CompactObj::ExternalRep>> EstimateSerializedSize(
89+
const PrimeValue& pv) {
8790
switch (pv.ObjType()) {
8891
case OBJ_STRING:
8992
return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
@@ -111,11 +114,11 @@ size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableB
111114
case CompactObj::ExternalRep::SERIALIZED_MAP: {
112115
DCHECK_EQ(pv.Encoding(), kEncodingListPack);
113116

117+
// TODO(vlad): Optimize copy for serialization
114118
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
115119
vector<pair<string, string>> entries(lw.begin(), lw.end());
116-
vector<pair<string_view, string_view>> entries_sv(entries.begin(), entries.end());
117120
return tiering::SerializedMap::Serialize(
118-
entries_sv, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
121+
entries, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
119122
}
120123
};
121124
return 0;
@@ -218,13 +221,13 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
218221
stats->tiered_used_bytes += segment.length;
219222
stats_.total_stashes++;
220223

224+
CompactObj::ExternalRep rep = EstimateSerializedSize(*pv)->second;
221225
if (ts_->config_.experimental_cooling) {
222226
RetireColdEntries(pv->MallocUsed());
223-
ts_->CoolDown(key.first, key.second, segment, pv);
227+
ts_->CoolDown(key.first, key.second, segment, rep, pv);
224228
} else {
225229
stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
226-
auto estimation = EstimateSerializedSize(*pv);
227-
pv->SetExternal(segment.offset, segment.length, estimation->second);
230+
pv->SetExternal(segment.offset, segment.length, rep);
228231
}
229232
} else {
230233
LOG(DFATAL) << "Should not reach here";
@@ -459,7 +462,7 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
459462
error_code ec;
460463

461464
value->SetStashPending(true);
462-
if (true /*OccupiesWholePages(*estimated)*/) { // large enough for own page
465+
if (OccupiesWholePages(estimated->first)) { // large enough for own page
463466
id = KeyRef(dbid, key);
464467
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
465468
auto [offset, buf] = *prepared;
@@ -560,6 +563,7 @@ TieredStats TieredStorage::GetStats() const {
560563
stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
561564
stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
562565
stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
566+
stats.small_bins_filling_entries_cnt = bins_stats.current_entries_cnt;
563567
}
564568

565569
{ // Own stats
@@ -584,14 +588,14 @@ void TieredStorage::UpdateFromFlags() {
584588
.write_depth_limit = absl::GetFlag(FLAGS_tiered_storage_write_depth),
585589
.offload_threshold = absl::GetFlag(FLAGS_tiered_offload_threshold),
586590
.upload_threshold = absl::GetFlag(FLAGS_tiered_upload_threshold),
587-
.experimental_hash_offload = absl::GetFlag(FLAGS_tiered_experimental_hash_offload),
591+
.experimental_hash_offload = absl::GetFlag(FLAGS_tiered_experimental_hash_support),
588592
};
589593
}
590594

591595
std::vector<std::string> TieredStorage::GetMutableFlagNames() {
592596
return base::GetFlagNames(FLAGS_tiered_min_value_size, FLAGS_tiered_experimental_cooling,
593597
FLAGS_tiered_storage_write_depth, FLAGS_tiered_offload_threshold,
594-
FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_offload);
598+
FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_support);
595599
}
596600

597601
bool TieredStorage::ShouldOffload() const {
@@ -702,7 +706,8 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
702706
}
703707

704708
void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
705-
const tiering::DiskSegment& segment, PrimeValue* pv) {
709+
const tiering::DiskSegment& segment, CompactObj::ExternalRep rep,
710+
PrimeValue* pv) {
706711
detail::TieredColdRecord* record = CompactObj::AllocateMR<detail::TieredColdRecord>();
707712
cool_queue_.push_front(*record);
708713
stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
@@ -712,7 +717,7 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
712717
record->page_index = segment.offset / tiering::kPageSize;
713718
record->value = std::move(*pv);
714719

715-
pv->SetCool(segment.offset, segment.length, record);
720+
pv->SetCool(segment.offset, segment.length, rep, record);
716721
}
717722

718723
PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
@@ -721,10 +726,6 @@ PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
721726
// We remove it from both cool storage and the offline storage.
722727
PrimeValue hot = DeleteCool(item.record);
723728
op_manager_->DeleteOffloaded(dbid, segment);
724-
725-
// Bring it back to the PrimeTable.
726-
DCHECK(hot.ObjType() == OBJ_STRING);
727-
728729
return hot;
729730
}
730731

src/server/tiered_storage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class TieredStorage {
116116

117117
// Moves pv contents to the cool storage and updates pv to point to it.
118118
void CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment,
119-
PrimeValue* pv);
119+
CompactObj::ExternalRep rep, PrimeValue* pv);
120120

121121
PrimeValue DeleteCool(detail::TieredColdRecord* record);
122122
detail::TieredColdRecord* PopCool();

src/server/tiered_storage_test.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
2828
ABSL_DECLARE_FLAG(float, tiered_upload_threshold);
2929
ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);
3030
ABSL_DECLARE_FLAG(bool, tiered_experimental_cooling);
31+
ABSL_DECLARE_FLAG(bool, tiered_experimental_hash_support);
3132

3233
namespace dfly {
3334

@@ -454,4 +455,41 @@ TEST_F(PureDiskTSTest, Dump) {
454455
EXPECT_EQ(resp, "OK");
455456
}
456457

458+
// Checks that hashes written with HSET stay readable through HGET after tiered storage
// (with the experimental hash support flag enabled) had the chance to stash them.
TEST_P(LatentCoolingTSTest, SimpleHash) {
  // Restores all flags to their previous values when the test scope exits.
  absl::FlagSaver saver;
  absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true);
  UpdateFromFlags();

  const size_t kNUM = 100;

  // Builds `HSET <key> a <31 x's>a b <31 x's>b ... z <31 x's>z`.
  // NOTE: the strings must be built with parentheses. `string{1, c}` / `string{31, 'x'}` would
  // select the initializer_list<char> constructor and yield the two-byte strings
  // "\x01<c>" / "\x1fx" instead of a repeated character.
  auto build_command = [](string_view key) {
    vector<string> cmd = {"HSET", string{key}};
    for (char c = 'a'; c <= 'z'; c++) {
      cmd.push_back(string(1, c));
      cmd.push_back(string(31, 'x') + c);
    }
    return cmd;
  };

  // Create some hashes
  for (size_t i = 0; i < kNUM; i++) {
    Run(build_command(absl::StrCat("k", i)));
  }

  // Wait for all to be stashed or end up in bins
  ExpectConditionWithinTimeout([=] {
    auto metrics = GetMetrics();
    return metrics.tiered_stats.total_stashes +
               metrics.tiered_stats.small_bins_filling_entries_cnt ==
           kNUM;
  });

  // Verify correctness: field "f" must hold 31 x's followed by 'f' for every key.
  for (size_t i = 0; i < kNUM; i++) {
    auto resp = Run({"HGET", absl::StrCat("k", i), string(1, 'f')});
    auto v = string(31, 'x') + 'f';
    EXPECT_EQ(resp, v);
  }
}
494+
457495
} // namespace dfly

src/server/tiering/serialized_map.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ struct SerializedMap {
5050
size_t DataBytes() const;
5151

5252
// Input for serialization
53-
using Input = const absl::Span<const std::pair<std::string_view, std::string_view>>;
53+
using Input = const absl::Span<const std::pair<std::string, std::string>>;
5454

5555
// Buffer size required for serialization
5656
static size_t SerializeSize(Input);

src/server/tiering/serialized_map_test.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ using namespace std;
1212
class SerializedMapTest : public ::testing::Test {};
1313

1414
TEST_F(SerializedMapTest, TestBasic) {
15-
const vector<std::pair<string_view, string_view>> kBase = {{"first key", "first value"},
16-
{"second key", "second value"},
17-
{"third key", "third value"},
18-
{"fourth key", "fourth value"},
19-
{"fifth key", "fifth value"}};
15+
const vector<std::pair<string, string>> kBase = {{"first key", "first value"},
16+
{"second key", "second value"},
17+
{"third key", "third value"},
18+
{"fourth key", "fourth value"},
19+
{"fifth key", "fifth value"}};
2020

2121
// Serialize kBase to buffer
2222
std::string buffer;

0 commit comments

Comments
 (0)