Skip to content

Commit bd886d3

Browse files
committed
Improve format string cache error handling and tests
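Return an error when the format string cache limit is reached instead of evicting the oldest entry. Because interned strings are leaked and never freed, eviction did not bound memory use; rejecting new strings at the limit does, while strings that are already interned can still be looked up and reused. Update the cache documentation for clarity. Expand the tests to cover the over-limit error path and to ensure interned strings are still reused after the limit is hit.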
1 parent dbd8882 commit bd886d3

File tree

1 file changed

+62
-39
lines changed

1 file changed

+62
-39
lines changed

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Serde code to convert from protocol buffers to Rust data structures.
1919
20-
use std::collections::{HashMap, VecDeque};
20+
use std::collections::HashMap;
2121
use std::sync::{Arc, Mutex, OnceLock};
2222

2323
use arrow::array::RecordBatch;
@@ -790,12 +790,24 @@ fn intern_format_strings(
790790
options: &protobuf::FormatOptions,
791791
) -> Result<InternedFormatStrings> {
792792
Ok(InternedFormatStrings {
793-
null: intern_format_str(&options.null),
794-
date_format: options.date_format.as_deref().map(intern_format_str),
795-
datetime_format: options.datetime_format.as_deref().map(intern_format_str),
796-
timestamp_format: options.timestamp_format.as_deref().map(intern_format_str),
797-
timestamp_tz_format: options.timestamp_tz_format.as_deref().map(intern_format_str),
798-
time_format: options.time_format.as_deref().map(intern_format_str),
793+
null: intern_format_str(&options.null)?,
794+
date_format: options.date_format.as_deref().map(intern_format_str).transpose()?,
795+
datetime_format: options
796+
.datetime_format
797+
.as_deref()
798+
.map(intern_format_str)
799+
.transpose()?,
800+
timestamp_format: options
801+
.timestamp_format
802+
.as_deref()
803+
.map(intern_format_str)
804+
.transpose()?,
805+
timestamp_tz_format: options
806+
.timestamp_tz_format
807+
.as_deref()
808+
.map(intern_format_str)
809+
.transpose()?,
810+
time_format: options.time_format.as_deref().map(intern_format_str).transpose()?,
799811
})
800812
}
801813

@@ -871,59 +883,45 @@ const FORMAT_STRING_CACHE_LIMIT: usize = 1024;
871883
/// Cache for interned format strings.
872884
///
873885
/// We leak strings to satisfy the `'static` lifetime required by
874-
/// `ArrowFormatOptions` in cast options. The cache retains those
875-
/// references to maximize reuse and bounds the number of retained
876-
/// entries to avoid unbounded growth.
886+
/// `ArrowFormatOptions` in cast options. To avoid unbounded growth,
887+
/// once the cache reaches the limit we only allow lookups for strings
888+
/// that are already interned.
877889
static FORMAT_STRING_CACHE: OnceLock<Mutex<FormatStringCache>> = OnceLock::new();
878890

879891
#[derive(Default)]
880892
struct FormatStringCache {
881893
entries: HashMap<String, &'static str>,
882-
order: VecDeque<String>,
883894
}
884895

885896
impl FormatStringCache {
886897
fn get(&mut self, value: &str) -> Option<&'static str> {
887-
let interned = self.entries.get(value).copied()?;
888-
self.touch(value);
889-
Some(interned)
898+
self.entries.get(value).copied()
890899
}
891900

892-
fn insert(&mut self, value: &str) -> &'static str {
901+
fn insert(&mut self, value: &str) -> Result<&'static str> {
893902
if let Some(existing) = self.get(value) {
894-
return existing;
903+
return Ok(existing);
895904
}
896905

897-
let leaked = Box::leak(value.to_owned().into_boxed_str());
898906
if self.entries.len() >= FORMAT_STRING_CACHE_LIMIT {
899-
self.evict_oldest();
907+
return Err(internal_datafusion_err!(
908+
"Format string cache limit ({}) reached; cannot intern new format string {value:?}",
909+
FORMAT_STRING_CACHE_LIMIT
910+
));
900911
}
901912

913+
let leaked = Box::leak(value.to_owned().into_boxed_str());
902914
let key = value.to_owned();
903915
self.entries.insert(key.clone(), leaked);
904-
self.order.push_back(key);
905-
leaked
906-
}
907-
908-
fn touch(&mut self, value: &str) {
909-
if let Some(position) = self.order.iter().position(|key| key == value) {
910-
self.order.remove(position);
911-
}
912-
self.order.push_back(value.to_owned());
913-
}
914-
915-
fn evict_oldest(&mut self) {
916-
if let Some(oldest) = self.order.pop_front() {
917-
self.entries.remove(&oldest);
918-
}
916+
Ok(leaked)
919917
}
920918
}
921919

922920
fn format_string_cache() -> &'static Mutex<FormatStringCache> {
923921
FORMAT_STRING_CACHE.get_or_init(|| Mutex::new(FormatStringCache::default()))
924922
}
925923

926-
fn intern_format_str(value: &str) -> &'static str {
924+
fn intern_format_str(value: &str) -> Result<&'static str> {
927925
let mut cache = format_string_cache()
928926
.lock()
929927
.expect("format string cache lock poisoned");
@@ -1028,7 +1026,7 @@ mod tests {
10281026
let to_fill = FORMAT_STRING_CACHE_LIMIT.saturating_sub(current_len);
10291027
for _ in 0..to_fill {
10301028
let value = unique_value("unit-test-fill");
1031-
intern_format_str(&value);
1029+
intern_format_str(&value).unwrap();
10321030
}
10331031

10341032
let cache_len = format_string_cache()
@@ -1042,11 +1040,36 @@ mod tests {
10421040
);
10431041

10441042
let overflow_value = unique_value("unit-test-overflow");
1045-
let first = intern_format_str(&overflow_value);
1046-
let second = intern_format_str(&overflow_value);
1043+
let overflow_options = protobuf::FormatOptions {
1044+
safe: true,
1045+
null: overflow_value,
1046+
date_format: None,
1047+
datetime_format: None,
1048+
timestamp_format: None,
1049+
timestamp_tz_format: None,
1050+
time_format: None,
1051+
duration_format: protobuf::DurationFormat::Pretty as i32,
1052+
types_info: false,
1053+
};
1054+
let error = format_options_from_proto(&overflow_options).unwrap_err();
1055+
assert!(
1056+
error.to_string().contains("Format string cache limit"),
1057+
"unexpected error: {error}"
1058+
);
1059+
1060+
let existing_value = format_string_cache()
1061+
.lock()
1062+
.expect("format string cache lock poisoned")
1063+
.entries
1064+
.keys()
1065+
.next()
1066+
.cloned()
1067+
.expect("cache should have entries after fill");
1068+
let existing = intern_format_str(&existing_value).unwrap();
1069+
let again = intern_format_str(&existing_value).unwrap();
10471070
assert!(
1048-
std::ptr::eq(first, second),
1049-
"cache should reuse overflow strings while they remain cached"
1071+
std::ptr::eq(existing, again),
1072+
"cache should reuse existing strings after the limit"
10501073
);
10511074
}
10521075

0 commit comments

Comments
 (0)