Skip to content

Commit a6f53a6

Browse files
committed
fixes
1 parent 2f77302 commit a6f53a6

File tree

5 files changed

+60
-30
lines changed

5 files changed

+60
-30
lines changed

src/core/compact_object.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,10 +879,13 @@ CompactObjType CompactObj::ObjType() const {
879879
return OBJ_STRING;
880880

881881
if (taglen_ == EXTERNAL_TAG) {
882+
VLOG(0) << "My type is external";
882883
switch (static_cast<ExternalRep>(u_.ext_ptr.representation)) {
883884
case ExternalRep::STRING:
885+
VLOG(0) << "My type is a string";
884886
return OBJ_STRING;
885887
case ExternalRep::SERIALIZED_MAP:
888+
VLOG(0) << "Mype is a hash map";
886889
return OBJ_HASH;
887890
};
888891
}
@@ -1244,6 +1247,10 @@ auto CompactObj::GetCool() const -> CoolItem {
12441247
return res;
12451248
}
12461249

1250+
void CompactObj::KeepExternal(size_t offset, size_t sz) {
1251+
SetExternal(offset, sz, GetExternalRep());
1252+
}
1253+
12471254
std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
12481255
DCHECK_EQ(EXTERNAL_TAG, taglen_);
12491256
auto& ext = u_.ext_ptr;

src/core/compact_object.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,10 @@ class CompactObj {
376376
// Returns the external data of the object including its ColdRecord.
377377
CoolItem GetCool() const;
378378

379+
// Prerequisite: IsCool() is true.
380+
// Keeps cool record only as external value.
381+
void KeepExternal(size_t offset, size_t sz);
382+
379383
std::pair<size_t, size_t> GetExternalSlice() const;
380384

381385
// Injects either the raw string (extracted with GetRawString()) or the usual string

src/server/hset_family.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,9 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
492492

493493
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
494494

495+
if (auto* ts = op_args.shard->tiered_storage(); ts)
496+
ts->TryStash(op_args.db_cntx.db_index, key, &pv);
497+
495498
return created;
496499
}
497500

src/server/tiered_storage.cc

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -79,32 +79,34 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
7979
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
8080
}
8181

82-
optional<size_t> EstimateSerializedSize(const PrimeValue& pv) {
82+
// Do NOT enforce rules depending on dynamic runtime values as this is called
83+
// when scheduling stash and just before succeeding and is expected to return the same results.
84+
optional<std::pair<size_t, CompactObj::ExternalRep>> EstimateSerializedSize(const PrimeValue& pv) {
8385
switch (pv.ObjType()) {
8486
case OBJ_STRING:
85-
return pv.GetRawString().view().size();
87+
return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
8688
case OBJ_HASH:
8789
if (pv.Encoding() == kEncodingListPack) {
8890
auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
8991
size_t bytes = lpBytes(lp);
9092
bytes += lpLength(lp) * 2 * 4;
91-
return bytes;
93+
return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
9294
}
9395
return {};
9496
default:
9597
return {};
9698
};
9799
}
98100

99-
size_t Serialize(const PrimeValue& pv, io::MutableBytes buffer) {
100-
DCHECK_LE(EstimateSerializedSize(pv), buffer.size());
101-
switch (pv.ObjType()) {
102-
case OBJ_STRING: {
101+
size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableBytes buffer) {
102+
DCHECK_LE(EstimateSerializedSize(pv)->first, buffer.size());
103+
switch (rep) {
104+
case CompactObj::ExternalRep::STRING: {
103105
auto sv = pv.GetRawString();
104106
memcpy(buffer.data(), sv.view().data(), sv.view().size());
105107
return sv.view().size();
106108
}
107-
case OBJ_HASH: {
109+
case CompactObj::ExternalRep::SERIALIZED_MAP: {
108110
DCHECK_EQ(pv.Encoding(), kEncodingListPack);
109111

110112
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
@@ -113,15 +115,15 @@ size_t Serialize(const PrimeValue& pv, io::MutableBytes buffer) {
113115
return tiering::SerializedMap::Serialize(
114116
entries_sv, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
115117
}
116-
default:
117-
DCHECK(false);
118-
return 0;
119-
}
118+
};
119+
return 0;
120120
}
121121

122122
string SerializeString(const PrimeValue& pv) {
123-
string s(*EstimateSerializedSize(pv), 0);
124-
size_t written = Serialize(pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
123+
auto estimate = EstimateSerializedSize(pv);
124+
string s(estimate->first, 0);
125+
size_t written =
126+
Serialize(estimate->second, pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
125127
s.resize(written);
126128
return s;
127129
}
@@ -186,12 +188,20 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
186188
void RetireColdEntries(size_t additional_memory);
187189

188190
// Set value to be an in-memory type again. Update memory stats.
189-
void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
191+
void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
190192
DCHECK(!value.empty());
191193
DCHECK_EQ(uint8_t(pv->GetExternalRep()), uint8_t(CompactObj::ExternalRep::STRING));
192194

193-
pv->Materialize(value, is_raw);
194-
RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
195+
switch (pv->GetExternalRep()) {
196+
case CompactObj::ExternalRep::STRING:
197+
pv->Materialize(value, true);
198+
break;
199+
case CompactObj::ExternalRep::SERIALIZED_MAP:
200+
pv->InitRobj(OBJ_HASH, kEncodingListPack, nullptr);
201+
break;
202+
};
203+
204+
RecordDeleted(*pv, value.size(), GetDbTableStats(dbid));
195205
}
196206

197207
// Find entry by key in db_slice and store external segment in place of original value.
@@ -211,7 +221,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
211221
ts_->CoolDown(key.first, key.second, segment, pv);
212222
} else {
213223
stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
214-
pv->SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
224+
auto estimation = EstimateSerializedSize(*pv);
225+
pv->SetExternal(segment.offset, segment.length, estimation->second);
215226
}
216227
} else {
217228
LOG(DFATAL) << "Should not reach here";
@@ -268,7 +279,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
268279
} else {
269280
// Cut out relevant part of value and restore it to memory
270281
string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
271-
Upload(dbid, value, true, item_segment.length, &pv);
282+
Upload(dbid, value, &pv);
272283
}
273284
}
274285
}
@@ -439,18 +450,18 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
439450
return {};
440451
}
441452

442-
optional<size_t> estimated = EstimateSerializedSize(*value);
453+
auto estimated = EstimateSerializedSize(*value);
443454
DCHECK(estimated);
444455

445456
tiering::OpManager::EntryId id;
446457
error_code ec;
447458

448459
value->SetStashPending(true);
449-
if (OccupiesWholePages(*estimated)) { // large enough for own page
460+
if (true /*OccupiesWholePages(*estimated)*/) { // large enough for own page
450461
id = KeyRef(dbid, key);
451-
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
462+
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
452463
auto [offset, buf] = *prepared;
453-
size_t written = Serialize(*value, buf.bytes);
464+
size_t written = Serialize(estimated->second, *value, buf.bytes);
454465
tiering::DiskSegment segment{offset, written};
455466
op_manager_->Stash(id, segment, buf);
456467
} else {
@@ -459,7 +470,7 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
459470
} else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
460471
id = bin->first;
461472
// TODO(vlad): Write bin to prepared buffer instead of allocating one
462-
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
473+
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
463474
auto [offset, buf] = *prepared;
464475
memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
465476
tiering::DiskSegment segment{offset, bin->second.size()};
@@ -654,10 +665,10 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
654665
->prime.FindFirst(record->key_hash, predicate);
655666
CHECK(IsValid(it));
656667
PrimeValue& pv = it->second;
657-
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
658668

659669
// Now the item is only in storage.
660-
pv.SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
670+
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
671+
pv.KeepExternal(segment.offset, segment.length);
661672

662673
auto* stats = op_manager_->GetDbTableStats(record->db_index);
663674
stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed());
@@ -673,13 +684,14 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
673684
return false;
674685

675686
// Estimate value size
676-
optional<size_t> size = EstimateSerializedSize(pv);
677-
if (!size)
687+
auto estimation = EstimateSerializedSize(pv);
688+
if (!estimation)
678689
return false;
679690

680691
const auto& disk_stats = op_manager_->GetStats().disk_stats;
681-
return *size >= config_.min_value_size &&
682-
disk_stats.allocated_bytes + tiering::kPageSize + *size < disk_stats.max_file_size;
692+
return estimation->first >= config_.min_value_size &&
693+
disk_stats.allocated_bytes + tiering::kPageSize + estimation->first <
694+
disk_stats.max_file_size;
683695
}
684696

685697
void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,

src/server/tiered_storage_test.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,4 +454,8 @@ TEST_F(PureDiskTSTest, Dump) {
454454
EXPECT_EQ(resp, "OK");
455455
}
456456

457+
TEST_F(PureDiskTSTest, Hash1) {
458+
Run({"hmset", "k1", "a", "a", "b", "b", "c", "c"});
459+
}
460+
457461
} // namespace dfly

0 commit comments

Comments
 (0)