Skip to content

Commit 933c30f

Browse files
committed
feat: update read ahead cache pre buffer strategy
1 parent 9f513f1 commit 933c30f

File tree

2 files changed

+41
-16
lines changed

2 files changed

+41
-16
lines changed

include/paimon/utils/read_ahead_cache.h

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,33 +42,53 @@ class PAIMON_EXPORT CacheConfig {
4242
public:
4343
CacheConfig();
4444
CacheConfig(uint64_t buffer_size_limit, uint64_t range_size_limit, uint64_t hole_size_limit,
45-
uint32_t pre_buffer_range_count);
45+
uint64_t pre_buffer_limit);
4646

4747
/// Returns the maximum total size (in bytes) of cached data.
4848
uint64_t GetBufferSizeLimit() const {
4949
return buffer_size_limit_;
5050
}
5151

52+
/// Sets the maximum total size (in bytes) of cached data.
53+
void SetBufferSizeLimit(uint64_t buffer_size_limit) {
54+
buffer_size_limit_ = buffer_size_limit;
55+
}
56+
5257
/// Returns the maximum allowed size (in bytes) for a single cached range.
5358
uint64_t GetRangeSizeLimit() const {
5459
return range_size_limit_;
5560
}
5661

62+
/// Sets the maximum allowed size (in bytes) for a single cached range.
63+
void SetRangeSizeLimit(uint64_t range_size_limit) {
64+
range_size_limit_ = range_size_limit;
65+
}
66+
5767
/// Returns the maximum gap size (in bytes) considered mergeable between adjacent ranges.
5868
uint64_t GetHoleSizeLimit() const {
5969
return hole_size_limit_;
6070
}
6171

62-
/// Returns the number of ranges to pre-buffer ahead of the current read position.
63-
uint32_t GetPreBufferRangeCount() const {
64-
return pre_buffer_range_count_;
72+
/// Sets the maximum gap size (in bytes) considered mergeable between adjacent ranges.
73+
void SetHoleSizeLimit(uint64_t hole_size_limit) {
74+
hole_size_limit_ = hole_size_limit;
75+
}
76+
77+
/// Returns the maximum total size (in bytes) of ranges to pre-buffer ahead of the current read position.
78+
uint64_t GetPreBufferLimit() const {
79+
return pre_buffer_limit_;
80+
}
81+
82+
/// Sets the maximum total size (in bytes) of ranges to pre-buffer ahead of the current read position.
83+
void SetPreBufferLimit(uint64_t pre_buffer_limit) {
84+
pre_buffer_limit_ = pre_buffer_limit;
6585
}
6686

6787
private:
6888
uint64_t buffer_size_limit_;
6989
uint64_t range_size_limit_;
7090
uint64_t hole_size_limit_;
71-
uint32_t pre_buffer_range_count_;
91+
uint64_t pre_buffer_limit_;
7292
};
7393

7494
/// A byte range with offset and length.

src/paimon/common/utils/read_ahead_cache.cpp

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,17 @@ struct RangeCacheEntry {
4646
};
4747

4848
CacheConfig::CacheConfig(uint64_t buffer_size_limit, uint64_t range_size_limit,
49-
uint64_t hole_size_limit, uint32_t pre_buffer_range_count)
49+
uint64_t hole_size_limit, uint64_t pre_buffer_limit)
5050
: buffer_size_limit_(buffer_size_limit),
5151
range_size_limit_(range_size_limit),
5252
hole_size_limit_(hole_size_limit),
53-
pre_buffer_range_count_(pre_buffer_range_count) {}
53+
pre_buffer_limit_(pre_buffer_limit) {}
5454

5555
CacheConfig::CacheConfig()
5656
: CacheConfig(/*buffer_size_limit=*/512 * 1024 * 1024,
5757
/*range_size_limit=*/16 * 1024 * 1024,
5858
/*hole_size_limit=*/8 * 1024,
59-
/*pre_buffer_range_count=*/6) {}
59+
/*pre_buffer_limit=*/128 * 1024 * 1024) {}
6060

6161
class ReadAheadCache::Impl {
6262
public:
@@ -70,7 +70,7 @@ class ReadAheadCache::Impl {
7070

7171
private:
7272
std::vector<RangeCacheEntry> MakeCacheEntries(const std::vector<ByteRange>& ranges) const;
73-
void PreBuffer(uint64_t offset, size_t n_extra);
73+
void PreBuffer(uint64_t offset);
7474

7575
/// Cache the given ranges in the background.
7676
///
@@ -147,7 +147,7 @@ Status ReadAheadCache::Impl::Init(std::vector<ByteRange>&& ranges) {
147147
return Status::OK();
148148
}
149149

150-
void ReadAheadCache::Impl::PreBuffer(uint64_t offset, size_t n_extra) {
150+
void ReadAheadCache::Impl::PreBuffer(uint64_t offset) {
151151
auto it = std::lower_bound(pending_ranges_.begin(), pending_ranges_.end(), offset,
152152
[](const ByteRange& range, uint64_t offset) {
153153
return range.offset + range.length <= offset;
@@ -157,13 +157,18 @@ void ReadAheadCache::Impl::PreBuffer(uint64_t offset, size_t n_extra) {
157157
}
158158

159159
size_t start_idx = std::distance(pending_ranges_.begin(), it);
160-
size_t end_idx = std::min(pending_ranges_.size(), start_idx + 1 + n_extra);
161-
162160
std::vector<ByteRange> ranges;
163-
for (size_t i = start_idx; i < end_idx; ++i) {
164-
if (!is_cached_[i].exchange(true)) {
165-
ranges.emplace_back(pending_ranges_[i]);
161+
size_t total_bytes = 0;
162+
for (size_t i = start_idx; i < pending_ranges_.size(); ++i) {
163+
size_t range_size = pending_ranges_[i].length;
164+
total_bytes += range_size;
165+
if (total_bytes > config_.GetPreBufferLimit()) {
166+
break;
167+
}
168+
if (is_cached_[i].exchange(true)) {
169+
continue;
166170
}
171+
ranges.emplace_back(pending_ranges_[i]);
167172
}
168173

169174
if (!ranges.empty()) {
@@ -197,7 +202,7 @@ Result<ByteSlice> ReadAheadCache::Impl::Read(const ByteRange& range) {
197202
if (range.length == 0) {
198203
return ByteSlice{std::make_shared<Bytes>(0, memory_pool_.get()), 0, 0};
199204
}
200-
PreBuffer(range.offset, config_.GetPreBufferRangeCount());
205+
PreBuffer(range.offset);
201206
ByteSlice result{};
202207
{
203208
std::shared_lock<std::shared_mutex> lock(rw_mutex_);

0 commit comments

Comments
 (0)