-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Add CacheManager for DataFusion #19645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/datafusion
Are you sure you want to change the base?
Add CacheManager for DataFusion #19645
Conversation
❌ Gradle check result for d85a24a: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
❌ Gradle check result for a3d3d94: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
} | ||
|
||
#[no_mangle] | ||
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntimev1( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : createGlobalRuntimev1 to createGlobalRuntimenv
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
❌ Gradle check result for 6f7df16: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
pub struct MutexFileMetadataCache { | ||
pub inner: Mutex<DefaultFilesMetadataCache>, | ||
} | ||
|
||
impl MutexFileMetadataCache { | ||
pub fn new(cache: DefaultFilesMetadataCache) -> Self { | ||
Self { | ||
inner: Mutex::new(cache), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CacheManager introduced here aims to perform periodic updates to Cache(refresh/stale files).
It is expected to have multiple calls to CacheManager for mutating operations resulting in multiple references at a given time
for its' underlying registered Cache.
DefaultFilesMetadataCache
of datafusion is internally wrapped in a Mutex
and requires mut access for operations like remove
.
Refer: https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/cache_unit.rs#L312-L315
https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/cache_unit.rs#L402
Having multiple references to Cache, trying to acquire a mutable reference for methods like remove
would lead to failures. Hence explicit handling of mutable references is required for which MutexFileMetadataCache
is introduced
impl DefaultFilesMetadataCache {
pub fn remove(&mut self, key: &str) -> Option<...> { ... }
pub fn insert(&mut self, key: &str, value: ...) { ... }
}
Why it fails in shared contexts
When we share a cache instance like this:
let cache = Arc::new(DefaultFilesMetadataCache::new(...));
cache.remove("key");
Rust will complain:
cannot borrow data in an Arc
as mutable
Reason for above failure - Because Arc only gives shared immutable access (&self), and we can’t call a method requiring &mut self on it — even if the method internally uses a lock
Using
unsafe { &mut *(cache_ptr as *mut DefaultFilesMetadataCache) }
manually converts a raw pointer into a mutable reference, asserting exclusive ownership.
This is undefined behaviour if the cache is shared or accessed elsewhere, as it violates Rust’s aliasing guarantees.
Even though DefaultFilesMetadataCache
has internal locks, its API requires &mut
self, and Rust still enforces exclusive access at the type level.
To safely share and mutate it across threads, wrap it in Mutex or RwLock, which manage exclusive access at runtime without violating safety rules.
Why the MutexFileMetadataCache wrapper helps
By introducing this:
pub struct MutexFileMetadataCache {
pub inner: Mutex<DefaultFilesMetadataCache>,
}
we can now share it safely with Arc and still perform mutations:
let cache = Arc::new(MutexFileMetadataCache::new(DefaultFilesMetadataCache::new(...)));
{
let mut locked = cache.inner.lock().unwrap();
locked.remove("key");
}
The outer Mutex provides mutability via runtime locking while keeping Rust’s compile-time borrow checker happy — because all operations happen through a lock guard that gives a unique mutable reference.
Description
Introducing CacheManager to handle registration of Supported Caches and Periodic Updates with each refresh.
Each Node has a single GlobalRuntimeEnv holding the CacheManager which is shared across
searches
.Added Support for MetadataCache
Changes involve:
Cluster Settings based Cache Registration when DataFusionService is initialized
"datafusion.metadata.cache.size.limit"
"datafusion.metadata.cache.enabled"
Cache Updates for Each Refresh:
->Interim Flow:
Compute new set of files for each updated CatalogSnapshot and add to Cache
->TODO:
Can be updated to capture the set of new files from
CatalogSnapshot
.Deleted files will be removed by FileDeletionListener
Dynamic Cache Size Update through Cluster Settings
DataFusion Version upgrade from
49.0.0
to50.0.0
to support metadata cache featuresTODO: Exception Handling
Steps to Test
1
./gradlew :plugins:engine-datafusion:assemble
2. Execute the test ->
org.opensearch.datafusion.DataFusionCacheManagerTests
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List