Skip to content

Commit e37974d

Browse files
authored
fix(mount): fix not released read cache entries
This PR fixes unexpectedly high memory consumption while reading, which occurs when the client shows the "Maximum requested offset should be greater than or equal current offset" message. The case found can be described as follows: - assume some previous reads have left behind some read buffers in the buffer pool. - previous reads have already read the interval currently requested. - the immediately preceding reads have generated readahead requests that are still in progress at lower offsets of the file. - the request is answered instantly from the cache, but adding extra requests is still triggered. - the maximum requested offset is then set to those lower offsets. - the forced insert adds an entry from the buffer pool that has zero size but actually has its space reserved. - the above message is then shown and addExtraRequests_ returns without adding the inserted entry to any other container. - that entry can never have its done set to true, which implies it can only be released when the ReadRecord instance is destroyed, often at file descriptor close. The proposed solution prevents the entry insertion in the case of an early return. The maximumRequestedOffset update was also modified so that it avoids early returns while keeping the same meaning. The test test_slow_rereads was added to check the previously described issues. A new mode "only_slow" was added to the chunk_operations_eio library. The util create-and-reread-file was created specifically for this test. Signed-off-by: Dave <dave@leil.io>
1 parent bd9b620 commit e37974d

File tree

5 files changed

+171
-8
lines changed

5 files changed

+171
-8
lines changed

src/mount/readdata.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,8 +418,9 @@ void ReadaheadOperationsManager::addExtraRequests_(
418418
}
419419

420420
if (!rrec->readaheadRequests.empty()) {
421-
maximumRequestedOffset = rrec->readaheadRequests.lastPendingRequest()
422-
->requestPtr->endOffset();
421+
maximumRequestedOffset =
422+
std::max(maximumRequestedOffset,
423+
rrec->readaheadRequests.lastPendingRequest()->requestPtr->endOffset());
423424
}
424425

425426
uint64_t throughputWindow = rrec->readahead_adviser.throughputWindow();
@@ -438,7 +439,6 @@ void ReadaheadOperationsManager::addExtraRequests_(
438439
// Try to align extra requests to SFSCHUNKSIZE
439440
uint64_t extraRequestSize = std::min<uint64_t>(
440441
satisfyingSize, SFSCHUNKSIZE - (maximumRequestedOffset % SFSCHUNKSIZE));
441-
ReadCache::Entry *entry = rrec->cache.forceInsert(maximumRequestedOffset, extraRequestSize);
442442

443443
if (maximumRequestedOffset < currentOffset) {
444444
safs::log_warn(
@@ -449,6 +449,9 @@ void ReadaheadOperationsManager::addExtraRequests_(
449449
// Next subtraction will overflow, so let's just return here
450450
return;
451451
}
452+
453+
ReadCache::Entry *entry = rrec->cache.forceInsert(maximumRequestedOffset, extraRequestSize);
454+
452455
int64_t extraPriority =
453456
rrec->readahead_adviser.expectedNeededTime_us(maximumRequestedOffset - currentOffset);
454457

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
timeout_set 3 minutes
2+
3+
cacheexpirationtime_ms=15000
4+
readbuffersexpirationtime_ms=60000
5+
6+
CHUNKSERVERS=1 \
7+
DISK_PER_CHUNKSERVER=1 \
8+
CHUNKSERVER_0_DISK_0="$RAMDISK_DIR/pread_only_slow_hdd_0" \
9+
USE_RAMDISK=YES \
10+
MOUNT_EXTRA_CONFIG="sfscachemode=NEVER|maxreadaheadrequests=4`
11+
`|cacheexpirationtime=${cacheexpirationtime_ms}|readworkers=1`
12+
`|readaheadmaxwindowsize=16384`
13+
`|readbuffersexpirationtime=${readbuffersexpirationtime_ms}" \
14+
MASTER_EXTRA_CONFIG="CHUNKS_LOOP_MIN_TIME = 1`
15+
`|CHUNKS_LOOP_MAX_CPU = 90`
16+
`|CHUNKS_LOOP_PERIOD = 10`
17+
`|OPERATIONS_DELAY_INIT = 0`
18+
`|OPERATIONS_DELAY_DISCONNECT = 0" \
19+
setup_local_empty_saunafs info
20+
21+
cd ${info[mount0]}
22+
23+
# Create a file with 30 chunks, and read it to make sure client creates all necessary read buffers
24+
dd if=/dev/zero of=file bs=1M count=$((30 * 64)) status=none
25+
dd if=file of=/dev/null bs=1M count=$((30 * 64)) status=none
26+
27+
saunafs settrashtime 0 file
28+
rm file
29+
30+
# Restart the first chunkserver preloading pread with slow version of reads
31+
LD_PRELOAD="${SAUNAFS_INSTALL_FULL_LIBDIR}/libchunk_operations_eio.so" \
32+
assert_success saunafs_chunkserver_daemon 0 restart
33+
saunafs_wait_for_all_ready_chunkservers
34+
35+
create-and-reread-file "file" $(( (cacheexpirationtime_ms + readbuffersexpirationtime_ms) / 1000)) &
36+
37+
while ! [[ -f notify_file_reread ]]; do sleep 0.1; done
38+
39+
# At least 75s must have passed since the last read of the file (see create_and_reread_file.cc),
40+
# so all read buffers should have been expired by now. Check that sfsmount is not using too much
41+
# memory, which would indicate that it is keeping all read buffers in memory instead of expiring
42+
# them.
43+
44+
sfsmount_pid="$(pgrep -f "sfsmount.*${info[mount0]}" | head -n1)"
45+
assert_success test -n "${sfsmount_pid}"
46+
47+
rss_kb="$(ps -o rss= -p "${sfsmount_pid}" | tr -d '[:space:]')"
48+
vsz_kb="$(ps -o vsz= -p "${sfsmount_pid}" | tr -d '[:space:]')"
49+
50+
echo "sfsmount pid=${sfsmount_pid} rss=${rss_kb}KB vsz=${vsz_kb}KB"
51+
52+
# 1 GB RSS is a very high limit that should never be reached if read buffers are expired properly
53+
assert_less_than "${rss_kb}" $((1024 * 1024))
54+
55+
wait

utils/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ install(TARGETS readdir-unlink-test RUNTIME DESTINATION ${BIN_SUBDIR})
7272
add_executable(big-session-metadata-benchmark big_session_metadata_benchmark.cc)
7373
install(TARGETS big-session-metadata-benchmark RUNTIME DESTINATION ${BIN_SUBDIR})
7474

75+
# create_and_reread_file test for testing some rereads after creating a file
76+
add_executable(create-and-reread-file create_and_reread_file.cc)
77+
install(TARGETS create-and-reread-file RUNTIME DESTINATION ${BIN_SUBDIR})
78+
7579
add_executable(metadata-notifier metadata_notifier.cc)
7680
target_link_libraries(metadata-notifier sfscommon)
7781
install(TARGETS metadata-notifier RUNTIME DESTINATION ${BIN_SUBDIR})

utils/chunk_operations_eio.c

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ int EIO_replies = 0;
3232
// * fsync always fails with EIO if file name contains "fsync_EIO"
3333
// * pread fails with EIO if offset>FAR_OFFSET_THRESHOLD and file name contains "pread_far_EIO"
3434
// * pwrite fails with EIO if offset>FAR_OFFSET_THRESHOLD and file name contains "pwrite_far_EIO"
35+
// * pread fails with EIO if the first operation and takes 2s more if file name contains
36+
// "pread_slow_and_one_EIO"
37+
// * pwrite fails with EIO if the first operation and takes 2s more if file name contains
38+
// "pwrite_slow_and_one_EIO"
39+
// * pread takes 10ms + 1us per 250B if file name contains "pread_only_slow"
40+
// * pwrite takes 10ms + 1us per 250B if file name contains "pwrite_only_slow"
3541

3642
// returns -1 on failure and sets errno (via readlink call)
3743
ssize_t read_filename(int fd, char *buf, int bufsize) {
@@ -42,11 +48,12 @@ ssize_t read_filename(int fd, char *buf, int bufsize) {
4248
return readlink(fdpath, buf, bufsize);
4349
}
4450

45-
static int err_on_operation(int fd, const char* opname, size_t offset) {
51+
static int err_on_operation(int fd, const char* opname, size_t offset, size_t size) {
4652
char filename[FILENAME_BUFSIZE] = {0};
4753
char always_eio_trigger[COMMAND_BUFSIZE] = {0};
4854
char far_eio_trigger[COMMAND_BUFSIZE] = {0};
4955
char slow_and_one_eio_trigger[COMMAND_BUFSIZE] = {0};
56+
char only_slow[COMMAND_BUFSIZE] = {0};
5057

5158
ssize_t result = read_filename(fd, filename, FILENAME_BUFSIZE);
5259
if (result == -1) {
@@ -61,6 +68,7 @@ static int err_on_operation(int fd, const char* opname, size_t offset) {
6168
sprintf(always_eio_trigger, "%s_EIO", opname);
6269
sprintf(far_eio_trigger, "%s_far_EIO", opname);
6370
sprintf(slow_and_one_eio_trigger, "%s_slow_and_one_EIO", opname);
71+
sprintf(only_slow, "%s_only_slow", opname);
6472

6573
// TODO: remove fixed pattern for SMRs after the basic support
6674
if (strstr(filename, always_eio_trigger) || strstr(filename, "sauna_nullb0")) {
@@ -75,6 +83,12 @@ static int err_on_operation(int fd, const char* opname, size_t offset) {
7583
return EIO;
7684
}
7785
return 0;
86+
} else if (strstr(filename, only_slow)) {
87+
// sleep for a while to simulate a slow operation on hdd of 250MB/s peak throughput
88+
int kBaseLatency_us = 10 * 1000; // 10ms
89+
int bytesPerUs = 250; // 250B per 1us
90+
usleep(kBaseLatency_us + size / bytesPerUs);
91+
return 0;
7892
} else {
7993
return 0;
8094
}
@@ -86,7 +100,7 @@ ssize_t pread(int fd, void *buf, size_t count, off_t offset) {
86100
int err;
87101
static pread_t _pread = NULL;
88102

89-
err = err_on_operation(fd, "pread", offset);
103+
err = err_on_operation(fd, "pread", offset, count);
90104
if (err) {
91105
errno = err;
92106
return -1;
@@ -101,7 +115,7 @@ ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) {
101115
int err;
102116
static pwrite_t _pwrite = NULL;
103117

104-
err = err_on_operation(fd, "pwrite", offset);
118+
err = err_on_operation(fd, "pwrite", offset, count);
105119
if (err) {
106120
errno = err;
107121
return -1;
@@ -116,7 +130,7 @@ int close(int fd) {
116130
int err;
117131
static close_t _close = NULL;
118132

119-
err = err_on_operation(fd, "close", 0);
133+
err = err_on_operation(fd, "close", 0, 0);
120134
if (err) {
121135
errno = err;
122136
return -1;
@@ -131,7 +145,7 @@ int fsync(int fd) {
131145
int err;
132146
static fsync_t _fsync = NULL;
133147

134-
err = err_on_operation(fd, "fsync", 0);
148+
err = err_on_operation(fd, "fsync", 0, 0);
135149
if (err) {
136150
errno = err;
137151
return -1;

utils/create_and_reread_file.cc

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright 2026 Leil Storage OÜ
3+
4+
This file is part of SaunaFS.
5+
6+
SaunaFS is free software: you can redistribute it and/or modify
7+
it under the terms of the GNU General Public License as published by
8+
the Free Software Foundation, version 3.
9+
10+
SaunaFS is distributed in the hope that it will be useful,
11+
but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
GNU General Public License for more details.
14+
15+
You should have received a copy of the GNU General Public License
16+
along with SaunaFS. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
19+
#include "common/aligned_allocator.h"
20+
21+
#include "utils/data_generator.h"
22+
23+
#ifndef SFSCHUNKSIZE
24+
#define SFSCHUNKSIZE (SFSBLOCKSIZE * SFSBLOCKSINCHUNK)
25+
#endif
26+
27+
int main(int argc, char** argv) {
28+
if (argc != 3) {
29+
std::cerr << "Usage:\n"
30+
" "
31+
<< argv[0]
32+
<< " <file> <SLEEP_TIME>\n"
33+
"Creates a file with the specified name and fills it with generated data.\n"
34+
"Afterwards reads and reads again some chunks from it.\n"
35+
"Sleeps for the specified time in seconds and creates a notification file.\n"
36+
"At the end closes the file descriptor.\n";
37+
return 1;
38+
}
39+
40+
uint64_t fileSize = 30 * SFSCHUNKSIZE; // 1920 MiB (30 chunks of 64 MiB)
41+
DataGenerator generator(0);
42+
generator.createFile(argv[1], fileSize);
43+
uint32_t sleepTime = std::stoul(argv[2]);
44+
std::cerr << "File created successfully.\n";
45+
46+
int fd = open(argv[1], O_RDONLY | O_DIRECT);
47+
utils_passert(fd >= 0);
48+
49+
// Read the last two chunks
50+
std::vector<char, AlignedAllocator<char, SFSBLOCKSIZE>> buffer(SFSBLOCKSIZE);
51+
for (uint64_t offset = fileSize - 2 * SFSCHUNKSIZE; offset < fileSize; offset += SFSBLOCKSIZE) {
52+
ssize_t bytesRead = pread(fd, buffer.data(), buffer.size(), offset);
53+
utils_passert(bytesRead == (ssize_t)buffer.size());
54+
}
55+
std::cerr << "Last two chunks read successfully.\n";
56+
57+
// Read first three chunks
58+
for (uint64_t offset = 0; offset < 3 * SFSCHUNKSIZE; offset += SFSBLOCKSIZE) {
59+
ssize_t bytesRead = pread(fd, buffer.data(), buffer.size(), offset);
60+
utils_passert(bytesRead == (ssize_t)buffer.size());
61+
}
62+
63+
std::cerr << "First three and last two chunks read successfully.\n";
64+
// Sleep for a while to process some of the readahead requests
65+
usleep(600000);
66+
67+
// Read last chunk again
68+
for (uint64_t offset = fileSize - SFSCHUNKSIZE; offset < fileSize; offset += SFSBLOCKSIZE) {
69+
ssize_t bytesRead = pread(fd, buffer.data(), buffer.size(), offset);
70+
utils_passert(bytesRead == (ssize_t)buffer.size());
71+
}
72+
std::cerr << "Reads completed successfully.\n";
73+
74+
sleep(sleepTime);
75+
76+
auto notify_file_reread = open("notify_file_reread", O_CREAT | O_WRONLY, 0644);
77+
utils_passert(notify_file_reread >= 0);
78+
utils_zassert(close(notify_file_reread));
79+
80+
sleep(1);
81+
82+
utils_zassert(close(fd));
83+
std::cerr << "FD closed successfully.\n";
84+
unlink("notify_file_reread");
85+
86+
return 0;
87+
}

0 commit comments

Comments
 (0)