Skip to content

Commit a9e6d4b

Browse files
nuno-fariaalamb
andauthored
feat: Limit the memory used in the file metadata cache (#17031)
* feat: Limit the memory used in the file metadata cache * Implement custom LRU queue * Use parking_lot::Mutex * Add is_empty unit test * Rename config to metadata_cache_limit, Set limit to 50M * Remove Option from the metadata memory limit * Add license to lru_queue * Update datafusion/execution/src/cache/cache_unit.rs Removes the previous unreachable!(). Co-authored-by: Andrew Lamb <[email protected]> * Fix syntax error * Fix clippy error --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 84bbce6 commit a9e6d4b

File tree

10 files changed

+974
-69
lines changed

10 files changed

+974
-69
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,10 @@ impl SessionContext {
10681068
builder.with_max_temp_directory_size(directory_size as u64)
10691069
}
10701070
"temp_directory" => builder.with_temp_file_path(value),
1071+
"metadata_cache_limit" => {
1072+
let limit = Self::parse_memory_limit(value)?;
1073+
builder.with_metadata_cache_limit(limit)
1074+
}
10711075
_ => {
10721076
return Err(DataFusionError::Plan(format!(
10731077
"Unknown runtime configuration: {variable}"

datafusion/core/tests/sql/runtime_config.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,39 @@ async fn test_max_temp_directory_size_enforcement() {
200200
);
201201
}
202202

203+
#[tokio::test]
204+
async fn test_test_metadata_cache_limit() {
205+
let ctx = SessionContext::new();
206+
207+
let update_limit = async |ctx: &SessionContext, limit: &str| {
208+
ctx.sql(
209+
format!("SET datafusion.runtime.metadata_cache_limit = '{limit}'").as_str(),
210+
)
211+
.await
212+
.unwrap()
213+
.collect()
214+
.await
215+
.unwrap();
216+
};
217+
218+
let get_limit = |ctx: &SessionContext| -> usize {
219+
ctx.task_ctx()
220+
.runtime_env()
221+
.cache_manager
222+
.get_file_metadata_cache()
223+
.cache_limit()
224+
};
225+
226+
update_limit(&ctx, "100M").await;
227+
assert_eq!(get_limit(&ctx), 100 * 1024 * 1024);
228+
229+
update_limit(&ctx, "2G").await;
230+
assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024);
231+
232+
update_limit(&ctx, "123K").await;
233+
assert_eq!(get_limit(&ctx), 123 * 1024);
234+
}
235+
203236
#[tokio::test]
204237
async fn test_unknown_runtime_config() {
205238
let ctx = SessionContext::new();

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat {
449449

450450
// Use the CachedParquetFileReaderFactory when metadata caching is enabled
451451
if self.options.global.cache_metadata {
452-
if let Some(metadata_cache) =
453-
state.runtime_env().cache_manager.get_file_metadata_cache()
454-
{
455-
let store = state
456-
.runtime_env()
457-
.object_store(conf.object_store_url.clone())?;
458-
let cached_parquet_read_factory =
459-
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
460-
source =
461-
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
462-
}
452+
let metadata_cache =
453+
state.runtime_env().cache_manager.get_file_metadata_cache();
454+
let store = state
455+
.runtime_env()
456+
.object_store(conf.object_store_url.clone())?;
457+
let cached_parquet_read_factory =
458+
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
459+
source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);
463460
}
464461

465462
if let Some(metadata_size_hint) = metadata_size_hint {

datafusion/datasource-parquet/src/reader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,4 +288,8 @@ impl FileMetadata for CachedParquetMetaData {
288288
fn as_any(&self) -> &dyn Any {
289289
self
290290
}
291+
292+
fn memory_size(&self) -> usize {
293+
self.0.memory_size()
294+
}
291295
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync {
3939
/// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
4040
/// implementation.
4141
fn as_any(&self) -> &dyn Any;
42+
43+
/// Returns the size of the metadata in bytes.
44+
fn memory_size(&self) -> usize;
4245
}
4346

4447
/// Cache to store file-embedded metadata.
4548
pub trait FileMetadataCache:
4649
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
4750
{
51+
// Returns the cache's memory limit in bytes.
52+
fn cache_limit(&self) -> usize;
53+
54+
// Updates the cache with a new memory limit in bytes.
55+
fn update_cache_limit(&self, limit: usize);
4856
}
4957

5058
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
@@ -65,30 +73,36 @@ impl Debug for dyn FileMetadataCache {
6573
}
6674
}
6775

68-
#[derive(Default, Debug)]
76+
#[derive(Debug)]
6977
pub struct CacheManager {
7078
file_statistic_cache: Option<FileStatisticsCache>,
7179
list_files_cache: Option<ListFilesCache>,
72-
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
80+
file_metadata_cache: Arc<dyn FileMetadataCache>,
7381
}
7482

7583
impl CacheManager {
7684
pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
77-
let mut manager = CacheManager::default();
78-
if let Some(cc) = &config.table_files_statistics_cache {
79-
manager.file_statistic_cache = Some(Arc::clone(cc))
80-
}
81-
if let Some(lc) = &config.list_files_cache {
82-
manager.list_files_cache = Some(Arc::clone(lc))
83-
}
84-
if let Some(mc) = &config.file_metadata_cache {
85-
manager.file_metadata_cache = Some(Arc::clone(mc));
86-
} else {
87-
manager.file_metadata_cache =
88-
Some(Arc::new(DefaultFilesMetadataCache::default()));
89-
}
90-
91-
Ok(Arc::new(manager))
85+
let file_statistic_cache =
86+
config.table_files_statistics_cache.as_ref().map(Arc::clone);
87+
88+
let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
89+
90+
let file_metadata_cache = config
91+
.file_metadata_cache
92+
.as_ref()
93+
.map(Arc::clone)
94+
.unwrap_or_else(|| {
95+
Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
96+
});
97+
98+
// the cache memory limit might have changed, ensure the limit is updated
99+
file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
100+
101+
Ok(Arc::new(CacheManager {
102+
file_statistic_cache,
103+
list_files_cache,
104+
file_metadata_cache,
105+
}))
92106
}
93107

94108
/// Get the cache of listing files statistics.
@@ -102,12 +116,19 @@ impl CacheManager {
102116
}
103117

104118
/// Get the file embedded metadata cache.
105-
pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> {
106-
self.file_metadata_cache.clone()
119+
pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
120+
Arc::clone(&self.file_metadata_cache)
121+
}
122+
123+
/// Get the limit of the file embedded metadata cache.
124+
pub fn get_metadata_cache_limit(&self) -> usize {
125+
self.file_metadata_cache.cache_limit()
107126
}
108127
}
109128

110-
#[derive(Clone, Default)]
129+
const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
130+
131+
#[derive(Clone)]
111132
pub struct CacheManagerConfig {
112133
/// Enable cache of files statistics when listing files.
113134
/// Avoid get same file statistics repeatedly in same datafusion session.
@@ -124,6 +145,19 @@ pub struct CacheManagerConfig {
124145
/// data file (e.g., Parquet footer and page metadata).
125146
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
126147
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
148+
/// Limit of the file-embedded metadata cache, in bytes.
149+
pub metadata_cache_limit: usize,
150+
}
151+
152+
impl Default for CacheManagerConfig {
153+
fn default() -> Self {
154+
Self {
155+
table_files_statistics_cache: Default::default(),
156+
list_files_cache: Default::default(),
157+
file_metadata_cache: Default::default(),
158+
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
159+
}
160+
}
127161
}
128162

129163
impl CacheManagerConfig {
@@ -147,4 +181,9 @@ impl CacheManagerConfig {
147181
self.file_metadata_cache = cache;
148182
self
149183
}
184+
185+
pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
186+
self.metadata_cache_limit = limit;
187+
self
188+
}
150189
}

0 commit comments

Comments
 (0)