Skip to content

Commit 0ef8c03

Browse files
authored
Segment caching (#2934)
Initial read segments are now explicitly cached separately from the segment cache, and by default we now _always_ store segments in the segment cache as they are read.
1 parent 4c469fb commit 0ef8c03

File tree

6 files changed

+161
-90
lines changed

6 files changed

+161
-90
lines changed

pyvortex/src/file.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use vortex::dtype::{DType, PType};
1313
use vortex::error::{VortexExpect, vortex_err};
1414
use vortex::expr::{ExprRef, ident, select};
1515
use vortex::file::scan::SplitBy;
16+
use vortex::file::segments::MokaSegmentCache;
1617
use vortex::file::{VortexFile, VortexOpenOptions};
1718
use vortex::io::TokioFile;
1819
use vortex::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt};
@@ -38,7 +39,11 @@ pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {
3839

3940
#[pyfunction]
4041
pub fn open(path: &str) -> PyResult<PyVortexFile> {
41-
let vxf = TOKIO_RUNTIME.block_on(VortexOpenOptions::file().open(TokioFile::open(path)?))?;
42+
let vxf = TOKIO_RUNTIME.block_on(
43+
VortexOpenOptions::file()
44+
.with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
45+
.open(TokioFile::open(path)?),
46+
)?;
4247
Ok(PyVortexFile { vxf })
4348
}
4449

vortex-file/src/generic.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::sync::{Arc, Mutex};
22

33
use futures::{StreamExt, pin_mut};
4-
use moka::future::CacheBuilder;
54
use vortex_error::{VortexExpect, VortexResult};
65
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
76
use vortex_layout::segments::{SegmentEvents, SegmentSource};
87
use vortex_metrics::VortexMetrics;
98

109
use crate::driver::CoalescedDriver;
11-
use crate::segments::{CachedSegmentSource, InMemorySegmentCache, SegmentCache};
10+
use crate::segments::{
11+
InitialReadSegmentCache, MokaSegmentCache, SegmentCache, SegmentCacheMetrics,
12+
SegmentCacheSourceAdapter,
13+
};
1214
use crate::{FileType, SegmentSourceFactory, SegmentSpec, VortexFile, VortexOpenOptions};
1315

1416
/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
@@ -27,10 +29,9 @@ impl VortexOpenOptions<GenericVortexFile> {
2729
/// Open a file using the provided [`VortexReadAt`] implementation.
2830
pub fn file() -> Self {
2931
Self::new(Default::default())
30-
.with_segment_cache(Arc::new(InMemorySegmentCache::new(
31-
// For now, use a fixed 1GB overhead.
32-
CacheBuilder::new(1 << 30),
33-
)))
32+
// Start with an initial in-memory cache of 256MB.
33+
// TODO(ngates): would it be better to default to a home directory disk cache?
34+
.with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
3435
.with_initial_read_size(Self::INITIAL_READ_SIZE)
3536
}
3637

@@ -42,10 +43,18 @@ impl VortexOpenOptions<GenericVortexFile> {
4243
pub async fn open<R: VortexReadAt + Send>(self, read: R) -> VortexResult<VortexFile> {
4344
let footer = self.read_footer(&read).await?;
4445

46+
let segment_cache = Arc::new(SegmentCacheMetrics::new(
47+
InitialReadSegmentCache {
48+
initial: self.initial_read_segments,
49+
fallback: self.segment_cache,
50+
},
51+
self.metrics.clone(),
52+
));
53+
4554
let segment_source_factory = Arc::new(GenericVortexFileIo {
4655
read: Mutex::new(read),
4756
segment_map: footer.segment_map().clone(),
48-
segment_cache: self.segment_cache,
57+
segment_cache,
4958
options: self.options,
5059
});
5160

@@ -70,7 +79,7 @@ impl<R: VortexReadAt + Send> SegmentSourceFactory for GenericVortexFileIo<R> {
7079
let (segment_source, events) = SegmentEvents::create();
7180

7281
// Wrap the source to resolve segments from the initial read cache.
73-
let segment_source = Arc::new(CachedSegmentSource::new(
82+
let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
7483
self.segment_cache.clone(),
7584
segment_source,
7685
));

vortex-file/src/open.rs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, RwLock};
22

33
use flatbuffers::root;
4-
use futures::stream::FuturesUnordered;
5-
use futures::{StreamExt, TryStreamExt, stream};
64
use vortex_array::ArrayRegistry;
5+
use vortex_array::aliases::hash_map::HashMap;
76
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
87
use vortex_dtype::DType;
98
use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
@@ -18,7 +17,7 @@ use crate::segments::{NoOpSegmentCache, SegmentCache};
1817
use crate::{DEFAULT_REGISTRY, EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
1918

2019
pub trait FileType: Sized {
21-
type Options: Clone;
20+
type Options;
2221
}
2322

2423
/// Open options for a Vortex file reader.
@@ -36,9 +35,9 @@ pub struct VortexOpenOptions<F: FileType> {
3635
/// An optional, externally provided, file layout.
3736
// TODO(ngates): add an optional DType so we only read the layout segment.
3837
footer: Option<Footer>,
39-
/// TODO(ngates): rename to initial_segments: HashMap<SegmentId, ByteBuffer>
4038
pub(crate) segment_cache: Arc<dyn SegmentCache>,
4139
pub(crate) initial_read_size: u64,
40+
pub(crate) initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
4241
pub(crate) metrics: VortexMetrics,
4342
}
4443

@@ -53,6 +52,7 @@ impl<F: FileType> VortexOpenOptions<F> {
5352
footer: None,
5453
segment_cache: Arc::new(NoOpSegmentCache),
5554
initial_read_size: 0,
55+
initial_read_segments: Default::default(),
5656
metrics: VortexMetrics::default(),
5757
}
5858
}
@@ -201,8 +201,7 @@ impl<F: FileType> VortexOpenOptions<F> {
201201

202202
// If the initial read happened to cover any segments, then we can populate the
203203
// segment cache
204-
self.populate_segments(initial_offset, &initial_read, &footer)
205-
.await?;
204+
self.populate_initial_segments(initial_offset, &initial_read, &footer);
206205

207206
Ok(footer)
208207
}
@@ -265,30 +264,31 @@ impl<F: FileType> VortexOpenOptions<F> {
265264
}
266265

267266
/// Populate segments in the cache that were covered by the initial read.
268-
async fn populate_segments(
267+
fn populate_initial_segments(
269268
&self,
270269
initial_offset: u64,
271270
initial_read: &ByteBuffer,
272271
footer: &Footer,
273-
) -> VortexResult<()> {
274-
stream::iter(
275-
footer
276-
.segment_map()
277-
.iter()
278-
.enumerate()
279-
.filter(|(_, segment)| segment.offset > initial_offset)
280-
.map(|(idx, segment)| async move {
281-
let segment_id = SegmentId::from(u32::try_from(idx)?);
282-
let offset = usize::try_from(segment.offset - initial_offset)?;
283-
let buffer = initial_read
284-
.slice(offset..offset + (segment.length as usize))
285-
.aligned(segment.alignment);
286-
self.segment_cache.put(segment_id, buffer).await
287-
}),
288-
)
289-
.collect::<FuturesUnordered<_>>()
290-
.await
291-
.try_collect::<()>()
292-
.await
272+
) {
273+
let first_idx = footer
274+
.segment_map()
275+
.partition_point(|segment| segment.offset < initial_offset);
276+
277+
let mut initial_segments = self
278+
.initial_read_segments
279+
.write()
280+
.vortex_expect("poisoned lock");
281+
282+
for idx in first_idx..footer.segment_map().len() {
283+
let segment = &footer.segment_map()[idx];
284+
let segment_id =
285+
SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
286+
let offset =
287+
usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
288+
let buffer = initial_read
289+
.slice(offset..offset + (segment.length as usize))
290+
.aligned(segment.alignment);
291+
initial_segments.insert(segment_id, buffer);
292+
}
293293
}
294294
}

vortex-file/src/segments/cache.rs

Lines changed: 110 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1+
use std::sync::{Arc, RwLock};
2+
13
use async_trait::async_trait;
4+
use futures::FutureExt;
25
use moka::future::{Cache, CacheBuilder};
36
use moka::policy::EvictionPolicy;
47
use rustc_hash::FxBuildHasher;
8+
use vortex_array::aliases::hash_map::HashMap;
59
use vortex_buffer::ByteBuffer;
610
use vortex_error::{VortexExpect, VortexResult};
7-
use vortex_layout::segments::SegmentId;
11+
use vortex_layout::segments::{SegmentFuture, SegmentId, SegmentSource};
12+
use vortex_metrics::{Counter, VortexMetrics};
813

914
/// A cache for storing and retrieving individual segment data.
1015
#[async_trait]
1116
pub trait SegmentCache: Send + Sync {
1217
async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
1318
async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
14-
async fn remove(&self, id: SegmentId) -> VortexResult<()>;
1519
}
1620

1721
pub(crate) struct NoOpSegmentCache;
@@ -25,32 +29,31 @@ impl SegmentCache for NoOpSegmentCache {
2529
async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
2630
Ok(())
2731
}
28-
29-
async fn remove(&self, _id: SegmentId) -> VortexResult<()> {
30-
Ok(())
31-
}
3232
}
3333

34-
pub(crate) struct InMemorySegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
34+
/// A [`SegmentCache`] based around an in-memory Moka cache.
35+
pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
3536

36-
impl InMemorySegmentCache {
37-
pub fn new(builder: CacheBuilder<SegmentId, ByteBuffer, Cache<SegmentId, ByteBuffer>>) -> Self {
37+
impl MokaSegmentCache {
38+
pub fn new(max_capacity_bytes: u64) -> Self {
3839
Self(
39-
builder
40+
CacheBuilder::new(max_capacity_bytes)
41+
.name("vortex-segment-cache")
4042
// Weight each segment by the number of bytes in the buffer.
41-
.weigher(|_, buffer| {
43+
.weigher(|_, buffer: &ByteBuffer| {
4244
u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
4345
})
44-
// We configure LRU instead of LFU since we're likely to re-read segments between
45-
// filter and projection.
46-
.eviction_policy(EvictionPolicy::lru())
46+
// We configure LFU (vs LRU) since the cache is mostly used when re-reading the
47+
// same file - it is _not_ used when reading the same segments during a single
48+
// scan.
49+
.eviction_policy(EvictionPolicy::tiny_lfu())
4750
.build_with_hasher(FxBuildHasher),
4851
)
4952
}
5053
}
5154

5255
#[async_trait]
53-
impl SegmentCache for InMemorySegmentCache {
56+
impl SegmentCache for MokaSegmentCache {
5457
async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
5558
Ok(self.0.get(&id).await)
5659
}
@@ -59,9 +62,99 @@ impl SegmentCache for InMemorySegmentCache {
5962
self.0.insert(id, buffer).await;
6063
Ok(())
6164
}
65+
}
66+
67+
/// Segment cache containing the initial read segments.
68+
pub(crate) struct InitialReadSegmentCache {
69+
pub(crate) initial: RwLock<HashMap<SegmentId, ByteBuffer>>,
70+
pub(crate) fallback: Arc<dyn SegmentCache>,
71+
}
72+
73+
#[async_trait]
74+
impl SegmentCache for InitialReadSegmentCache {
75+
async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
76+
if let Some(buffer) = self.initial.read().vortex_expect("poisoned lock").get(&id) {
77+
return Ok(Some(buffer.clone()));
78+
}
79+
self.fallback.get(id).await
80+
}
81+
82+
async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
83+
self.fallback.put(id, buffer).await
84+
}
85+
}
86+
87+
pub struct SegmentCacheMetrics<C> {
88+
segment_cache: C,
89+
90+
hits: Arc<Counter>,
91+
misses: Arc<Counter>,
92+
stores: Arc<Counter>,
93+
}
6294

63-
async fn remove(&self, id: SegmentId) -> VortexResult<()> {
64-
self.0.invalidate(&id).await;
95+
impl<C: SegmentCache> SegmentCacheMetrics<C> {
96+
pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
97+
Self {
98+
segment_cache,
99+
hits: metrics.counter("vortex.file.segments.cache.hits"),
100+
misses: metrics.counter("vortex.file.segments.cache.misses"),
101+
stores: metrics.counter("vortex.file.segments.cache.stores"),
102+
}
103+
}
104+
}
105+
106+
#[async_trait]
107+
impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
108+
async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
109+
let result = self.segment_cache.get(id).await?;
110+
if result.is_some() {
111+
self.hits.inc()
112+
} else {
113+
self.misses.inc()
114+
}
115+
Ok(result)
116+
}
117+
118+
async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
119+
self.segment_cache.put(id, buffer).await?;
120+
self.stores.inc();
65121
Ok(())
66122
}
67123
}
124+
125+
pub struct SegmentCacheSourceAdapter {
126+
cache: Arc<dyn SegmentCache>,
127+
source: Arc<dyn SegmentSource>,
128+
}
129+
130+
impl SegmentCacheSourceAdapter {
131+
pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
132+
Self { cache, source }
133+
}
134+
}
135+
136+
impl SegmentSource for SegmentCacheSourceAdapter {
137+
fn request(&self, id: SegmentId, for_whom: &Arc<str>) -> SegmentFuture {
138+
let cache = self.cache.clone();
139+
let delegate = self.source.request(id, for_whom);
140+
let for_whom = for_whom.clone();
141+
142+
async move {
143+
if let Ok(Some(segment)) = cache.get(id).await {
144+
log::debug!("Resolved segment {} for {} from cache", id, &for_whom);
145+
return Ok(segment);
146+
}
147+
let result = delegate.await?;
148+
if let Err(e) = cache.put(id, result.clone()).await {
149+
log::warn!(
150+
"Failed to store segment {} for {} in cache: {}",
151+
id,
152+
&for_whom,
153+
e
154+
);
155+
}
156+
Ok(result)
157+
}
158+
.boxed()
159+
}
160+
}

vortex-file/src/segments/cached.rs

Lines changed: 0 additions & 34 deletions
This file was deleted.

vortex-file/src/segments/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
mod cache;
2-
mod cached;
32
pub(crate) mod writer;
43

54
pub use cache::*;
6-
pub use cached::*;

0 commit comments

Comments
 (0)