Skip to content

Commit 1a86422

Browse files
Merge pull request ClickHouse#87979 from ClickHouse/backport/25.8/87914
Backport ClickHouse#87914 to 25.8: fs cache: reuse cache priority iterator among threads concurrently reserving space in cache
2 parents 6b05244 + 89c1809 commit 1a86422

12 files changed

+248
-19
lines changed

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -299,6 +299,7 @@
299299
M(FilesystemCacheDelayedCleanupElements, "Filesystem cache elements in background cleanup queue") \
300300
M(FilesystemCacheHoldFileSegments, "Filesystem cache file segment which are currently hold as unreleasable") \
301301
M(FilesystemCacheKeys, "Number of keys in filesystem cache") \
302+
M(FilesystemCacheReserveThreads, "Threads number trying to reserve space in cache") \
302303
M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \
303304
M(IcebergMetadataFilesCacheBytes, "Size of the iceberg metadata cache in bytes") \
304305
M(IcebergMetadataFilesCacheFiles, "Number of cached files in the iceberg metadata cache") \

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -668,13 +668,17 @@ The server successfully detected this situation and will download merged part fr
668668
M(FilesystemCacheEvictionSkippedFileSegments, "Number of file segments skipped for eviction because of being in unreleasable state", ValueType::Number) \
669669
M(FilesystemCacheEvictionSkippedEvictingFileSegments, "Number of file segments skipped for eviction because of being in evicting state", ValueType::Number) \
670670
M(FilesystemCacheEvictionTries, "Number of filesystem cache eviction attempts", ValueType::Number) \
671+
M(FilesystemCacheEvictionReusedIterator, "Number of filesystem cache iterator reusing", ValueType::Number) \
671672
M(FilesystemCacheLockKeyMicroseconds, "Lock cache key time", ValueType::Microseconds) \
672673
M(FilesystemCacheLockMetadataMicroseconds, "Lock filesystem cache metadata time", ValueType::Microseconds) \
673674
M(FilesystemCacheLockCacheMicroseconds, "Lock filesystem cache time", ValueType::Microseconds) \
674675
M(FilesystemCacheReserveMicroseconds, "Filesystem cache space reservation time", ValueType::Microseconds) \
676+
M(FilesystemCacheReserveAttempts, "Filesystem cache space reservation attempt", ValueType::Number) \
675677
M(FilesystemCacheEvictMicroseconds, "Filesystem cache eviction time", ValueType::Microseconds) \
676678
M(FilesystemCacheGetOrSetMicroseconds, "Filesystem cache getOrSet() time", ValueType::Microseconds) \
677679
M(FilesystemCacheGetMicroseconds, "Filesystem cache get() time", ValueType::Microseconds) \
680+
M(FilesystemCacheBackgroundEvictedFileSegments, "Number of file segments evicted by background thread", ValueType::Number) \
681+
M(FilesystemCacheBackgroundEvictedBytes, "Number of bytes evicted by background thread", ValueType::Number) \
678682
M(FileSegmentWaitMicroseconds, "Wait on DOWNLOADING state", ValueType::Microseconds) \
679683
M(FileSegmentCompleteMicroseconds, "Duration of FileSegment::complete() in filesystem cache", ValueType::Microseconds) \
680684
M(FileSegmentLockMicroseconds, "Lock file segment time", ValueType::Microseconds) \

src/Interpreters/Cache/EvictionCandidates.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ void EvictionCandidates::add(
8686
it->second.candidates.push_back(candidate);
8787
candidate->setEvictingFlag(locked_key, lock);
8888
++candidates_size;
89+
candidates_bytes += candidate->size();
8990
}
9091

9192
void EvictionCandidates::removeQueueEntries(const CachePriorityGuard::Lock & lock)

src/Interpreters/Cache/EvictionCandidates.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ class EvictionCandidates : private boost::noncopyable
5858
FailedCandidates getFailedCandidates() const { return failed_candidates; }
5959

6060
size_t size() const { return candidates_size; }
61+
/// Returns `candidates_bytes`, the byte counter that `add()` increases by `candidate->size()`
/// for every candidate it registers.
size_t bytes() const { return candidates_bytes; }
6162

6263
auto begin() const { return candidates.begin(); }
6364

@@ -73,6 +74,7 @@ class EvictionCandidates : private boost::noncopyable
7374

7475
std::unordered_map<FileCacheKey, KeyCandidates> candidates;
7576
size_t candidates_size = 0;
77+
size_t candidates_bytes = 0;
7678
FailedCandidates failed_candidates;
7779

7880
std::vector<FinalizeEvictionFunc> on_finalize;

src/Interpreters/Cache/FileCache.cpp

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,21 @@ namespace ProfileEvents
3131
extern const Event FilesystemCacheLoadMetadataMicroseconds;
3232
extern const Event FilesystemCacheLockCacheMicroseconds;
3333
extern const Event FilesystemCacheReserveMicroseconds;
34+
extern const Event FilesystemCacheReserveAttempts;
3435
extern const Event FilesystemCacheGetOrSetMicroseconds;
3536
extern const Event FilesystemCacheGetMicroseconds;
3637
extern const Event FilesystemCacheFailToReserveSpaceBecauseOfLockContention;
3738
extern const Event FilesystemCacheFreeSpaceKeepingThreadRun;
3839
extern const Event FilesystemCacheFreeSpaceKeepingThreadWorkMilliseconds;
3940
extern const Event FilesystemCacheFailToReserveSpaceBecauseOfCacheResize;
41+
extern const Event FilesystemCacheBackgroundEvictedFileSegments;
42+
extern const Event FilesystemCacheBackgroundEvictedBytes;
4043
}
4144

4245
namespace CurrentMetrics
4346
{
4447
extern const Metric FilesystemCacheDownloadQueueElements;
48+
extern const Metric FilesystemCacheReserveThreads;
4549
}
4650

4751
namespace DB
@@ -941,7 +945,9 @@ bool FileCache::tryReserve(
941945
size_t lock_wait_timeout_milliseconds,
942946
std::string & failure_reason)
943947
{
948+
CurrentMetrics::Increment increment(CurrentMetrics::FilesystemCacheReserveThreads);
944949
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);
950+
ProfileEvents::increment(ProfileEvents::FilesystemCacheReserveAttempts);
945951

946952
assertInitialized();
947953

@@ -956,6 +962,11 @@ bool FileCache::tryReserve(
956962
return false;
957963
}
958964

965+
cache_reserve_active_threads.fetch_add(1, std::memory_order_relaxed);
966+
SCOPE_EXIT({
967+
cache_reserve_active_threads.fetch_sub(1, std::memory_order_relaxed);
968+
});
969+
959970
auto cache_lock = tryLockCache(std::chrono::milliseconds(lock_wait_timeout_milliseconds));
960971
if (!cache_lock)
961972
{
@@ -1021,7 +1032,14 @@ bool FileCache::tryReserve(
10211032
if (query_priority)
10221033
{
10231034
if (!query_priority->collectCandidatesForEviction(
1024-
size, required_elements_num, reserve_stat, eviction_candidates, {}, user.user_id, cache_lock))
1035+
size,
1036+
required_elements_num,
1037+
reserve_stat,
1038+
eviction_candidates,
1039+
{},
1040+
/* continue_from_last_eviction_pos */false,
1041+
user.user_id,
1042+
cache_lock))
10251043
{
10261044
const auto & stat = reserve_stat.total_stat;
10271045
failure_reason = fmt::format(
@@ -1041,8 +1059,19 @@ bool FileCache::tryReserve(
10411059
reserve_stat.stat_by_kind.clear();
10421060
}
10431061

1062+
bool continue_from_last_eviction_pos = cache_reserve_active_threads.load(std::memory_order_relaxed) > 1;
1063+
if (!continue_from_last_eviction_pos)
1064+
main_priority->resetEvictionPos(cache_lock);
1065+
10441066
if (!main_priority->collectCandidatesForEviction(
1045-
size, required_elements_num, reserve_stat, eviction_candidates, queue_iterator, user.user_id, cache_lock))
1067+
size,
1068+
required_elements_num,
1069+
reserve_stat,
1070+
eviction_candidates,
1071+
queue_iterator,
1072+
continue_from_last_eviction_pos,
1073+
user.user_id,
1074+
cache_lock))
10461075
{
10471076
const auto & stat = reserve_stat.total_stat;
10481077
failure_reason = fmt::format(
@@ -1130,6 +1159,11 @@ bool FileCache::tryReserve(
11301159
if (main_priority->getSize(cache_lock) > (1ull << 63))
11311160
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
11321161

1162+
if (cache_reserve_active_threads.load(std::memory_order_relaxed) == 1)
1163+
{
1164+
main_priority->resetEvictionPos(cache_lock);
1165+
}
1166+
11331167
cache_lock.unlock();
11341168
if (!file_segment.getKeyMetadata()->createBaseDirectory())
11351169
{
@@ -1187,7 +1221,12 @@ void FileCache::freeSpaceRatioKeepingThreadFunc()
11871221
/// (we use batches to make sure we do not block cache for too long,
11881222
/// by default the batch size is quite small).
11891223
desired_size_status = main_priority->collectCandidatesForEviction(
1190-
desired_size, desired_elements_num, keep_up_free_space_remove_batch, stat, eviction_candidates, lock);
1224+
desired_size,
1225+
desired_elements_num,
1226+
keep_up_free_space_remove_batch,
1227+
stat,
1228+
eviction_candidates,
1229+
lock);
11911230

11921231
#ifdef DEBUG_OR_SANITIZER_BUILD
11931232
/// Let's make sure that we correctly processed the limits.
@@ -1229,6 +1268,9 @@ void FileCache::freeSpaceRatioKeepingThreadFunc()
12291268
/// e.g. to update the in-memory state.
12301269
lock.lock();
12311270
eviction_candidates.finalize(nullptr, lock);
1271+
1272+
ProfileEvents::increment(ProfileEvents::FilesystemCacheBackgroundEvictedFileSegments, eviction_candidates.size());
1273+
ProfileEvents::increment(ProfileEvents::FilesystemCacheBackgroundEvictedBytes, eviction_candidates.bytes());
12321274
}
12331275
}
12341276
catch (...)

src/Interpreters/Cache/FileCache.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,8 @@ class FileCache : private boost::noncopyable
238238
std::atomic<bool> shutdown = false;
239239
std::atomic<bool> cache_is_being_resized = false;
240240

241+
std::atomic<size_t> cache_reserve_active_threads = 0;
242+
241243
std::mutex apply_settings_mutex;
242244

243245
CacheMetadata metadata;

src/Interpreters/Cache/IFileCachePriority.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,12 @@ class IFileCachePriority : private boost::noncopyable
159159
FileCacheReserveStat & stat,
160160
EvictionCandidates & res,
161161
IteratorPtr reservee,
162+
bool continue_from_last_eviction_pos,
162163
const UserID & user_id,
163164
const CachePriorityGuard::Lock &) = 0;
164165

166+
virtual void resetEvictionPos(const CachePriorityGuard::Lock & lock) = 0;
167+
165168
/// Collect eviction candidates sufficient to have `desired_size`
166169
/// and `desired_elements_num` as current cache state.
167170
/// Collect no more than `max_candidates_to_evict` elements.

src/Interpreters/Cache/LRUFileCachePriority.cpp

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace ProfileEvents
1818
extern const Event FilesystemCacheEvictionSkippedFileSegments;
1919
extern const Event FilesystemCacheEvictionTries;
2020
extern const Event FilesystemCacheEvictionSkippedEvictingFileSegments;
21+
extern const Event FilesystemCacheEvictionReusedIterator;
2122
}
2223

2324
namespace DB
@@ -36,6 +37,7 @@ LRUFileCachePriority::LRUFileCachePriority(
3637
: IFileCachePriority(max_size_, max_elements_)
3738
, description(description_)
3839
, log(getLogger("LRUFileCachePriority" + (description.empty() ? "" : "(" + description + ")")))
40+
, eviction_pos(queue.end())
3941
{
4042
if (state_)
4143
state = state_;
@@ -100,6 +102,13 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(EntryPtr entry, cons
100102
return LRUIterator(this, iterator);
101103
}
102104

105+
/// Moves the entry referenced by `it` to the end of `queue`.
///
/// If the saved `eviction_pos` currently refers to this same entry, it is advanced to the
/// following element first. Presumably LRUQueue is a std::list, in which case an iterator
/// keeps referring to the spliced node and `eviction_pos` would otherwise jump to the end
/// of the queue together with the entry — TODO confirm the container type.
/// The lock argument is unnamed and unused in the body.
void LRUFileCachePriority::increasePriority(LRUQueue::iterator it, const CachePriorityGuard::Lock &)
106+
{
107+
if (eviction_pos == it)
108+
eviction_pos = std::next(it);
109+
queue.splice(queue.end(), queue, it);
110+
}
111+
103112
LRUFileCachePriority::LRUQueue::iterator
104113
LRUFileCachePriority::remove(LRUQueue::iterator it, const CachePriorityGuard::Lock &)
105114
{
@@ -115,6 +124,8 @@ LRUFileCachePriority::remove(LRUQueue::iterator it, const CachePriorityGuard::Lo
115124
log, "Removed entry from LRU queue, key: {}, offset: {}, size: {}",
116125
entry.key, entry.offset, entry.size.load());
117126

127+
if (eviction_pos == it)
128+
eviction_pos = std::next(it);
118129
return queue.erase(it);
119130
}
120131

@@ -166,8 +177,20 @@ bool LRUFileCachePriority::LRUIterator::operator ==(const LRUIterator & other) c
166177

167178
void LRUFileCachePriority::iterate(IterateFunc func, const CachePriorityGuard::Lock & lock)
168179
{
169-
for (auto it = queue.begin(); it != queue.end();)
180+
iterateImpl(queue.begin(), func, lock);
181+
}
182+
183+
LRUFileCachePriority::LRUQueue::iterator
184+
LRUFileCachePriority::iterateImpl(LRUQueue::iterator start_pos, IterateFunc func, const CachePriorityGuard::Lock & lock)
185+
{
186+
const size_t max_elements_to_iterate = queue.size();
187+
auto it = start_pos;
188+
189+
for (size_t iterated_elements = 0; iterated_elements < max_elements_to_iterate; ++iterated_elements)
170190
{
191+
if (it == queue.end())
192+
it = queue.begin();
193+
171194
const auto & entry = **it;
172195

173196
if (entry.size == 0)
@@ -223,7 +246,7 @@ void LRUFileCachePriority::iterate(IterateFunc func, const CachePriorityGuard::L
223246
{
224247
case IterationResult::BREAK:
225248
{
226-
return;
249+
return it;
227250
}
228251
case IterationResult::CONTINUE:
229252
{
@@ -237,6 +260,7 @@ void LRUFileCachePriority::iterate(IterateFunc func, const CachePriorityGuard::L
237260
}
238261
}
239262
}
263+
return queue.end();
240264
}
241265

242266
bool LRUFileCachePriority::canFit( /// NOLINT
@@ -270,6 +294,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
270294
FileCacheReserveStat & stat,
271295
EvictionCandidates & res,
272296
IFileCachePriority::IteratorPtr /* reservee */,
297+
bool continue_from_last_eviction_pos,
273298
const UserID &,
274299
const CachePriorityGuard::Lock & lock)
275300
{
@@ -283,7 +308,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
283308
return canFit(size, elements, stat.total_stat.releasable_size, stat.total_stat.releasable_count, lock);
284309
};
285310

286-
iterateForEviction(res, stat, can_fit, lock);
311+
iterateForEviction(res, stat, can_fit, continue_from_last_eviction_pos, lock);
287312

288313
if (can_fit())
289314
{
@@ -349,7 +374,7 @@ IFileCachePriority::CollectStatus LRUFileCachePriority::collectCandidatesForEvic
349374
}
350375
return false;
351376
};
352-
iterateForEviction(res, stat, stop_condition, lock);
377+
iterateForEviction(res, stat, stop_condition, false, lock);
353378
chassert(status != CollectStatus::SUCCESS || stop_condition());
354379
return status;
355380
}
@@ -358,14 +383,15 @@ void LRUFileCachePriority::iterateForEviction(
358383
EvictionCandidates & res,
359384
FileCacheReserveStat & stat,
360385
StopConditionFunc stop_condition,
386+
bool continue_from_last_eviction_pos,
361387
const CachePriorityGuard::Lock & lock)
362388
{
363389
if (stop_condition())
364390
return;
365391

366392
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
367393

368-
IterateFunc iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
394+
auto iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
369395
{
370396
const auto & file_segment = segment_metadata->file_segment;
371397
chassert(file_segment->assertCorrectness());
@@ -380,14 +406,27 @@ void LRUFileCachePriority::iterateForEviction(
380406
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionSkippedFileSegments);
381407
stat.update(segment_metadata->size(), file_segment->getKind(), false);
382408
}
383-
384-
return IterationResult::CONTINUE;
385409
};
386410

387-
iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
411+
auto start_pos = queue.begin();
412+
if (continue_from_last_eviction_pos && eviction_pos != queue.end() && start_pos != eviction_pos)
388413
{
389-
return stop_condition() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata);
414+
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionReusedIterator);
415+
start_pos = eviction_pos;
416+
}
417+
418+
auto iteration_pos = iterateImpl(
419+
start_pos,
420+
[&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
421+
{
422+
if (stop_condition())
423+
return IterationResult::BREAK;
424+
iterate_func(locked_key, segment_metadata);
425+
return stop_condition() ? IterationResult::BREAK : IterationResult::CONTINUE;
390426
}, lock);
427+
428+
if (continue_from_last_eviction_pos)
429+
eviction_pos = iteration_pos;
391430
}
392431

393432
LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(
@@ -415,6 +454,8 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(
415454
}
416455
#endif
417456

457+
if (other.eviction_pos == it.iterator)
458+
other.eviction_pos = std::next(it.iterator);
418459
queue.splice(queue.end(), other.queue, it.iterator);
419460

420461
updateSize(entry.size);
@@ -534,7 +575,7 @@ void LRUFileCachePriority::LRUIterator::decrementSize(size_t size)
534575
size_t LRUFileCachePriority::LRUIterator::increasePriority(const CachePriorityGuard::Lock & lock)
535576
{
536577
assertValid();
537-
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, iterator);
578+
cache_priority->increasePriority(iterator, lock);
538579
cache_priority->check(lock);
539580
return ++((*iterator)->hits);
540581
}
@@ -547,6 +588,7 @@ void LRUFileCachePriority::LRUIterator::assertValid() const
547588

548589
void LRUFileCachePriority::shuffle(const CachePriorityGuard::Lock &)
549590
{
591+
chassert(eviction_pos == queue.end());
550592
std::vector<LRUQueue::iterator> its;
551593
its.reserve(queue.size());
552594
for (auto it = queue.begin(); it != queue.end(); ++it)

0 commit comments

Comments
 (0)