Skip to content

Commit 79cfe8e

Browse files
delamarch3 and alamb authored
Add runtime config options for list_files_cache_limit and list_files_cache_ttl (#19108)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19056 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Make the list file cache memory limit and TTL configurable via runtime config. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> * Add ability to `SET` and `RESET` `list_files_cache_limit` and `list_files_cache_ttl` * `list_files_cache_ttl` will expect the duration to look like either `1m30s` or `30` (I'm wondering if it would be simpler for it to just accept a single unit?) * Add `update_cache_ttl()` to the `ListFilesCache` trait so we can update it from `RuntimeEnvBuilder::build()` * Add config entries ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> `update_cache_ttl()` has been added to the `ListFilesCache` trait <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 199cf06 commit 79cfe8e

File tree

8 files changed

+247
-9
lines changed

8 files changed

+247
-9
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use std::collections::HashSet;
2121
use std::fmt::Debug;
2222
use std::sync::{Arc, Weak};
23+
use std::time::Duration;
2324

2425
use super::options::ReadOptions;
2526
use crate::datasource::dynamic_file::DynamicListTableFactory;
@@ -72,7 +73,10 @@ use datafusion_common::{
7273
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
7374
DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference,
7475
};
75-
use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT;
76+
use datafusion_execution::cache::cache_manager::{
77+
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
78+
DEFAULT_METADATA_CACHE_LIMIT,
79+
};
7680
pub use datafusion_execution::config::SessionConfig;
7781
use datafusion_execution::disk_manager::{
7882
DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
@@ -1168,6 +1172,14 @@ impl SessionContext {
11681172
let limit = Self::parse_memory_limit(value)?;
11691173
builder.with_metadata_cache_limit(limit)
11701174
}
1175+
"list_files_cache_limit" => {
1176+
let limit = Self::parse_memory_limit(value)?;
1177+
builder.with_object_list_cache_limit(limit)
1178+
}
1179+
"list_files_cache_ttl" => {
1180+
let duration = Self::parse_duration(value)?;
1181+
builder.with_object_list_cache_ttl(Some(duration))
1182+
}
11711183
_ => return plan_err!("Unknown runtime configuration: {variable}"),
11721184
// Remember to update `reset_runtime_variable()` when adding new options
11731185
};
@@ -1199,6 +1211,14 @@ impl SessionContext {
11991211
"metadata_cache_limit" => {
12001212
builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT);
12011213
}
1214+
"list_files_cache_limit" => {
1215+
builder = builder
1216+
.with_object_list_cache_limit(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT);
1217+
}
1218+
"list_files_cache_ttl" => {
1219+
builder =
1220+
builder.with_object_list_cache_ttl(DEFAULT_LIST_FILES_CACHE_TTL);
1221+
}
12021222
_ => return plan_err!("Unknown runtime configuration: {variable}"),
12031223
};
12041224

@@ -1239,6 +1259,34 @@ impl SessionContext {
12391259
}
12401260
}
12411261

1262+
fn parse_duration(duration: &str) -> Result<Duration> {
1263+
let mut minutes = None;
1264+
let mut seconds = None;
1265+
1266+
for duration in duration.split_inclusive(&['m', 's']) {
1267+
let (number, unit) = duration.split_at(duration.len() - 1);
1268+
let number: u64 = number.parse().map_err(|_| {
1269+
plan_datafusion_err!("Failed to parse number from duration '{duration}'")
1270+
})?;
1271+
1272+
match unit {
1273+
"m" if minutes.is_none() && seconds.is_none() => minutes = Some(number),
1274+
"s" if seconds.is_none() => seconds = Some(number),
1275+
_ => plan_err!("Invalid duration, unit must be either 'm' (minutes), or 's' (seconds), and be in the correct order")?,
1276+
}
1277+
}
1278+
1279+
let duration = Duration::from_secs(
1280+
minutes.unwrap_or_default() * 60 + seconds.unwrap_or_default(),
1281+
);
1282+
1283+
if duration.is_zero() {
1284+
return plan_err!("Duration must be greater than 0 seconds");
1285+
}
1286+
1287+
Ok(duration)
1288+
}
1289+
12421290
async fn create_custom_table(
12431291
&self,
12441292
cmd: &CreateExternalTable,
@@ -2673,4 +2721,24 @@ mod tests {
26732721

26742722
Ok(())
26752723
}
2724+
2725+
#[test]
fn test_parse_duration() {
    // Well-formed inputs paired with the number of seconds they must yield.
    let accepted: [(&str, u64); 4] = [("1s", 1), ("1m", 60), ("1m0s", 60), ("1m1s", 61)];
    for (input, expected_secs) in accepted {
        let parsed = SessionContext::parse_duration(input).unwrap();
        assert_eq!(Duration::from_secs(expected_secs), parsed);
    }

    // Zero-length totals and seconds-before-minutes inputs must be rejected.
    let rejected = ["0s", "0m", "1s0m", "1s1m"];
    for input in rejected {
        assert!(SessionContext::parse_duration(input).is_err());
    }
}
26762744
}

datafusion/core/tests/sql/runtime_config.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818
//! Tests for runtime configuration SQL interface
1919
2020
use std::sync::Arc;
21+
use std::time::Duration;
2122

2223
use datafusion::execution::context::SessionContext;
2324
use datafusion::execution::context::TaskContext;
25+
use datafusion::prelude::SessionConfig;
26+
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
27+
use datafusion_execution::cache::DefaultListFilesCache;
28+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
2429
use datafusion_physical_plan::common::collect;
2530

2631
#[tokio::test]
@@ -233,6 +238,93 @@ async fn test_test_metadata_cache_limit() {
233238
assert_eq!(get_limit(&ctx), 123 * 1024);
234239
}
235240

241+
#[tokio::test]
242+
async fn test_list_files_cache_limit() {
243+
let list_files_cache = Arc::new(DefaultListFilesCache::default());
244+
245+
let rt = RuntimeEnvBuilder::new()
246+
.with_cache_manager(
247+
CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)),
248+
)
249+
.build_arc()
250+
.unwrap();
251+
252+
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
253+
254+
let update_limit = async |ctx: &SessionContext, limit: &str| {
255+
ctx.sql(
256+
format!("SET datafusion.runtime.list_files_cache_limit = '{limit}'").as_str(),
257+
)
258+
.await
259+
.unwrap()
260+
.collect()
261+
.await
262+
.unwrap();
263+
};
264+
265+
let get_limit = |ctx: &SessionContext| -> usize {
266+
ctx.task_ctx()
267+
.runtime_env()
268+
.cache_manager
269+
.get_list_files_cache()
270+
.unwrap()
271+
.cache_limit()
272+
};
273+
274+
update_limit(&ctx, "100M").await;
275+
assert_eq!(get_limit(&ctx), 100 * 1024 * 1024);
276+
277+
update_limit(&ctx, "2G").await;
278+
assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024);
279+
280+
update_limit(&ctx, "123K").await;
281+
assert_eq!(get_limit(&ctx), 123 * 1024);
282+
}
283+
284+
#[tokio::test]
285+
async fn test_list_files_cache_ttl() {
286+
let list_files_cache = Arc::new(DefaultListFilesCache::default());
287+
288+
let rt = RuntimeEnvBuilder::new()
289+
.with_cache_manager(
290+
CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)),
291+
)
292+
.build_arc()
293+
.unwrap();
294+
295+
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
296+
297+
let update_limit = async |ctx: &SessionContext, limit: &str| {
298+
ctx.sql(
299+
format!("SET datafusion.runtime.list_files_cache_ttl = '{limit}'").as_str(),
300+
)
301+
.await
302+
.unwrap()
303+
.collect()
304+
.await
305+
.unwrap();
306+
};
307+
308+
let get_limit = |ctx: &SessionContext| -> Duration {
309+
ctx.task_ctx()
310+
.runtime_env()
311+
.cache_manager
312+
.get_list_files_cache()
313+
.unwrap()
314+
.cache_ttl()
315+
.unwrap()
316+
};
317+
318+
update_limit(&ctx, "1m").await;
319+
assert_eq!(get_limit(&ctx), Duration::from_secs(60));
320+
321+
update_limit(&ctx, "30s").await;
322+
assert_eq!(get_limit(&ctx), Duration::from_secs(30));
323+
324+
update_limit(&ctx, "1m30s").await;
325+
assert_eq!(get_limit(&ctx), Duration::from_secs(90));
326+
}
327+
236328
#[tokio::test]
237329
async fn test_unknown_runtime_config() {
238330
let ctx = SessionContext::new();

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use std::fmt::{Debug, Formatter};
2727
use std::sync::Arc;
2828
use std::time::Duration;
2929

30-
use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
30+
pub use super::list_files_cache::{
31+
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
32+
};
3133

3234
/// A cache for [`Statistics`].
3335
///
@@ -76,6 +78,9 @@ pub trait ListFilesCache:
7678

7779
/// Updates the cache with a new memory limit in bytes.
7880
fn update_cache_limit(&self, limit: usize);
81+
82+
/// Updates the cache with a new TTL (time-to-live).
83+
fn update_cache_ttl(&self, ttl: Option<Duration>);
7984
}
8085

8186
/// Generic file-embedded metadata used with [`FileMetadataCache`].
@@ -173,7 +178,15 @@ impl CacheManager {
173178
let file_statistic_cache =
174179
config.table_files_statistics_cache.as_ref().map(Arc::clone);
175180

176-
let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
181+
let list_files_cache = config
182+
.list_files_cache
183+
.as_ref()
184+
.inspect(|c| {
185+
// the cache memory limit or ttl might have changed, ensure they are updated
186+
c.update_cache_limit(config.list_files_cache_limit);
187+
c.update_cache_ttl(config.list_files_cache_ttl);
188+
})
189+
.map(Arc::clone);
177190

178191
let file_metadata_cache = config
179192
.file_metadata_cache
@@ -262,7 +275,7 @@ impl Default for CacheManagerConfig {
262275
table_files_statistics_cache: Default::default(),
263276
list_files_cache: Default::default(),
264277
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
265-
list_files_cache_ttl: None,
278+
list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
266279
file_metadata_cache: Default::default(),
267280
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
268281
}
@@ -303,8 +316,8 @@ impl CacheManagerConfig {
303316
/// Sets the TTL (time-to-live) for entries in the list files cache.
304317
///
305318
/// Default: None (infinite).
306-
pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self {
307-
self.list_files_cache_ttl = Some(ttl);
319+
pub fn with_list_files_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
320+
self.list_files_cache_ttl = ttl;
308321
self
309322
}
310323

datafusion/execution/src/cache/list_files_cache.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
141141
}
142142

143143
/// The default memory limit for the [`DefaultListFilesCache`]
144-
pub(super) const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
144+
pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
145+
146+
/// The default cache TTL for the [`DefaultListFilesCache`]
147+
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite
145148

146149
/// Handles the inner state of the [`DefaultListFilesCache`] struct.
147150
pub struct DefaultListFilesCacheState {
@@ -157,7 +160,7 @@ impl Default for DefaultListFilesCacheState {
157160
lru_queue: LruQueue::new(),
158161
memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
159162
memory_used: 0,
160-
ttl: None,
163+
ttl: DEFAULT_LIST_FILES_CACHE_TTL,
161164
}
162165
}
163166
}
@@ -294,6 +297,12 @@ impl ListFilesCache for DefaultListFilesCache {
294297
state.memory_limit = limit;
295298
state.evict_entries();
296299
}
300+
301+
fn update_cache_ttl(&self, ttl: Option<Duration>) {
302+
let mut state = self.state.lock().unwrap();
303+
state.ttl = ttl;
304+
state.evict_entries();
305+
}
297306
}
298307

299308
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {

datafusion/execution/src/runtime_env.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ fn create_runtime_config_entries(
101101
max_temp_directory_size: Option<String>,
102102
temp_directory: Option<String>,
103103
metadata_cache_limit: Option<String>,
104+
list_files_cache_limit: Option<String>,
105+
list_files_cache_ttl: Option<String>,
104106
) -> Vec<ConfigEntry> {
105107
vec![
106108
ConfigEntry {
@@ -123,6 +125,16 @@ fn create_runtime_config_entries(
123125
value: metadata_cache_limit,
124126
description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
125127
},
128+
ConfigEntry {
129+
key: "datafusion.runtime.list_files_cache_limit".to_string(),
130+
value: list_files_cache_limit,
131+
description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
132+
},
133+
ConfigEntry {
134+
key: "datafusion.runtime.list_files_cache_ttl".to_string(),
135+
value: list_files_cache_ttl,
136+
description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
137+
},
126138
]
127139
}
128140

@@ -227,6 +239,14 @@ impl RuntimeEnv {
227239
}
228240
}
229241

242+
/// Renders a duration as `<minutes>m<seconds>s`, e.g. 90 seconds becomes
/// `1m30s`. Only whole seconds are considered; any sub-second part is dropped.
fn format_duration(duration: Duration) -> String {
    let whole_seconds = duration.as_secs();
    let (minutes, seconds) = (whole_seconds / 60, whole_seconds % 60);

    format!("{minutes}m{seconds}s")
}
249+
230250
let memory_limit_value = match self.memory_pool.memory_limit() {
231251
MemoryLimit::Finite(size) => Some(format_byte_size(
232252
size.try_into()
@@ -259,11 +279,25 @@ impl RuntimeEnv {
259279
.expect("Metadata cache size conversion failed"),
260280
);
261281

282+
let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit();
283+
let list_files_cache_value = format_byte_size(
284+
list_files_cache_limit
285+
.try_into()
286+
.expect("List files cache size conversion failed"),
287+
);
288+
289+
let list_files_cache_ttl = self
290+
.cache_manager
291+
.get_list_files_cache_ttl()
292+
.map(format_duration);
293+
262294
create_runtime_config_entries(
263295
memory_limit_value,
264296
Some(max_temp_dir_value),
265297
temp_dir_value,
266298
Some(metadata_cache_value),
299+
Some(list_files_cache_value),
300+
list_files_cache_ttl,
267301
)
268302
}
269303
}
@@ -394,7 +428,7 @@ impl RuntimeEnvBuilder {
394428
}
395429

396430
/// Specifies the duration entries in the object list cache will be considered valid.
397-
pub fn with_object_list_cache_ttl(mut self, ttl: Duration) -> Self {
431+
pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
398432
self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl);
399433
self
400434
}
@@ -473,6 +507,8 @@ impl RuntimeEnvBuilder {
473507
Some("100G".to_string()),
474508
None,
475509
Some("50M".to_owned()),
510+
Some("1M".to_owned()),
511+
None,
476512
)
477513
}
478514

0 commit comments

Comments
 (0)