Skip to content

Commit 28ee7f3

Browse files
authored
chore: implement the iterative fragmentation check (#5766)
* chore: implement the iterative fragmentation check Before - we had relatively slow check for counting wasted fragmentation via zmalloc_get_allocator_wasted_blocks that took 10ms or more in production. The reason for that is that it iterate over all the memory pages on a single shard through the single call. Now we implement an iterative version of it by iterating over a single page queue data-structure in the heap. Once we start the iterative process we will continue aggregating stats over all the page queues in the heap until we reach the end and then conclude if defragmentation is needed. this should reduce the call time to EngineShard::DefragTaskState::CheckRequired by x70 (number of page queues in the heap). --------- Signed-off-by: Roman Gershman <[email protected]>
1 parent f45a0bd commit 28ee7f3

File tree

8 files changed

+175
-41
lines changed

8 files changed

+175
-41
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
commit e0cda4eb4a54cfcd33afcd5fbd7ecd86510ac4f9
2+
Author: Roman Gershman <[email protected]>
3+
Date: Wed Sep 3 23:30:34 2025 +0300
4+
5+
chore: track comitted size of full pages in a heap
6+
7+
Signed-off-by: Roman Gershman <[email protected]>
8+
9+
diff --git a/include/mimalloc/types.h b/include/mimalloc/types.h
10+
index a15d9cba..34d99a94 100644
11+
--- a/include/mimalloc/types.h
12+
+++ b/include/mimalloc/types.h
13+
@@ -559,9 +559,10 @@ struct mi_heap_s {
14+
uintptr_t cookie; // random cookie to verify pointers (see `_mi_ptr_cookie`)
15+
uintptr_t keys[2]; // two random keys used to encode the `thread_delayed_free` list
16+
mi_random_ctx_t random; // random number context used for secure allocation
17+
- size_t page_count; // total number of pages in the `pages` queues.
18+
- size_t page_retired_min; // smallest retired index (retired pages are fully free, but still in the page queues)
19+
- size_t page_retired_max; // largest retired index into the `pages` array.
20+
+ uint32_t page_count; // total number of pages in the `pages` queues.
21+
+ uint16_t page_retired_min; // smallest retired index (retired pages are fully free, but still in the page queues)
22+
+ uint16_t page_retired_max; // largest retired index into the `pages` array.
23+
+ size_t full_page_size; // total size of pages residing in MI_BIN_FULL bin.
24+
long generic_count; // how often is `_mi_malloc_generic` called?
25+
long generic_collect_count; // how often is `_mi_malloc_generic` called without collecting?
26+
mi_heap_t* next; // list of heaps per thread
27+
diff --git a/src/init.c b/src/init.c
28+
index 3fc8b033..61ee4c76 100644
29+
--- a/src/init.c
30+
+++ b/src/init.c
31+
@@ -118,6 +118,7 @@ mi_decl_cache_align const mi_heap_t _mi_heap_empty = {
32+
{ {0}, {0}, 0, true }, // random
33+
0, // page count
34+
MI_BIN_FULL, 0, // page retired min/max
35+
+ 0, // full page size
36+
0, 0, // generic count
37+
NULL, // next
38+
false, // can reclaim
39+
@@ -167,6 +168,7 @@ mi_decl_cache_align mi_heap_t _mi_heap_main = {
40+
{ {0x846ca68b}, {0}, 0, true }, // random
41+
0, // page count
42+
MI_BIN_FULL, 0, // page retired min/max
43+
+ 0, // full page size
44+
0, 0, // generic count
45+
NULL, // next heap
46+
false, // can reclaim
47+
diff --git a/src/page-queue.c b/src/page-queue.c
48+
index c719b626..524b09d8 100644
49+
--- a/src/page-queue.c
50+
+++ b/src/page-queue.c
51+
@@ -232,6 +232,10 @@ static void mi_page_queue_remove(mi_page_queue_t* queue, mi_page_t* page) {
52+
page->next = NULL;
53+
page->prev = NULL;
54+
// mi_atomic_store_ptr_release(mi_atomic_cast(void*, &page->heap), NULL);
55+
+ if (mi_page_queue_is_full(queue)) {
56+
+ mi_assert_internal(heap->full_page_size >= mi_page_block_size(page) * page->capacity);
57+
+ heap->full_page_size -= mi_page_block_size(page) * page->capacity;
58+
+ }
59+
mi_page_set_in_full(page,false);
60+
}
61+
62+
@@ -246,6 +250,9 @@ static void mi_page_queue_push(mi_heap_t* heap, mi_page_queue_t* queue, mi_page_
63+
(mi_page_is_large_or_huge(page) && mi_page_queue_is_huge(queue)) ||
64+
(mi_page_is_in_full(page) && mi_page_queue_is_full(queue)));
65+
66+
+ if (mi_page_queue_is_full(queue)) {
67+
+ heap->full_page_size += mi_page_block_size(page) * page->capacity;
68+
+ }
69+
mi_page_set_in_full(page, mi_page_queue_is_full(queue));
70+
// mi_atomic_store_ptr_release(mi_atomic_cast(void*, &page->heap), heap);
71+
page->next = queue->first;
72+
@@ -339,6 +346,12 @@ static void mi_page_queue_enqueue_from_ex(mi_page_queue_t* to, mi_page_queue_t*
73+
}
74+
}
75+
76+
+ if (mi_page_queue_is_full(to)) {
77+
+ heap->full_page_size += mi_page_block_size(page) * page->capacity;
78+
+ } else if (mi_page_queue_is_full(from)) {
79+
+ mi_assert_internal(heap->full_page_size >= mi_page_block_size(page) * page->capacity);
80+
+ heap->full_page_size -= mi_page_block_size(page) * page->capacity;
81+
+ }
82+
mi_page_set_in_full(page, mi_page_queue_is_full(to));
83+
}
84+

src/core/page_usage_stats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#pragma once
66

77
#include <absl/container/btree_map.h>
8+
9+
#define MI_BUILD_RELEASE 1
810
#include <mimalloc/types.h>
911

1012
#include "core/bloom.h"

src/core/segment_allocator.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//
44
#include "core/segment_allocator.h"
55

6+
#define MI_BUILD_RELEASE 1
67
#include <mimalloc/types.h>
78

89
#include "base/logging.h"

src/external_libs.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ ExternalProject_Add(mimalloc2_project
7575
patch -p1 -d ${THIRD_PARTY_DIR}/mimalloc2/ -i ${MIMALLOC_PATCH_DIR}/0_base.patch
7676
COMMAND patch -p1 -d ${THIRD_PARTY_DIR}/mimalloc2/ -i ${MIMALLOC_PATCH_DIR}/1_add_stat_type.patch
7777
COMMAND patch -p1 -d ${THIRD_PARTY_DIR}/mimalloc2/ -i ${MIMALLOC_PATCH_DIR}/2_return_stat.patch
78+
COMMAND patch -p1 -d ${THIRD_PARTY_DIR}/mimalloc2/ -i ${MIMALLOC_PATCH_DIR}/3_track_full_size.patch
7879
BUILD_COMMAND make mimalloc-static
7980

8081
INSTALL_COMMAND make install

src/redis/zmalloc.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,16 @@ Note that if a block is not used, it would not counted as wasted
122122
*/
123123
int zmalloc_get_allocator_wasted_blocks(float ratio, size_t* allocated, size_t* commited,
124124
size_t* wasted);
125+
struct fragmentation_info {
126+
size_t committed;
127+
size_t wasted;
128+
unsigned bin;
129+
};
130+
131+
// Like zmalloc_get_allocator_wasted_blocks but incremental.
132+
// struct fragmentation_info must be passed first set to zero. Returns -1 needs to continue,
133+
// 0 if done.
134+
int zmalloc_get_allocator_fragmentation_step(float ratio, struct fragmentation_info* info);
125135

126136
/*
127137
* checks whether a page that the pointer ptr located at is underutilized.

src/redis/zmalloc_mi.c

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include <assert.h>
66
#include <mimalloc.h>
7+
8+
#define MI_BUILD_RELEASE 1
79
#include <mimalloc/types.h>
810
#include <string.h>
911
#include <unistd.h>
@@ -167,10 +169,49 @@ int zmalloc_get_allocator_wasted_blocks(float ratio, size_t* allocated, size_t*
167169
*allocated = sum.allocated;
168170
*commited = sum.comitted;
169171
*wasted = sum.wasted;
170-
171172
return 1;
172173
}
173174

175+
// Implemented based on this mimalloc code:
176+
// https://github.com/microsoft/mimalloc/blob/main/src/heap.c#L27
177+
int zmalloc_get_allocator_fragmentation_step(float ratio, struct fragmentation_info* info) {
178+
if (zmalloc_heap->page_count == 0 || info->bin >= MI_BIN_FULL) {
179+
// We avoid iterating over full pages since they are fully utilized.
180+
return 0;
181+
}
182+
183+
mi_page_queue_t* pq = &zmalloc_heap->pages[info->bin];
184+
const mi_page_t* page = pq->first;
185+
while (page != NULL) {
186+
const mi_page_t* next = page->next;
187+
188+
const size_t bsize = page->block_size;
189+
190+
size_t committed = page->capacity * bsize;
191+
info->committed += committed;
192+
if (page->used < page->capacity) {
193+
size_t used = page->used * bsize;
194+
195+
size_t threshold = (double)committed * ratio;
196+
if (used < threshold) {
197+
info->wasted += (committed - used);
198+
}
199+
}
200+
page = next;
201+
}
202+
203+
info->bin++;
204+
if (info->bin == MI_BIN_FULL) { // reached end of bins, reset state
205+
// Add total comitted size of MI_BIN_FULL that we do not traverse
206+
// as its tracked by zmalloc_heap->full_page_size variable.
207+
info->committed += zmalloc_heap->full_page_size;
208+
info->bin = 0;
209+
return 0;
210+
}
211+
212+
return -1;
213+
}
214+
174215
void init_zmalloc_threadlocal(void* heap) {
175216
if (zmalloc_heap)
176217
return;
@@ -179,8 +220,7 @@ void init_zmalloc_threadlocal(void* heap) {
179220

180221
void zmalloc_page_is_underutilized(void* ptr, float ratio, int collect_stats,
181222
mi_page_usage_stats_t* result) {
182-
*result = mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio,
183-
collect_stats);
223+
*result = mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio, collect_stats);
184224
}
185225

186226
char* zstrdup(const char* s) {

src/server/dragonfly_test.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -745,9 +745,7 @@ TEST_F(DflyEngineTest, Issue742) {
745745
}
746746

747747
TEST_F(DefragDflyEngineTest, TestDefragOption) {
748-
if (pp_->GetNextProactor()->GetKind() == util::ProactorBase::EPOLL) {
749-
GTEST_SKIP() << "Defragmentation via idle task is only supported in io uring";
750-
}
748+
GTEST_SKIP() << "Defragmentation check takes too long. Disabling this test";
751749

752750
// mem_defrag_threshold is based on RSS statistic, but we don't count it in the test
753751
absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.0);

src/server/engine_shard.cc

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ABSL_FLAG(float, mem_defrag_threshold, 0.7,
3131
"Minimum percentage of used memory relative to maxmemory cap before running "
3232
"defragmentation");
3333

34-
ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 10,
34+
ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 60,
3535
"Number of seconds between every defragmentation necessity check");
3636

3737
ABSL_FLAG(float, mem_defrag_waste_threshold, 0.2,
@@ -70,24 +70,6 @@ namespace {
7070

7171
constexpr uint64_t kCursorDoneState = 0u;
7272

73-
struct ShardMemUsage {
74-
std::size_t commited = 0;
75-
std::size_t used = 0;
76-
std::size_t wasted_mem = 0;
77-
};
78-
79-
std::ostream& operator<<(std::ostream& os, const ShardMemUsage& mem) {
80-
return os << "commited: " << mem.commited << " vs used " << mem.used << ", wasted memory "
81-
<< mem.wasted_mem;
82-
}
83-
84-
ShardMemUsage ReadShardMemUsage(float wasted_ratio) {
85-
ShardMemUsage usage;
86-
zmalloc_get_allocator_wasted_blocks(wasted_ratio, &usage.used, &usage.commited,
87-
&usage.wasted_mem);
88-
return usage;
89-
}
90-
9173
bool HasContendedLocks(ShardId shard_id, Transaction* trx, const DbTable* table) {
9274
auto is_contended = [table](LockFp fp) { return table->trans_locks.Find(fp)->IsContended(); };
9375

@@ -249,26 +231,42 @@ bool EngineShard::DefragTaskState::CheckRequired() {
249231
return false;
250232
}
251233

252-
const std::size_t global_threshold = limit * GetFlag(FLAGS_mem_defrag_threshold);
234+
static thread_local fragmentation_info finfo{.committed = 0, .wasted = 0, .bin = 0};
235+
236+
const std::size_t global_threshold = double(limit) * GetFlag(FLAGS_mem_defrag_threshold);
253237
if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
238+
finfo.bin = 0; // reset.
254239
return false;
255240
}
256241

257-
const auto now = time(nullptr);
258-
const auto seconds_from_prev_check = now - last_check_time;
259-
const auto mem_defrag_interval = GetFlag(FLAGS_mem_defrag_check_sec_interval);
242+
if (finfo.bin == 0) { // did not start the iterative checking yet
243+
const auto now = time(nullptr);
244+
const auto seconds_from_prev_check = now - last_check_time;
245+
const auto mem_defrag_interval = GetFlag(FLAGS_mem_defrag_check_sec_interval);
260246

261-
if (seconds_from_prev_check < mem_defrag_interval) {
262-
return false;
247+
if (seconds_from_prev_check < mem_defrag_interval) {
248+
return false;
249+
}
250+
251+
// start checking.
252+
finfo.committed = finfo.wasted = 0;
263253
}
264-
last_check_time = now;
265254

266-
ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
255+
uint64_t start = absl::GetCurrentTimeNanos();
256+
int res = zmalloc_get_allocator_fragmentation_step(
257+
GetFlag(FLAGS_mem_defrag_page_utilization_threshold), &finfo);
258+
uint64_t duration = absl::GetCurrentTimeNanos() - start;
259+
VLOG_IF(1, duration > 20'000) << "Reading memory usage took " << duration / 1'000
260+
<< " usec on bin " << finfo.bin;
261+
if (res == 0) {
262+
// finished checking.
263+
last_check_time = time(nullptr);
267264

268-
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
269-
if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
270-
VLOG(1) << "memory issue found for memory " << usage;
271-
return true;
265+
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
266+
if (finfo.wasted > size_t(finfo.committed * waste_threshold)) {
267+
VLOG(1) << "memory fragmentation issue found: " << finfo.wasted << " " << finfo.committed;
268+
return true;
269+
}
272270
}
273271

274272
return false;
@@ -322,11 +320,11 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
322320
defrag_state_.UpdateScanState(cur.token());
323321

324322
if (reallocations > 0) {
325-
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << reallocations
323+
VLOG(2) << "shard " << slice.shard_id() << ": successfully defrag " << reallocations
326324
<< " times, did it in " << traverses_count << " cursor is at the "
327325
<< (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress");
328326
} else {
329-
VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
327+
VLOG(2) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
330328
<< " times out of maximum " << kMaxTraverses << ", with cursor at "
331329
<< (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress")
332330
<< " but no location for defrag were found";
@@ -361,7 +359,7 @@ uint32_t EngineShard::DefragTask() {
361359
return util::ProactorBase::kOnIdleMaxLevel;
362360
}
363361
}
364-
return kRunAtLowPriority;
362+
return 3; // priority.
365363
}
366364

367365
EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
@@ -713,7 +711,7 @@ void EngineShard::RetireExpiredAndEvict() {
713711
stats_.total_heartbeat_expired_keys += stats.deleted;
714712
stats_.total_heartbeat_expired_bytes += stats.deleted_bytes;
715713
++stats_.total_heartbeat_expired_calls;
716-
VLOG(1) << "Heartbeat expired " << stats.deleted << " keys with total bytes "
714+
VLOG(2) << "Heartbeat expired " << stats.deleted << " keys with total bytes "
717715
<< stats.deleted_bytes << " with total expire flow calls "
718716
<< stats_.total_heartbeat_expired_calls;
719717
}

0 commit comments

Comments
 (0)