Skip to content

Commit 7ff6487

Browse files
authored
Global segment cache across DataFusion (#2958)
Share a single capped segment cache across DataFusion
1 parent cc546b3 commit 7ff6487

File tree

2 files changed

+100
-36
lines changed

2 files changed

+100
-36
lines changed
Lines changed: 91 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use async_trait::async_trait;
34
use chrono::{DateTime, Utc};
45
use datafusion_common::ScalarValue;
56
use moka::future::Cache;
@@ -8,77 +9,76 @@ use object_store::{ObjectMeta, ObjectStore};
89
use vortex_array::ArrayRegistry;
910
use vortex_array::aliases::DefaultHashBuilder;
1011
use vortex_array::stats::{Precision, Stat};
12+
use vortex_buffer::ByteBuffer;
1113
use vortex_dtype::DType;
1214
use vortex_error::{VortexError, VortexResult, vortex_err};
15+
use vortex_file::segments::SegmentCache;
1316
use vortex_file::{Footer, SegmentSpec, VortexFile, VortexOpenOptions};
1417
use vortex_layout::LayoutRegistry;
1518
use vortex_layout::segments::SegmentId;
1619
use vortex_metrics::VortexMetrics;
1720

1821
#[derive(Clone)]
1922
pub(crate) struct VortexFileCache {
20-
inner: Cache<Key, VortexFile, DefaultHashBuilder>,
23+
file_cache: Cache<FileKey, VortexFile, DefaultHashBuilder>,
24+
segment_cache: Cache<SegmentKey, ByteBuffer, DefaultHashBuilder>,
2125
array_registry: Arc<ArrayRegistry>,
2226
layout_registry: Arc<LayoutRegistry>,
2327
metrics: VortexMetrics,
2428
}
2529

26-
#[derive(Hash, Eq, PartialEq, Debug)]
27-
pub(crate) struct Key {
28-
location: Path,
30+
/// Cache key for a [`VortexFile`].
31+
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
32+
struct FileKey {
33+
location: Arc<Path>,
2934
m_time: DateTime<Utc>,
3035
}
3136

32-
impl From<&ObjectMeta> for Key {
37+
impl From<&ObjectMeta> for FileKey {
3338
fn from(value: &ObjectMeta) -> Self {
3439
Self {
35-
location: value.location.clone(),
40+
location: Arc::new(value.location.clone()),
3641
m_time: value.last_modified,
3742
}
3843
}
3944
}
4045

41-
/// Approximate the in-memory size of a layout
42-
fn estimate_layout_size(footer: &Footer) -> usize {
43-
let segments_size = footer.segment_map().len() * size_of::<SegmentSpec>();
44-
let stats_size = footer
45-
.statistics()
46-
.iter()
47-
.map(|v| {
48-
v.iter()
49-
.map(|_| size_of::<Stat>() + size_of::<Precision<ScalarValue>>())
50-
.sum::<usize>()
51-
})
52-
.sum::<usize>();
53-
54-
let root_layout = footer.layout();
55-
let layout_size = size_of::<DType>()
56-
+ root_layout.metadata().map(|b| b.len()).unwrap_or_default()
57-
+ root_layout.nsegments() * size_of::<SegmentId>();
58-
59-
segments_size + stats_size + layout_size
46+
/// Global cache key for a segment.
47+
#[derive(Hash, Eq, PartialEq, Debug)]
48+
struct SegmentKey {
49+
file: FileKey,
50+
segment_id: SegmentId,
6051
}
6152

6253
impl VortexFileCache {
6354
pub fn new(
6455
size_mb: usize,
56+
segment_size_mb: usize,
6557
array_registry: Arc<ArrayRegistry>,
6658
layout_registry: Arc<LayoutRegistry>,
6759
metrics: VortexMetrics,
6860
) -> Self {
69-
let inner = Cache::builder()
61+
let file_cache = Cache::builder()
7062
.max_capacity(size_mb as u64 * (2 << 20))
71-
.eviction_listener(|k: Arc<Key>, _v: VortexFile, cause| {
72-
log::trace!("Removed {} due to {:?}", k.location, cause);
63+
.eviction_listener(|k: Arc<FileKey>, _v: VortexFile, cause| {
64+
log::trace!("Removed {:?} due to {:?}", k, cause);
7365
})
7466
.weigher(|_k, vxf| {
75-
let size = estimate_layout_size(vxf.footer());
76-
u32::try_from(size).unwrap_or(u32::MAX)
67+
u32::try_from(estimate_layout_size(vxf.footer())).unwrap_or(u32::MAX)
7768
})
7869
.build_with_hasher(DefaultHashBuilder::default());
7970

71+
// Cache of segment buffers, weighed by buffer length in bytes.
//
// `segment_size_mb` is a megabyte count, so the byte capacity is
// `segment_size_mb * (1 << 20)`. (`2 << 20` is 2 MiB and would silently
// double the configured limit.) The multiply saturates so an oversized
// setting cannot overflow.
let segment_cache = Cache::builder()
    .max_capacity((segment_size_mb as u64).saturating_mul(1 << 20))
    .eviction_listener(|k: Arc<SegmentKey>, _v: ByteBuffer, cause| {
        log::trace!("Removed {:?} due to {:?}", k, cause);
    })
    // Weights are `u32`; clamp buffers whose length does not fit.
    .weigher(|_k, v| u32::try_from(v.len()).unwrap_or(u32::MAX))
    .build_with_hasher(DefaultHashBuilder::default());
78+
8079
Self {
81-
inner,
80+
file_cache,
81+
segment_cache,
8282
array_registry,
8383
layout_registry,
8484
metrics,
@@ -90,9 +90,10 @@ impl VortexFileCache {
9090
object: &ObjectMeta,
9191
object_store: Arc<dyn ObjectStore>,
9292
) -> VortexResult<VortexFile> {
93-
self.inner
93+
let file_key = FileKey::from(object);
94+
self.file_cache
9495
.try_get_with(
95-
Key::from(object),
96+
file_key.clone(),
9697
VortexOpenOptions::file()
9798
.with_array_registry(self.array_registry.clone())
9899
.with_layout_registry(self.layout_registry.clone())
@@ -101,6 +102,10 @@ impl VortexFileCache {
101102
.child_with_tags([("filename", object.location.to_string())]),
102103
)
103104
.with_file_size(object.size as u64)
105+
.with_segment_cache(Arc::new(VortexFileSegmentCache {
106+
file_key,
107+
segment_cache: self.segment_cache.clone(),
108+
}))
104109
.open_object_store(&object_store, object.location.as_ref()),
105110
)
106111
.await
@@ -109,3 +114,56 @@ impl VortexFileCache {
109114
})
110115
}
111116
}
117+
118+
/// A [`SegmentCache`] implementation that uses the shared global segment cache.
119+
struct VortexFileSegmentCache {
120+
file_key: FileKey,
121+
segment_cache: Cache<SegmentKey, ByteBuffer, DefaultHashBuilder>,
122+
}
123+
124+
#[async_trait]
125+
impl SegmentCache for VortexFileSegmentCache {
126+
async fn get(&self, segment_id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
127+
Ok(self
128+
.segment_cache
129+
.get(&SegmentKey {
130+
file: self.file_key.clone(),
131+
segment_id,
132+
})
133+
.await)
134+
}
135+
136+
async fn put(&self, segment_id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
137+
self.segment_cache
138+
.insert(
139+
SegmentKey {
140+
file: self.file_key.clone(),
141+
segment_id,
142+
},
143+
buffer,
144+
)
145+
.await;
146+
Ok(())
147+
}
148+
}
149+
150+
/// Approximate the in-memory size of a layout
151+
fn estimate_layout_size(footer: &Footer) -> usize {
152+
let segments_size = footer.segment_map().len() * size_of::<SegmentSpec>();
153+
let stats_size = footer
154+
.statistics()
155+
.iter()
156+
.map(|v| {
157+
v.iter()
158+
.map(|_| size_of::<Stat>() + size_of::<Precision<ScalarValue>>())
159+
.sum::<usize>()
160+
})
161+
.sum::<usize>();
162+
163+
let root_layout = footer.layout();
164+
let layout_size = size_of::<DType>()
165+
+ root_layout.metadata().map(|b| b.len()).unwrap_or_default()
166+
+ root_layout.nsegments() * size_of::<SegmentId>();
167+
168+
segments_size + stats_size + layout_size
169+
}

vortex-datafusion/src/persistent/format.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,17 @@ impl Debug for VortexFormat {
5757
#[derive(Debug)]
pub struct VortexFormatOptions {
    /// Capacity of the in-memory [`vortex_file::Footer`] cache, in megabytes
    /// (per the `_mb` suffix).
    pub footer_cache_size_mb: usize,
    /// Capacity of the in-memory segment cache, in megabytes (per the `_mb`
    /// suffix).
    pub segment_cache_size_mb: usize,
}
6264

6365
impl Default for VortexFormatOptions {
6466
fn default() -> Self {
65-
Self { cache_size_mb: 256 }
67+
Self {
68+
footer_cache_size_mb: 64,
69+
segment_cache_size_mb: 1024,
70+
}
6671
}
6772
}
6873

@@ -141,7 +146,8 @@ impl VortexFormat {
141146
let opts = VortexFormatOptions::default();
142147
Self {
143148
file_cache: VortexFileCache::new(
144-
opts.cache_size_mb,
149+
opts.footer_cache_size_mb,
150+
opts.segment_cache_size_mb,
145151
array_registry,
146152
layout_registry,
147153
metrics.clone(),

0 commit comments

Comments
 (0)