Skip to content

Commit 0b01cc5

Browse files
authored
chore(server): Some memory code updates (#5379)
Update memory management code
1 parent 618bedb commit 0b01cc5

File tree

9 files changed

+92
-87
lines changed

9 files changed

+92
-87
lines changed

src/server/common.cc

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,13 @@ std::string_view LockTagOptions::Tag(std::string_view key) const {
110110
return key.substr(start + 1, end - start - 1);
111111
}
112112

113-
atomic_uint64_t used_mem_peak(0);
113+
size_t max_memory_limit = 0;
114114
atomic_uint64_t used_mem_current(0);
115115
atomic_uint64_t rss_mem_current(0);
116-
atomic_uint64_t rss_mem_peak(0);
117116

118117
unsigned kernel_version = 0;
119-
size_t max_memory_limit = 0;
120118
Namespaces* namespaces = nullptr;
121119

122-
size_t FetchRssMemory(const io::StatusData& sdata) {
123-
return sdata.vm_rss + sdata.hugetlb_pages;
124-
}
125-
126120
const char* GlobalStateName(GlobalState s) {
127121
switch (s) {
128122
case GlobalState::ACTIVE:

src/server/common.h

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,11 @@ bool ParseDouble(std::string_view src, double* value);
128128

129129
const char* RdbTypeName(unsigned type);
130130

131-
// Cached values, updated frequently to represent the correct state of the system.
132-
extern std::atomic_uint64_t used_mem_peak;
131+
extern size_t max_memory_limit; // Value of maxmemory flag
132+
133+
// Globally used atomics for memory readings
133134
extern std::atomic_uint64_t used_mem_current;
134135
extern std::atomic_uint64_t rss_mem_current;
135-
extern std::atomic_uint64_t rss_mem_peak;
136-
137-
extern size_t max_memory_limit;
138-
139-
size_t FetchRssMemory(const io::StatusData& sdata);
140136

141137
extern Namespaces* namespaces;
142138

src/server/db_slice.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,7 @@ class DbSlice {
253253
expire_base_[generation & 1] = now;
254254
}
255255

256-
// From time to time DbSlice is set with a new set of params needed to estimate its
257-
// memory usage.
258-
void SetCachedParams(int64_t budget, size_t bytes_per_object) {
256+
void UpdateMemoryParams(int64_t budget, size_t bytes_per_object) {
259257
memory_budget_ = budget;
260258
bytes_per_object_ = bytes_per_object;
261259
}
@@ -632,8 +630,12 @@ class DbSlice {
632630

633631
uint64_t version_ = 1; // Used to version entries in the PrimeTable.
634632
uint64_t next_moved_id_ = 1;
633+
634+
// Estimation of available memory dedicated to this shard.
635+
// Recalculated periodically by dividing free memory left among all shards equally
635636
ssize_t memory_budget_ = SSIZE_MAX / 2;
636637
size_t bytes_per_object_ = 0;
638+
637639
size_t soft_budget_limit_ = 0;
638640
size_t table_memory_ = 0;
639641
uint64_t entries_count_ = 0;

src/server/engine_shard.cc

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -476,11 +476,11 @@ uint32_t EngineShard::DefragTask() {
476476
}
477477

478478
EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
479-
: queue_(kQueueLen, 1, 1),
479+
: txq_([](const Transaction* t) { return t->txid(); }),
480+
queue_(kQueueLen, 1, 1),
480481
queue2_(kQueueLen / 2, 2, 2),
481-
txq_([](const Transaction* t) { return t->txid(); }),
482-
mi_resource_(heap),
483-
shard_id_(pb->GetPoolIndex()) {
482+
shard_id_(pb->GetPoolIndex()),
483+
mi_resource_(heap) {
484484
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
485485
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
486486
}
@@ -859,30 +859,26 @@ void EngineShard::RetireExpiredAndEvict() {
859859

860860
void EngineShard::CacheStats() {
861861
uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs();
862-
if (cache_stats_time_ + 1000000 > now) // 1ms
862+
if (last_mem_params_.updated_at + 1000000 > now) // 1ms
863863
return;
864864

865-
cache_stats_time_ = now;
866-
// Used memory for this shard.
867865
size_t used_mem = UsedMemory();
868866
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
869867

870-
// delta can wrap if used_memory is smaller than last_cached_used_memory_ and it's fine.
871-
size_t delta = used_mem - last_cached_used_memory_;
872-
last_cached_used_memory_ = used_mem;
868+
// Reflect local memory change on global value
869+
size_t delta = used_mem - last_mem_params_.used_mem; // negative value wraps safely
873870
size_t current = used_mem_current.fetch_add(delta, memory_order_relaxed) + delta;
874871
ssize_t free_mem = max_memory_limit - current;
875872

873+
// Estimate bytes per object, excluding table memory
876874
size_t entries = db_slice.entries_count();
877-
size_t table_memory = db_slice.table_memory();
878-
879-
if (tiered_storage_) {
880-
table_memory += tiered_storage_->CoolMemoryUsage();
881-
}
875+
size_t table_memory =
876+
db_slice.table_memory() + (tiered_storage_ ? tiered_storage_->CoolMemoryUsage() : 0);
882877
size_t obj_memory = table_memory <= used_mem ? used_mem - table_memory : 0;
883-
884878
size_t bytes_per_obj = entries > 0 ? obj_memory / entries : 0;
885-
db_slice.SetCachedParams(free_mem / shard_set->size(), bytes_per_obj);
879+
880+
db_slice.UpdateMemoryParams(free_mem / shard_set->size(), bytes_per_obj);
881+
last_mem_params_ = {now, used_mem};
886882
}
887883

888884
size_t EngineShard::UsedMemory() const {

src/server/engine_shard.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class EngineShard {
122122
return stats_;
123123
}
124124

125-
// Returns used memory for this shard.
125+
// Calculate memory used by shard by summing multiple sources
126126
size_t UsedMemory() const;
127127

128128
TieredStorage* tiered_storage() {
@@ -247,19 +247,22 @@ class EngineShard {
247247
// --------------------------------------------------------------------------
248248
uint32_t DefragTask();
249249

250+
TxQueue txq_;
250251
TaskQueue queue_, queue2_;
251252

252-
TxQueue txq_;
253-
MiMemoryResource mi_resource_;
254253
ShardId shard_id_;
255-
256254
Stats stats_;
257255

258256
// Become passive if replica: don't automatically evict expired items.
259257
bool is_replica_ = false;
260258

261-
size_t last_cached_used_memory_ = 0;
262-
uint64_t cache_stats_time_ = 0; // monotonic, set by ProactorBase::GetMonotonicTimeNs.
259+
// Precise tracking of used memory by persistent shard local values and structures
260+
MiMemoryResource mi_resource_;
261+
262+
struct {
263+
uint64_t updated_at = 0; // from GetMonotonicTimeNs
264+
size_t used_mem = 0;
265+
} last_mem_params_;
263266

264267
// Logical ts used to order distributed transactions.
265268
TxId committed_txid_ = 0;

src/server/server_family.cc

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,16 @@ inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) {
251251
return [=](CmdArgList args, const CommandContext& cntx) { return (se->*f)(args, cntx); };
252252
}
253253

254+
// Captured memory peaks
255+
struct {
256+
std::atomic<size_t> used = 0;
257+
std::atomic<size_t> rss = 0;
258+
} glob_memory_peaks;
259+
260+
size_t FetchRssMemory(const io::StatusData& sdata) {
261+
return sdata.vm_rss + sdata.hugetlb_pages;
262+
}
263+
254264
using CI = CommandId;
255265

256266
struct CmdArgListFormatter {
@@ -670,11 +680,11 @@ bool ReadProcStats(io::StatusData* sdata) {
670680
return false;
671681
}
672682

673-
size_t total_rss = FetchRssMemory(sdata_res.value());
683+
size_t total_rss = FetchRssMemory(*sdata_res);
674684
rss_mem_current.store(total_rss, memory_order_relaxed);
675-
if (rss_mem_peak.load(memory_order_relaxed) < total_rss) {
676-
rss_mem_peak.store(total_rss, memory_order_relaxed);
677-
}
685+
if (total_rss > glob_memory_peaks.rss.load(memory_order_relaxed))
686+
glob_memory_peaks.rss.store(total_rss, memory_order_relaxed);
687+
678688
*sdata = *sdata_res;
679689
return true;
680690
}
@@ -1028,51 +1038,35 @@ void ServerFamily::Shutdown() {
10281038
}
10291039

10301040
bool ServerFamily::HasPrivilegedInterface() {
1031-
for (auto* listener : listeners_) {
1032-
if (listener->IsPrivilegedInterface()) {
1033-
return true;
1034-
}
1035-
}
1036-
return false;
1041+
return any_of(listeners_.begin(), listeners_.end(),
1042+
[](auto* l) { return l->IsPrivilegedInterface(); });
10371043
}
10381044

10391045
void ServerFamily::UpdateMemoryGlobalStats() {
1040-
ShardId sid = EngineShard::tlocal()->shard_id();
1041-
if (sid != 0) { // This function is executed periodicaly on all shards. To ensure the logic
1042-
// bellow runs only on one shard we return is the shard is not 0.
1046+
// Called from all shards, but one updates global stats below
1047+
if (EngineShard::tlocal()->shard_id() > 0)
10431048
return;
1044-
}
10451049

1050+
// Update used memory peak
10461051
uint64_t mem_current = used_mem_current.load(std::memory_order_relaxed);
1047-
if (mem_current > used_mem_peak.load(memory_order_relaxed)) {
1048-
used_mem_peak.store(mem_current, memory_order_relaxed);
1049-
}
1052+
if (mem_current > glob_memory_peaks.used.load(memory_order_relaxed))
1053+
glob_memory_peaks.used.store(mem_current, memory_order_relaxed);
10501054

10511055
io::StatusData status_data;
1052-
bool success = ReadProcStats(&status_data);
1056+
bool success = ReadProcStats(&status_data); // updates glob_memory_peaks.rss
10531057
if (!success)
10541058
return;
10551059

10561060
size_t total_rss = FetchRssMemory(status_data);
1061+
1062+
// Decide on stopping or accepting new connections based on oom deny ratio
10571063
double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
10581064
if (rss_oom_deny_ratio > 0) {
10591065
size_t memory_limit = max_memory_limit * rss_oom_deny_ratio;
1060-
if (total_rss > memory_limit && accepting_connections_ && HasPrivilegedInterface()) {
1061-
for (auto* listener : listeners_) {
1062-
if (!listener->IsPrivilegedInterface()) {
1063-
listener->socket()->proactor()->Await([listener]() { listener->pause_accepting(); });
1064-
}
1065-
}
1066-
accepting_connections_ = false;
1067-
1068-
} else if (total_rss < memory_limit && !accepting_connections_) {
1069-
for (auto* listener : listeners_) {
1070-
if (!listener->IsPrivilegedInterface()) {
1071-
listener->socket()->proactor()->Await([listener]() { listener->resume_accepting(); });
1072-
}
1073-
}
1074-
accepting_connections_ = true;
1075-
}
1066+
if (total_rss > memory_limit && accepting_connections_ && HasPrivilegedInterface())
1067+
ChangeConnectionAccept(false);
1068+
else if (total_rss < memory_limit && !accepting_connections_)
1069+
ChangeConnectionAccept(true);
10761070
}
10771071
}
10781072

@@ -1409,8 +1403,8 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
14091403
bool success = ReadProcStats(&sdata);
14101404
AppendMetricWithoutLabels("memory_used_bytes", "", m.heap_used_bytes, MetricType::GAUGE,
14111405
&resp->body());
1412-
AppendMetricWithoutLabels("memory_used_peak_bytes", "", used_mem_peak.load(memory_order_relaxed),
1413-
MetricType::GAUGE, &resp->body());
1406+
AppendMetricWithoutLabels("memory_used_peak_bytes", "", m.used_mem_peak, MetricType::GAUGE,
1407+
&resp->body());
14141408
AppendMetricWithoutLabels("memory_fiberstack_vms_bytes",
14151409
"virtual memory size used by all the fibers", m.worker_fiber_stack_size,
14161410
MetricType::GAUGE, &resp->body());
@@ -2061,6 +2055,14 @@ void ServerFamily::ClientUnPauseCmd(CmdArgList args, SinkReplyBuilder* builder)
20612055
builder->SendOk();
20622056
}
20632057

2058+
void ServerFamily::ChangeConnectionAccept(bool accept) {
2059+
DCHECK_NE(accept, accepting_connections_);
2060+
auto h = accept ? &ListenerInterface::resume_accepting : &ListenerInterface::pause_accepting;
2061+
for (auto* listener : GetNonPriviligedListeners())
2062+
listener->socket()->proactor()->Await([listener, h]() { (listener->*h)(); });
2063+
accepting_connections_ = accept;
2064+
}
2065+
20642066
void ClientHelp(SinkReplyBuilder* builder) {
20652067
string_view help_arr[] = {
20662068
"CLIENT <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
@@ -2452,13 +2454,19 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
24522454

24532455
// Update peak stats. We rely on the fact that GetMetrics is called frequently enough to
24542456
// update peak_stats_ from it.
2455-
util::fb2::LockGuard lk{peak_stats_mu_};
2456-
UpdateMax(&peak_stats_.conn_dispatch_queue_bytes,
2457-
result.facade_stats.conn_stats.dispatch_queue_bytes);
2458-
UpdateMax(&peak_stats_.conn_read_buf_capacity, result.facade_stats.conn_stats.read_buf_capacity);
2457+
{
2458+
util::fb2::LockGuard lk{peak_stats_mu_};
2459+
UpdateMax(&peak_stats_.conn_dispatch_queue_bytes,
2460+
result.facade_stats.conn_stats.dispatch_queue_bytes);
2461+
UpdateMax(&peak_stats_.conn_read_buf_capacity,
2462+
result.facade_stats.conn_stats.read_buf_capacity);
2463+
result.peak_stats = peak_stats_;
2464+
}
24592465

24602466
// peak_stats_ is guarded by peak_stats_mu_. result.peak_stats was already copied inside the
// locked scope above, so it is intentionally not re-read here without holding the lock.
24612467
result.cmd_latency_map = service_.mutable_registry()->LatencyMap();
2468+
result.used_mem_peak = glob_memory_peaks.used.load(memory_order_relaxed);
2469+
result.used_mem_rss_peak = glob_memory_peaks.rss.load(memory_order_relaxed);
24622470

24632471
uint64_t delta_ms = (absl::GetCurrentTimeNanos() - start) / 1'000'000;
24642472
if (delta_ms > 30) {
@@ -2547,9 +2555,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
25472555
auto add_mem_info = [&] {
25482556
append("used_memory", m.heap_used_bytes);
25492557
append("used_memory_human", HumanReadableNumBytes(m.heap_used_bytes));
2550-
const auto ump = used_mem_peak.load(memory_order_relaxed);
2551-
append("used_memory_peak", ump);
2552-
append("used_memory_peak_human", HumanReadableNumBytes(ump));
2558+
append("used_memory_peak", m.used_mem_peak);
2559+
append("used_memory_peak_human", HumanReadableNumBytes(m.used_mem_peak));
25532560

25542561
// Virtual memory size, upper bound estimation on the RSS memory used by the fiber stacks.
25552562
append("fibers_stack_vms", m.worker_fiber_stack_size);
@@ -2562,7 +2569,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
25622569
append("used_memory_rss", rss);
25632570
append("used_memory_rss_human", HumanReadableNumBytes(rss));
25642571
}
2565-
append("used_memory_peak_rss", rss_mem_peak.load(memory_order_relaxed));
2572+
// Report the RSS peak captured in Metrics; the heap ("used") peak is a different counter and
// is already emitted above as used_memory_peak.
append("used_memory_peak_rss", m.used_mem_rss_peak);
25662573

25672574
append("maxmemory", max_memory_limit);
25682575
append("maxmemory_human", HumanReadableNumBytes(max_memory_limit));

src/server/server_family.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ struct Metrics {
8787

8888
size_t qps = 0;
8989

90+
size_t used_mem_peak = 0;
91+
size_t used_mem_rss_peak = 0;
92+
9093
size_t heap_used_bytes = 0;
9194
size_t small_string_bytes = 0;
9295
uint32_t traverse_ttl_per_sec = 0;
@@ -374,14 +377,17 @@ class ServerFamily {
374377
void ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
375378
void ClientUnPauseCmd(CmdArgList args, SinkReplyBuilder* builder);
376379

380+
// Set accepting_connections_ and update listeners according to it
381+
void ChangeConnectionAccept(bool accept);
382+
377383
util::fb2::Fiber snapshot_schedule_fb_;
378384
util::fb2::Fiber load_fiber_;
379385

380386
Service& service_;
381387

382388
util::AcceptServer* acceptor_ = nullptr;
383389
std::vector<facade::Listener*> listeners_;
384-
bool accepting_connections_ = true;
390+
bool accepting_connections_ = true; // reject connections near oom
385391
util::ProactorBase* pb_task_ = nullptr;
386392

387393
mutable util::fb2::Mutex replicaof_mu_, save_mu_;

src/server/tiered_storage.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ void TieredStorage::ShardOpManager::RetireColdEntries(size_t additional_memory)
286286

287287
// Update memory_budget directly since we know that gained bytes were released.
288288
// We will overwrite the budget correctly in the next Heartbeat.
289-
db_slice_.SetCachedParams(gained + db_slice_.memory_budget(), db_slice_.bytes_per_object());
289+
db_slice_.UpdateMemoryParams(gained + db_slice_.memory_budget(), db_slice_.bytes_per_object());
290290
}
291291
}
292292

src/server/tiered_storage_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,8 @@ TEST_F(TieredStorageTest, MemoryPressure) {
324324
ThisFiber::SleepFor(500us);
325325
}
326326

327-
EXPECT_LT(used_mem_peak.load(), 20_MB);
327+
auto metrics = GetMetrics();
328+
EXPECT_LT(metrics.used_mem_peak, 20_MB);
328329
}
329330

330331
TEST_F(TieredStorageTest, Expiry) {

0 commit comments

Comments
 (0)