Skip to content

Commit b28369e

Browse files
authored
fix(tiering): Recompute size before upload (#6031)
1 parent f67af15 commit b28369e

File tree

6 files changed

+92
-21
lines changed

6 files changed

+92
-21
lines changed

src/core/compact_object.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,8 +1255,7 @@ std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
12551255
void CompactObj::Materialize(std::string_view blob, bool is_raw) {
12561256
CHECK(IsExternal()) << int(taglen_);
12571257
DCHECK_EQ(u_.ext_ptr.representation, static_cast<uint8_t>(ExternalRep::STRING));
1258-
1259-
DCHECK_GT(blob.size(), kInlineLen);
1258+
DCHECK_GT(blob.size(), kInlineLen); // There are no mutable commands that shrink strings
12601259

12611260
if (is_raw) {
12621261
if (kUseSmallStrings && SmallString::CanAllocate(blob.size())) {
@@ -1267,6 +1266,7 @@ void CompactObj::Materialize(std::string_view blob, bool is_raw) {
12671266
u_.r_obj.SetString(blob, tl.local_mr);
12681267
}
12691268
} else {
1269+
mask_bits_.encoding = NONE_ENC; // reset encoding
12701270
EncodeString(blob, false);
12711271
}
12721272
}

src/core/compact_object_test.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,12 @@ TEST_F(CompactObjectTest, StrEncodingAndMaterialize) {
620620
obj.SetExternal(0, 0, CompactObj::ExternalRep::STRING); // dummy values
621621
obj.Materialize(raw_str, true);
622622
EXPECT_EQ(test_str, obj.ToString());
623+
624+
// Restore from external again, but not as a raw value
625+
obj.SetExternal(0, 0, CompactObj::ExternalRep::STRING);
626+
auto test_str2 = test_str + "updated";
627+
obj.Materialize(test_str2, false);
628+
EXPECT_EQ(obj.ToString(), test_str2);
623629
}
624630
}
625631
}

src/server/tiered_storage.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,15 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegme
230230
return true; // delete
231231
}
232232

233+
tiering::Decoder::UploadMetrics metrics = decoder->GetMetrics();
234+
233235
// 1. When modified is true we MUST upload the value back to memory.
234236
// 2. On the other hand, if read is caused by snapshotting we do not want to fetch it.
235237
// Currently, our heuristic is not very smart, because we stop uploading any reads during
236238
// the snapshotting.
237239
// TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.
238-
bool should_upload = decoder->modified;
239-
should_upload |= (ts_->UploadBudget() > int64_t(decoder->estimated_mem_usage)) &&
240+
bool should_upload = metrics.modified;
241+
should_upload |= (ts_->UploadBudget() > int64_t(metrics.estimated_mem_usage)) &&
240242
!SliceSnapshot::IsSnaphotInProgress();
241243

242244
if (!should_upload)
@@ -245,7 +247,7 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegme
245247
auto key = get<OpManager::KeyRef>(id);
246248
auto* pv = Find(key);
247249
if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
248-
if (decoder->modified || pv->WasTouched()) {
250+
if (metrics.modified || pv->WasTouched()) {
249251
++stats_.total_uploads;
250252
decoder->Upload(pv);
251253
RecordDeleted(*pv, segment.length, GetDbTableStats(key.first));

src/server/tiered_storage_test.cc

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class TieredStorageTest : public BaseFamilyTest {
6767
TEST_F(TieredStorageTest, SimpleGetSet) {
6868
absl::FlagSaver saver;
6969
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // disable offloading
70+
UpdateFromFlags();
71+
7072
const int kMin = 256;
7173
const int kMax = tiering::kPageSize + 10;
7274

@@ -122,16 +124,58 @@ TEST_F(TieredStorageTest, MGET) {
122124
EXPECT_EQ(elements[i], values[i]);
123125
}
124126

125-
TEST_F(TieredStorageTest, SimpleAppend) {
126-
// TODO: use pipelines to issue APPEND/GET/APPEND sequence,
127-
// currently it's covered only for op_manager_test
128-
for (size_t sleep : {0, 100, 500, 1000}) {
129-
Run({"SET", "k0", BuildString(3000)});
130-
if (sleep)
131-
util::ThisFiber::SleepFor(sleep * 1us);
132-
EXPECT_THAT(Run({"APPEND", "k0", "B"}), IntArg(3001));
133-
ASSERT_EQ(Run({"GET", "k0"}), BuildString(3000) + 'B') << sleep;
127+
// Issue many APPEND commands to an offloaded value that are executed at once (with CLIENT PAUSE).
128+
// They should all finish within the same io completion loop.
129+
TEST_F(TieredStorageTest, AppendStorm) {
130+
const size_t kAppends = 20;
131+
132+
absl::FlagSaver saver;
133+
absl::SetFlag(&FLAGS_tiered_offload_threshold, 1.0);
134+
absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0);
135+
absl::SetFlag(&FLAGS_tiered_experimental_cooling, false);
136+
UpdateFromFlags();
137+
138+
// Offload single value
139+
string base_value(4096, 'a');
140+
Run({"SET", "key", base_value});
141+
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes == 1; });
142+
143+
// Accumulate APPENDs
144+
Run({"CLIENT", "pause", "1000"});
145+
vector<Fiber> fibs;
146+
for (size_t i = 0; i < kAppends; i++) {
147+
fibs.emplace_back(pp_->at(0)->LaunchFiber([this, i] {
148+
Run(absl::StrCat(i), {"APPEND", "key", string(96, 'b')});
149+
}));
134150
}
151+
152+
// Throw in a SETRANGE
153+
fibs.emplace_back(pp_->at(0)->LaunchFiber([this] {
154+
Run("range", {"SETRANGE", "key", "0", string(96, 'x')});
155+
}));
156+
157+
// Throw in a GETRANGE to a range that keeps constant
158+
string get_range;
159+
fibs.emplace_back(pp_->at(0)->LaunchFiber([this, &get_range] {
160+
get_range = Run("get", {"GETRANGE", "key", "96", "191"}).GetString();
161+
}));
162+
163+
// Unlock and wait
164+
Run({"CLIENT", "unpause"});
165+
for (auto& f : fibs)
166+
f.JoinIfNeeded();
167+
168+
// Check partial result is right
169+
EXPECT_EQ(get_range, string(96, 'a'));
170+
171+
// Get value and verify it
172+
auto value = Run({"GET", "key"});
173+
EXPECT_EQ(value, string(96, 'x') + string(4000, 'a') + string(kAppends * 96, 'b'));
174+
175+
// Check value was read no more than once for APPENDs and once for GET
176+
auto metrics = GetMetrics();
177+
EXPECT_LE(metrics.tiered_stats.total_fetches, 2u);
178+
EXPECT_LE(metrics.tiered_stats.total_uploads, 2u);
135179
}
136180

137181
TEST_F(TieredStorageTest, Ranges) {
@@ -207,6 +251,7 @@ TEST_F(TieredStorageTest, Defrag) {
207251
TEST_F(TieredStorageTest, BackgroundOffloading) {
208252
absl::FlagSaver saver;
209253
SetFlag(&FLAGS_tiered_offload_threshold, 1.0f); // offload all values
254+
SetFlag(&FLAGS_tiered_upload_threshold, 0.0f); // upload all values
210255
SetFlag(&FLAGS_tiered_experimental_cooling, false); // The setup works without cooling buffers
211256
UpdateFromFlags();
212257

@@ -247,7 +292,6 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
247292
// should be re-stashed again.
248293
EXPECT_EQ(metrics.tiered_stats.total_stashes, kNum + metrics.tiered_stats.total_uploads)
249294
<< resp.GetString();
250-
EXPECT_EQ(metrics.tiered_stats.total_fetches, kNum * 2);
251295
EXPECT_EQ(metrics.tiered_stats.allocated_bytes, kNum * 4096);
252296
}
253297

src/server/tiering/decoders.cc

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ void BareDecoder::Upload(CompactObj* obj) {
1818
ABSL_UNREACHABLE();
1919
}
2020

21+
Decoder::UploadMetrics BareDecoder::GetMetrics() const {
22+
ABSL_UNREACHABLE();
23+
return UploadMetrics{};
24+
}
25+
2126
StringDecoder::StringDecoder(const CompactObj& obj) : StringDecoder{obj.GetStrEncoding()} {
2227
}
2328

@@ -31,22 +36,28 @@ std::unique_ptr<Decoder> StringDecoder::Clone() const {
3136
void StringDecoder::Initialize(std::string_view slice) {
3237
slice_ = slice;
3338
value_ = encoding_.Decode(slice);
34-
estimated_mem_usage = slice.length(); // will be encoded back
3539
}
3640

3741
void StringDecoder::Upload(CompactObj* obj) {
38-
if (modified)
42+
if (modified_)
3943
obj->Materialize(value_.view(), false);
4044
else
4145
obj->Materialize(slice_, true);
4246
}
4347

48+
Decoder::UploadMetrics StringDecoder::GetMetrics() const {
49+
return UploadMetrics{
50+
.modified = modified_,
51+
.estimated_mem_usage = value_.view().size(),
52+
};
53+
}
54+
4455
std::string_view StringDecoder::Read() const {
4556
return value_.view();
4657
}
4758

4859
std::string* StringDecoder::Write() {
49-
modified = true;
60+
modified_ = true;
5061
return value_.GetMutable();
5162
}
5263

src/server/tiering/decoders.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ namespace dfly::tiering {
1717
// Decodes serialized value and provides it to callbacks.
1818
// Acts as generic interface to callback driver (OpManager)
1919
struct Decoder {
20+
struct UploadMetrics {
21+
bool modified; // whether the value as modified
22+
size_t estimated_mem_usage; // Estimated memory usage if uploaded
23+
};
24+
2025
virtual ~Decoder() = default;
2126

2227
// Poor man's type-erasure copy
@@ -25,17 +30,18 @@ struct Decoder {
2530
// Initialize decoder from slice
2631
virtual void Initialize(std::string_view slice) = 0;
2732

33+
// Compute upload metrics to determine if its worth
34+
virtual UploadMetrics GetMetrics() const = 0;
35+
2836
// Store value in compact object
2937
virtual void Upload(CompactObj* obj) = 0;
30-
31-
bool modified = false; // Must be set if modified (not equal to original slice)
32-
size_t estimated_mem_usage = 0; // Estimated usage if uploaded
3338
};
3439

3540
// Basic "bare" decoder that just stores the provided slice
3641
struct BareDecoder : public Decoder {
3742
std::unique_ptr<Decoder> Clone() const override;
3843
void Initialize(std::string_view slice) override;
44+
UploadMetrics GetMetrics() const override;
3945
void Upload(CompactObj* obj) override;
4046

4147
std::string_view slice;
@@ -46,6 +52,7 @@ struct StringDecoder : public Decoder {
4652

4753
std::unique_ptr<Decoder> Clone() const override;
4854
void Initialize(std::string_view slice) override;
55+
UploadMetrics GetMetrics() const override;
4956
void Upload(CompactObj* obj) override;
5057

5158
std::string_view Read() const;
@@ -54,6 +61,7 @@ struct StringDecoder : public Decoder {
5461
private:
5562
explicit StringDecoder(CompactObj::StrEncoding encoding);
5663

64+
bool modified_;
5765
std::string_view slice_;
5866
CompactObj::StrEncoding encoding_;
5967
dfly::StringOrView value_;

0 commit comments

Comments
 (0)