Skip to content

Conversation

abhita
Copy link
Contributor

@abhita abhita commented Oct 15, 2025

Description

Introducing CacheManager to handle registration of Supported Caches and Periodic Updates with each refresh.
Each Node has a single GlobalRuntimeEnv holding the CacheManager which is shared across searches.

Added Support for MetadataCache
Changes involve:

  1. Cluster Settings based Cache Registration when DataFusionService is initialized
    "datafusion.metadata.cache.size.limit"
    "datafusion.metadata.cache.enabled"

  2. Cache Updates for Each Refresh:
    ->Interim Flow:
    Compute new set of files for each updated CatalogSnapshot and add to Cache
    ->TODO:
    Can be updated to capture the set of new files from CatalogSnapshot.
    Deleted files will be removed by FileDeletionListener

  3. Dynamic Cache Size Update through Cluster Settings

  4. DataFusion Version upgrade from 49.0.0 to 50.0.0 to support metadata cache features

  5. TODO: Exception Handling

Steps to Test

1 ./gradlew :plugins:engine-datafusion:assemble
2. Execute the test -> org.opensearch.datafusion.DataFusionCacheManagerTests

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

@abhita abhita changed the title Add Support for MetadataCache Add CacheManager for DataFusion Oct 15, 2025
Copy link
Contributor

❌ Gradle check result for d85a24a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

❌ Gradle check result for a3d3d94: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntimev1(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : createGlobalRuntimev1 to createGlobalRuntimenv

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@abhita abhita closed this Oct 16, 2025
@abhita abhita reopened this Oct 16, 2025
Copy link
Contributor

❌ Gradle check result for 6f7df16: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment on lines +11 to +21
pub struct MutexFileMetadataCache {
pub inner: Mutex<DefaultFilesMetadataCache>,
}

impl MutexFileMetadataCache {
pub fn new(cache: DefaultFilesMetadataCache) -> Self {
Self {
inner: Mutex::new(cache),
}
}
}
Copy link
Contributor Author

@abhita abhita Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheManager introduced here aims to perform periodic updates to Cache(refresh/stale files).
It is expected to have multiple calls to CacheManager for mutating operations resulting in multiple references at a given time for its' underlying registered Cache.

DefaultFilesMetadataCache of datafusion is internally wrapped in a Mutex and requires mut access for operations like remove.
Refer: https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/cache_unit.rs#L312-L315
https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/cache_unit.rs#L402

Having multiple references to Cache, trying to acquire a mutable reference for methods like remove would lead to failures. Hence explicit handling of mutable references is required for which MutexFileMetadataCache is introduced

impl DefaultFilesMetadataCache {
    pub fn remove(&mut self, key: &str) -> Option<...> { ... }
    pub fn insert(&mut self, key: &str, value: ...) { ... }
}

Why it fails in shared contexts
When we share a cache instance like this:

let cache = Arc::new(DefaultFilesMetadataCache::new(...));
cache.remove("key");

Rust will complain:
cannot borrow data in an Arc as mutable

Reason for above failure - Because Arc only gives shared immutable access (&self), and we can’t call a method requiring &mut self on it — even if the method internally uses a lock

Using

unsafe { &mut *(cache_ptr as *mut DefaultFilesMetadataCache) }

manually converts a raw pointer into a mutable reference, asserting exclusive ownership.
This is undefined behaviour if the cache is shared or accessed elsewhere, as it violates Rust’s aliasing guarantees.

Even though DefaultFilesMetadataCache has internal locks, its API requires &mut self, and Rust still enforces exclusive access at the type level.
To safely share and mutate it across threads, wrap it in Mutex or RwLock, which manage exclusive access at runtime without violating safety rules.

Why the MutexFileMetadataCache wrapper helps

By introducing this:

pub struct MutexFileMetadataCache {
    pub inner: Mutex<DefaultFilesMetadataCache>,
}

we can now share it safely with Arc and still perform mutations:

let cache = Arc::new(MutexFileMetadataCache::new(DefaultFilesMetadataCache::new(...)));

{
    let mut locked = cache.inner.lock().unwrap();
    locked.remove("key");
}

The outer Mutex provides mutability via runtime locking while keeping Rust’s compile-time borrow checker happy — because all operations happen through a lock guard that gives a unique mutable reference.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants