Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions plugins/engine-datafusion/jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ crate-type = ["cdylib"]

[dependencies]
# DataFusion dependencies
datafusion = "49.0.0"
datafusion-expr = "49.0.0"
datafusion-datasource = "49.0.0"
datafusion = "50.0.0"
datafusion-expr = "50.0.0"
datafusion-datasource = "50.0.0"
arrow-json = "55.2"
arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
arrow = { version = "56.2", features = ["ffi", "ipc_compression"] }
#arrow = "55.2.0"
arrow-array = "55.2.0"
arrow-schema = "55.2.0"
arrow-array = "56.2.0"
arrow-schema = "56.2.0"
arrow-buffer = "55.2.0"

# JNI dependencies
jni = "0.21"

# Substrait support
datafusion-substrait = "49.0.0"
datafusion-substrait = "50.0.0"
prost = "0.13"


Expand Down
76 changes: 76 additions & 0 deletions plugins/engine-datafusion/jni/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@

use std::sync::{Arc, Mutex};


use datafusion::execution::cache::cache_manager::{FileMetadataCache};
use datafusion::execution::cache::cache_unit::{DefaultFilesMetadataCache};
use datafusion::execution::cache::CacheAccessor;
use object_store::ObjectMeta;

// Wrapper to make Mutex<DefaultFilesMetadataCache> implement FileMetadataCache
pub struct MutexFileMetadataCache {
pub inner: Mutex<DefaultFilesMetadataCache>,
}

impl MutexFileMetadataCache {
pub fn new(cache: DefaultFilesMetadataCache) -> Self {
Self {
inner: Mutex::new(cache),
}
}
}
Comment on lines +11 to +21
Copy link
Contributor Author

@abhita abhita Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheManager introduced here aims to perform periodic updates to Cache(refresh/stale files).
It is expected to have multiple calls to CacheManager for mutating operations resulting in multiple references at a given time for its' underlying registered Cache.

DefaultFilesMetadataCache of datafusion is internally wrapped in a Mutex and requires mut access for operations like remove.
Refer: https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/cache_unit.rs#L312-L315
https://github.com/apache/datafusion/blob/main/datafusion/execution/src/cache/cache_unit.rs#L402

Having multiple references to Cache, trying to acquire a mutable reference for methods like remove would lead to failures. Hence explicit handling of mutable references is required for which MutexFileMetadataCache is introduced

impl DefaultFilesMetadataCache {
    pub fn remove(&mut self, key: &str) -> Option<...> { ... }
    pub fn insert(&mut self, key: &str, value: ...) { ... }
}

Why it fails in shared contexts
When we share a cache instance like this:

let cache = Arc::new(DefaultFilesMetadataCache::new(...));
cache.remove("key");

Rust will complain:
cannot borrow data in an Arc as mutable

Reason for above failure - Because Arc only gives shared immutable access (&self), and we can’t call a method requiring &mut self on it — even if the method internally uses a lock

Using

unsafe { &mut *(cache_ptr as *mut DefaultFilesMetadataCache) }

manually converts a raw pointer into a mutable reference, asserting exclusive ownership.
This is undefined behaviour if the cache is shared or accessed elsewhere, as it violates Rust’s aliasing guarantees.

Even though DefaultFilesMetadataCache has internal locks, its API requires &mut self, and Rust still enforces exclusive access at the type level.
To safely share and mutate it across threads, wrap it in Mutex or RwLock, which manage exclusive access at runtime without violating safety rules.

Why the MutexFileMetadataCache wrapper helps

By introducing this:

pub struct MutexFileMetadataCache {
    pub inner: Mutex<DefaultFilesMetadataCache>,
}

we can now share it safely with Arc and still perform mutations:

let cache = Arc::new(MutexFileMetadataCache::new(DefaultFilesMetadataCache::new(...)));

{
    let mut locked = cache.inner.lock().unwrap();
    locked.remove("key");
}

The outer Mutex provides mutability via runtime locking while keeping Rust’s compile-time borrow checker happy — because all operations happen through a lock guard that gives a unique mutable reference.


// Implement CacheAccessor which is required by FileMetadataCache
impl CacheAccessor<ObjectMeta, Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> for MutexFileMetadataCache {
type Extra = ObjectMeta;

fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
self.inner.lock().unwrap().get(k)
}

fn get_with_extra(&self, k: &ObjectMeta, extra: &Self::Extra) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
self.inner.lock().unwrap().get_with_extra(k, extra)
}

fn put(&self, k: &ObjectMeta, v: Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
self.inner.lock().unwrap().put(k, v)
}

fn put_with_extra(&self, k: &ObjectMeta, v: Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>, e: &Self::Extra) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
self.inner.lock().unwrap().put_with_extra(k, v, e)
}

fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
self.inner.lock().unwrap().remove(k)
}

fn contains_key(&self, k: &ObjectMeta) -> bool {
self.inner.lock().unwrap().contains_key(k)
}

fn len(&self) -> usize {
self.inner.lock().unwrap().len()
}

fn clear(&self) {
self.inner.lock().unwrap().clear()
}

fn name(&self) -> String {
self.inner.lock().unwrap().name()
}
}

impl FileMetadataCache for MutexFileMetadataCache {
fn cache_limit(&self) -> usize {
self.inner.lock().unwrap().cache_limit()
}

fn update_cache_limit(&self, limit: usize) {
self.inner.lock().unwrap().update_cache_limit(limit)
}

fn list_entries(&self) -> std::collections::HashMap<object_store::path::Path, datafusion::execution::cache::cache_manager::FileMetadataCacheEntry> {
self.inner.lock().unwrap().list_entries()
}
}
Loading
Loading