Skip to content

Commit 3a8ff25

Browse files
authored
chore(tiering): Add throttling metric + more tests (#6040)
1 parent 4442b13 commit 3a8ff25

File tree

6 files changed

+87
-46
lines changed

6 files changed

+87
-46
lines changed

src/server/common.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ bool ParseDouble(string_view src, double* value) {
162162
#define ADD(x) (x) += o.x
163163

164164
TieredStats& TieredStats::operator+=(const TieredStats& o) {
165-
static_assert(sizeof(TieredStats) == 144);
165+
static_assert(sizeof(TieredStats) == 160);
166166

167167
ADD(total_stashes);
168168
ADD(total_fetches);
@@ -186,6 +186,9 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
186186
ADD(cold_storage_bytes);
187187
ADD(total_offloading_steps);
188188
ADD(total_offloading_stashes);
189+
190+
ADD(clients_throttled);
191+
ADD(total_clients_throttled);
189192
return *this;
190193
}
191194

src/server/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ struct TieredStats {
9393
size_t small_bins_filling_bytes = 0;
9494
size_t cold_storage_bytes = 0;
9595

96+
uint64_t clients_throttled = 0; // current number of throttled clients
97+
uint64_t total_clients_throttled = 0; // total number of throttles
98+
9699
TieredStats& operator+=(const TieredStats&);
97100
};
98101

src/server/server_family.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3060,6 +3060,9 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
30603060
append("tiered_ram_hits", m.events.ram_hits);
30613061
append("tiered_ram_cool_hits", m.events.ram_cool_hits);
30623062
append("tiered_ram_misses", m.events.ram_misses);
3063+
3064+
append("tiered_clients_throttled", m.tiered_stats.clients_throttled);
3065+
append("tiered_total_clients_throttled", m.tiered_stats.total_clients_throttled);
30633066
};
30643067

30653068
auto add_persistence_info = [&] {

src/server/tiered_storage.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,8 +422,10 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
422422
}
423423

424424
// If we are in the active offloading phase, throttle stashes by providing backpressure future
425-
if (provide_bp && ShouldOffload())
425+
if (provide_bp && ShouldOffload()) {
426+
stats_.total_clients_throttled++;
426427
return stash_backpressure_[{dbid, string{key}}];
428+
}
427429

428430
return {};
429431
}
@@ -494,6 +496,8 @@ TieredStats TieredStorage::GetStats() const {
494496
stats.cold_storage_bytes = stats_.cool_memory_used;
495497
stats.total_offloading_steps = stats_.offloading_steps;
496498
stats.total_offloading_stashes = stats_.offloading_stashes;
499+
stats.clients_throttled = stash_backpressure_.size();
500+
stats.total_clients_throttled = stats_.total_clients_throttled;
497501
}
498502
return stats;
499503
}

src/server/tiered_storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class TieredStorage {
144144
uint64_t total_deletes = 0;
145145
uint64_t offloading_steps = 0;
146146
uint64_t offloading_stashes = 0;
147+
uint64_t total_clients_throttled = 0;
147148
size_t cool_memory_used = 0;
148149
} stats_;
149150
};

src/server/tiered_storage_test.cc

Lines changed: 71 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,33 @@ class TieredStorageTest : public BaseFamilyTest {
6363
}
6464
};
6565

66+
// Test that should run with both modes of "cooling"
67+
class LatentCoolingTSTest : public TieredStorageTest, public testing::WithParamInterface<bool> {
68+
void SetUp() override {
69+
fs.emplace();
70+
SetFlag(&FLAGS_tiered_experimental_cooling, GetParam());
71+
TieredStorageTest::SetUp();
72+
}
73+
74+
optional<absl::FlagSaver> fs;
75+
};
76+
77+
INSTANTIATE_TEST_SUITE_P(TS, LatentCoolingTSTest, testing::Values(true, false));
78+
79+
// Disabled cooling and all values are offloaded
80+
class PureDiskTSTest : public TieredStorageTest {
81+
void SetUp() override {
82+
fs.emplace();
83+
SetFlag(&FLAGS_tiered_offload_threshold, 1.0);
84+
SetFlag(&FLAGS_tiered_experimental_cooling, false);
85+
TieredStorageTest::SetUp();
86+
}
87+
88+
optional<absl::FlagSaver> fs;
89+
};
90+
6691
// Perform simple series of SET, GETSET and GET
67-
TEST_F(TieredStorageTest, SimpleGetSet) {
92+
TEST_P(LatentCoolingTSTest, SimpleGetSet) {
6893
absl::FlagSaver saver;
6994
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // disable offloading
7095
UpdateFromFlags();
@@ -107,7 +132,8 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
107132
EXPECT_EQ(metrics.db_stats[0].tiered_used_bytes, 0);
108133
}
109134

110-
TEST_F(TieredStorageTest, MGET) {
135+
// Use MGET to load multiple offloaded values
136+
TEST_P(LatentCoolingTSTest, MGET) {
111137
vector<string> command = {"MGET"}, values = {};
112138
for (char key = 'A'; key <= 'Z'; key++) {
113139
command.emplace_back(1, key);
@@ -178,7 +204,8 @@ TEST_F(TieredStorageTest, AppendStorm) {
178204
EXPECT_LE(metrics.tiered_stats.total_uploads, 2u);
179205
}
180206

181-
TEST_F(TieredStorageTest, Ranges) {
207+
// SETRANGE and GETRANGE
208+
TEST_P(LatentCoolingTSTest, Ranges) {
182209
Run({"SET", "key", string(3000, 'a')});
183210
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 1; });
184211

@@ -194,7 +221,8 @@ TEST_F(TieredStorageTest, Ranges) {
194221
EXPECT_EQ(resp, string(500, 'c') + string(500, 'd'));
195222
}
196223

197-
TEST_F(TieredStorageTest, MultiDb) {
224+
// Stash values from different databases and read them back
225+
TEST_P(LatentCoolingTSTest, MultiDb) {
198226
for (size_t i = 0; i < 10; i++) {
199227
Run({"SELECT", absl::StrCat(i)});
200228
Run({"SET", absl::StrCat("k", i), BuildString(3000, char('A' + i))});
@@ -212,6 +240,7 @@ TEST_F(TieredStorageTest, MultiDb) {
212240
}
213241
}
214242

243+
// Trigger defragmentation
215244
TEST_F(TieredStorageTest, Defrag) {
216245
for (char k = 'a'; k < 'a' + 8; k++) {
217246
Run({"SET", string(1, k), string(600, k)});
@@ -248,11 +277,9 @@ TEST_F(TieredStorageTest, Defrag) {
248277
EXPECT_EQ(metrics.tiered_stats.allocated_bytes, 0u);
249278
}
250279

251-
TEST_F(TieredStorageTest, BackgroundOffloading) {
280+
TEST_F(PureDiskTSTest, BackgroundOffloading) {
252281
absl::FlagSaver saver;
253-
SetFlag(&FLAGS_tiered_offload_threshold, 1.0f); // offload all values
254-
SetFlag(&FLAGS_tiered_upload_threshold, 0.0f); // upload all values
255-
SetFlag(&FLAGS_tiered_experimental_cooling, false); // The setup works without cooling buffers
282+
SetFlag(&FLAGS_tiered_upload_threshold, 0.0f); // upload all values
256283
UpdateFromFlags();
257284

258285
const int kNum = 500;
@@ -295,18 +322,8 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
295322
EXPECT_EQ(metrics.tiered_stats.allocated_bytes, kNum * 4096);
296323
}
297324

298-
TEST_F(TieredStorageTest, FlushAll) {
299-
absl::FlagSaver saver;
300-
SetFlag(&FLAGS_tiered_offload_threshold, 1.0f); // offload all values
301-
302-
// We want to cover the interaction of FlushAll with concurrent reads from disk.
303-
// For that we disable tiered_experimental_cooling.
304-
// TODO: seems that our replacement policy will upload the entries to RAM in any case,
305-
// making this test ineffective. We should add the ability to disable promotion of offloaded
306-
// entries to RAM upon reads.
307-
SetFlag(&FLAGS_tiered_experimental_cooling, false);
308-
UpdateFromFlags();
309-
325+
// Test FLUSHALL while reading entries
326+
TEST_F(PureDiskTSTest, FlushAll) {
310327
const int kNum = 500;
311328
for (size_t i = 0; i < kNum; i++) {
312329
Run({"SET", absl::StrCat("k", i), BuildString(3000)});
@@ -344,6 +361,7 @@ TEST_F(TieredStorageTest, FlushAll) {
344361
EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u);
345362
}
346363

364+
// Check FLUSHALL clears filling bytes of small bins
347365
TEST_F(TieredStorageTest, FlushPending) {
348366
absl::FlagSaver saver;
349367
SetFlag(&FLAGS_tiered_offload_threshold, 1.0f); // offload all values
@@ -358,23 +376,42 @@ TEST_F(TieredStorageTest, FlushPending) {
358376
EXPECT_EQ(GetMetrics().tiered_stats.small_bins_filling_bytes, 0u);
359377
}
360378

361-
TEST_F(TieredStorageTest, MemoryPressure) {
362-
max_memory_limit = 20_MB;
379+
// Test that clients are throttled if many stashes are issued.
380+
// Stashes are released with CLIENT UNPAUSE to occur at the same time
381+
TEST_F(PureDiskTSTest, ThrottleClients) {
363382
absl::FlagSaver saver;
364-
absl::SetFlag(&FLAGS_tiered_upload_threshold, float(2_MB) / float(max_memory_limit));
383+
absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0);
384+
UpdateFromFlags();
365385

366-
constexpr size_t kNum = 10000;
367-
for (size_t i = 0; i < kNum; i++) {
368-
auto resp = Run({"SET", absl::StrCat("k", i), BuildString(10000)});
369-
if (resp != "OK"sv) {
370-
resp = Run({"INFO", "ALL"});
371-
ASSERT_FALSE(true) << i << "\nInfo ALL:\n" << resp.GetString();
372-
}
373-
ThisFiber::SleepFor(500us);
386+
// issue client pause to accumulate SETs
387+
Run({"CLIENT", "PAUSE", "1000"});
388+
389+
string value(4096, 'a');
390+
vector<Fiber> fibs;
391+
for (size_t i = 0; i < 100; i++) {
392+
fibs.emplace_back(pp_->at(0)->LaunchFiber([this, i, &value] {
393+
string key = absl::StrCat("k", i);
394+
Run(key, {"SET", key, value});
395+
}));
374396
}
397+
ThisFiber::Yield();
398+
399+
// Unpause
400+
Run({"CLIENT", "UNPAUSE"});
375401

402+
// Check if at least some of the clients were caught throttling
403+
// but we provided backpressure for all of them
376404
auto metrics = GetMetrics();
377-
EXPECT_LT(metrics.used_mem_peak, 20_MB);
405+
EXPECT_GT(metrics.tiered_stats.clients_throttled, fibs.size() / 10);
406+
EXPECT_EQ(metrics.tiered_stats.total_clients_throttled, fibs.size());
407+
408+
for (auto& fib : fibs)
409+
fib.JoinIfNeeded();
410+
411+
// Because of the 5ms max wait time for backpressure, we can't rely on the stashes to have
412+
// finished even after all the fibers joined, so expect the condition with a timeout
413+
ExpectConditionWithinTimeout(
414+
[&] { return GetMetrics().tiered_stats.total_stashes == fibs.size(); });
378415
}
379416

380417
TEST_F(TieredStorageTest, Expiry) {
@@ -386,11 +423,7 @@ TEST_F(TieredStorageTest, Expiry) {
386423
EXPECT_EQ(resp, val);
387424
}
388425

389-
TEST_F(TieredStorageTest, SetExistingExpire) {
390-
absl::FlagSaver saver;
391-
SetFlag(&FLAGS_tiered_offload_threshold, 1.0f); // offload all values
392-
SetFlag(&FLAGS_tiered_experimental_cooling, false);
393-
426+
TEST_F(PureDiskTSTest, SetExistingExpire) {
394427
const int kNum = 20;
395428
for (size_t i = 0; i < kNum; i++) {
396429
Run({"SETEX", absl::StrCat("k", i), "100", BuildString(256)});
@@ -407,13 +440,7 @@ TEST_F(TieredStorageTest, SetExistingExpire) {
407440
}
408441
}
409442

410-
TEST_F(TieredStorageTest, Dump) {
411-
absl::FlagSaver saver;
412-
SetFlag(&FLAGS_tiered_offload_threshold, 1.0f); // offload all values
413-
414-
// we want to test without cooling to trigger disk I/O on reads.
415-
SetFlag(&FLAGS_tiered_experimental_cooling, false);
416-
443+
TEST_F(PureDiskTSTest, Dump) {
417444
const int kNum = 10;
418445
for (size_t i = 0; i < kNum; i++) {
419446
Run({"SET", absl::StrCat("k", i), BuildString(3000)}); // big enough to trigger offloading.

0 commit comments

Comments
 (0)