Skip to content

Commit 481409d

Browse files
authored
Enable Metadata Cache Complete Warm-up (#20117)
* Update MetadataCache Warm-up to include page-indexes * Disable dynamic metadata cache limit updates
1 parent 3e0daa3 commit 481409d

File tree

5 files changed

+39
-45
lines changed

5 files changed

+39
-45
lines changed

plugins/engine-datafusion/jni/src/custom_cache_manager.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use std::sync::{Arc, Mutex};
22
use datafusion::execution::cache::cache_manager::{FileMetadataCache, CacheManagerConfig};
33
use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache};
44
use datafusion::execution::cache::CacheAccessor;
5+
use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata;
56
use tokio::runtime::Runtime;
67
use crate::cache::MutexFileMetadataCache;
7-
use crate::util::{create_object_meta_from_file, construct_file_metadata};
8+
use crate::util::{create_object_meta_from_file};
89

910
/// Custom CacheManager that holds cache references directly
1011
pub struct CustomCacheManager {
@@ -39,7 +40,8 @@ impl CustomCacheManager {
3940
let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
4041
// Add file metadata cache if available
4142
if let Some(cache) = self.get_file_metadata_cache_for_datafusion() {
42-
config = config.with_file_metadata_cache(Some(cache));
43+
config = config.with_file_metadata_cache(Some(cache.clone()))
44+
.with_metadata_cache_limit(cache.cache_limit());
4345
}
4446
config = config.with_files_statistics_cache(Some(file_static_cache.clone()));
4547
// Future: Add stats cache when implemented
@@ -256,31 +258,40 @@ impl CustomCacheManager {
256258

257259
let store = Arc::new(object_store::local::LocalFileSystem::new());
258260

259-
//TODO: Use TokioRuntimePtr to block on the async operation
260-
let metadata = Runtime::new()
261+
// Get cache reference for DataFusion metadata loading
262+
let cache_ref = self.file_metadata_cache.as_ref()
263+
.ok_or_else(|| "No file metadata cache configured".to_string())?;
264+
265+
let metadata_cache = cache_ref.clone() as Arc<dyn FileMetadataCache>;
266+
267+
// Use DataFusion's metadata loading by passing reference to file_metadata_cache to get complete metadata
268+
// IMPORTANT: When a cache is provided to DFParquetMetadata, fetch_metadata() will:
269+
// 1. Enable page index loading (with_page_indexes(true))
270+
// 2. Load the complete metadata including column and offset indexes
271+
// 3. Automatically put the metadata into the cache (lines 155-160 in datafusion's metadata.rs)
272+
// This ensures we cache exactly what DataFusion would cache during query execution
273+
let _parquet_metadata = Runtime::new()
261274
.map_err(|e| format!("Failed to create Tokio Runtime: {}", e))?
262275
.block_on(async {
263-
construct_file_metadata(store.as_ref(), object_meta, data_format)
264-
.await
265-
.map_err(|e| format!("Failed to construct file metadata: {}", e))
266-
})?;
276+
let df_metadata = DFParquetMetadata::new(store.as_ref(), object_meta)
277+
.with_file_metadata_cache(Some(metadata_cache));
267278

268-
// Get the cache directly from our stored reference
269-
let cache = self.file_metadata_cache.as_ref()
270-
.ok_or_else(|| "No file metadata cache configured".to_string())?;
271-
272-
match cache.inner.lock() {
273-
Ok(mut cache_guard) => {
274-
cache_guard.put(object_meta, metadata);
279+
// fetch_metadata() performs the cache put operation internally
280+
df_metadata.fetch_metadata().await
281+
.map_err(|e| format!("Failed to fetch metadata: {}", e))
282+
})?;
275283

284+
// Verify the metadata was cached properly
285+
match cache_ref.inner.lock() {
286+
Ok(cache_guard) => {
276287
if cache_guard.contains_key(object_meta) {
277288
Ok(true)
278289
} else {
279-
println!("Failed to cache metadata for: {}", file_path);
290+
println!("[CACHE ERROR] Failed to cache metadata for: {}", file_path);
280291
Ok(false)
281292
}
282293
}
283-
Err(e) => Err(format!("Cache put failed: {}", e))
294+
Err(e) => Err(format!("Failed to verify cache: {}", e))
284295
}
285296
}
286297
}

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,6 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlo
235235

236236
let runtime_env = RuntimeEnvBuilder::new().with_cache_manager(cache_manager_config)
237237
.with_memory_pool(memory_pool.clone())
238-
.with_metadata_cache_limit(250 * 1024 * 1024) // 250 MB
239238
.build().unwrap();
240239

241240
let runtime = DataFusionRuntime {

plugins/engine-datafusion/jni/src/query_executor.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,16 @@ pub async fn execute_query_with_cross_rt_stream(
6565

6666
let runtimeEnv = &runtime.runtime_env;
6767

68+
let file_metadata_cache = runtime.runtime_env.cache_manager.get_file_metadata_cache();
69+
6870
let runtime_env = match RuntimeEnvBuilder::from_runtime_env(runtimeEnv)
6971
.with_cache_manager(
7072
CacheManagerConfig::default()
7173
.with_list_files_cache(Some(list_file_cache.clone()))
72-
.with_file_metadata_cache(Some(runtimeEnv.cache_manager.get_file_metadata_cache()))
74+
.with_file_metadata_cache(Some(file_metadata_cache.clone()))
75+
.with_metadata_cache_limit(file_metadata_cache.cache_limit())
7376
.with_files_statistics_cache(runtimeEnv.cache_manager.get_file_statistic_cache()),
74-
)
75-
.with_metadata_cache_limit(250 * 1024 * 1024) // 250 MB
76-
.build() {
77+
).build() {
7778
Ok(env) => env,
7879
Err(e) => {
7980
error!("Failed to build runtime env: {}", e);
@@ -211,13 +212,15 @@ pub async fn execute_fetch_phase(
211212
let list_file_cache = Arc::new(DefaultListFilesCache::default());
212213
list_file_cache.put(table_path.prefix(), object_meta);
213214

215+
let file_metadata_cache = runtime.runtime_env.cache_manager.get_file_metadata_cache();
216+
214217
let runtime_env = RuntimeEnvBuilder::new()
215218
.with_cache_manager(
216219
CacheManagerConfig::default().with_list_files_cache(Some(list_file_cache))
217-
.with_file_metadata_cache(Some(runtime.runtime_env.cache_manager.get_file_metadata_cache()))
218-
.with_files_statistics_cache(runtime.runtime_env.cache_manager.get_file_statistic_cache()),
220+
.with_file_metadata_cache(Some(file_metadata_cache.clone()))
221+
.with_files_statistics_cache(runtime.runtime_env.cache_manager.get_file_statistic_cache())
222+
.with_metadata_cache_limit(file_metadata_cache.cache_limit()),
219223
)
220-
.with_metadata_cache_limit(250 * 1024 * 1024) // 250 MB
221224
.build()?;
222225
let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env));
223226

plugins/engine-datafusion/jni/src/util.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -209,25 +209,6 @@ pub fn create_object_meta_from_file(file_path: &str) -> Result<Vec<ObjectMeta>,
209209
Ok(vec![object_meta])
210210
}
211211

212-
pub async fn construct_file_metadata(
213-
store: &dyn ObjectStore,
214-
object_meta: &ObjectMeta,
215-
data_format: &str,
216-
) -> Result<Arc<dyn FileMetadata>, Box<dyn std::error::Error>> {
217-
match data_format.to_lowercase().as_str() {
218-
"parquet" => {
219-
let df_metadata = DFParquetMetadata::new(
220-
store,
221-
object_meta
222-
);
223-
224-
let parquet_metadata = df_metadata.fetch_metadata().await?;
225-
let par = CachedParquetMetaData::new(parquet_metadata);
226-
Ok(Arc::new(par))
227-
},
228-
_ => Err(format!("Unsupported data format: {}", data_format).into())
229-
}
230-
}
231212
/// Set success result by calling an ActionListener
232213
pub fn set_action_listener_ok(env: &mut JNIEnv, listener: JObject, value: jlong) {
233214
let long_obj = env.new_object("java/lang/Long", "(J)V", &[value.into()])

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public static long createCacheConfig(ClusterSettings clusterSettings) {
105105
type.getEvictionType(clusterSettings));
106106

107107
NativeBridge.createCache(cacheManagerPtr, type.cacheTypeName, type.getSizeLimit(clusterSettings).getBytes(), type.getEvictionType(clusterSettings));
108-
clusterSettings.addSettingsUpdateConsumer(type.sizeLimitSetting,(v) -> NativeBridge.cacheManagerUpdateSizeLimitForCacheType(cacheManagerPtr, CacheType.METADATA.getCacheTypeName(),v.getBytes()));
108+
// clusterSettings.addSettingsUpdateConsumer(type.sizeLimitSetting,(v) -> NativeBridge.cacheManagerUpdateSizeLimitForCacheType(cacheManagerPtr, CacheType.METADATA.getCacheTypeName(),v.getBytes()));
109109
} else {
110110
logger.debug("Cache type {} is disabled", type.getCacheTypeName());
111111
}

0 commit comments

Comments
 (0)