diff --git a/plugins/engine-datafusion/jni/Cargo.toml b/plugins/engine-datafusion/jni/Cargo.toml
index 4dbb5374f7443..5108e57fe4da7 100644
--- a/plugins/engine-datafusion/jni/Cargo.toml
+++ b/plugins/engine-datafusion/jni/Cargo.toml
@@ -11,21 +11,21 @@ crate-type = ["cdylib"]

 [dependencies]
 # DataFusion dependencies
-datafusion = "49.0.0"
-datafusion-expr = "49.0.0"
-datafusion-datasource = "49.0.0"
+datafusion = "50.0.0"
+datafusion-expr = "50.0.0"
+datafusion-datasource = "50.0.0"

-arrow-json = "55.2"
-arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
-#arrow = "55.2.0"
-arrow-array = "55.2.0"
-arrow-schema = "55.2.0"
-arrow-buffer = "55.2.0"
+# All arrow crates must share one minor version; DataFusion 50 builds against arrow 56.
+arrow-json = "56.2"
+arrow = { version = "56.2", features = ["ffi", "ipc_compression"] }
+arrow-array = "56.2.0"
+arrow-schema = "56.2.0"
+arrow-buffer = "56.2.0"

 # JNI dependencies
 jni = "0.21"

 # Substrait support
-datafusion-substrait = "49.0.0"
+datafusion-substrait = "50.0.0"
 prost = "0.13"

diff --git a/plugins/engine-datafusion/jni/src/cache.rs b/plugins/engine-datafusion/jni/src/cache.rs
new file mode 100644
index 0000000000000..ec435bcf1c30e
--- /dev/null
+++ b/plugins/engine-datafusion/jni/src/cache.rs
@@ -0,0 +1,76 @@
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use datafusion::execution::cache::cache_manager::{FileMetadata, FileMetadataCache, FileMetadataCacheEntry};
+use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
+use datafusion::execution::cache::CacheAccessor;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+// Wrapper that puts a `DefaultFilesMetadataCache` behind a `Mutex` so mutating
+// operations can be reached through a shared `Arc` handle from JNI.
+pub struct MutexFileMetadataCache {
+    pub inner: Mutex<DefaultFilesMetadataCache>,
+}
+
+impl MutexFileMetadataCache {
+    pub fn new(cache: DefaultFilesMetadataCache) -> Self {
+        Self {
+            inner: Mutex::new(cache),
+        }
+    }
+}
+
+// Implement CacheAccessor, which is required by FileMetadataCache
+impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for MutexFileMetadataCache {
+    type Extra = ObjectMeta;
+
+    fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.inner.lock().unwrap().get(k)
+    }
+
+    fn get_with_extra(&self, k: &ObjectMeta, extra: &Self::Extra) -> Option<Arc<dyn FileMetadata>> {
+        self.inner.lock().unwrap().get_with_extra(k, extra)
+    }
+
+    fn put(&self, k: &ObjectMeta, v: Arc<dyn FileMetadata>) -> Option<Arc<dyn FileMetadata>> {
+        self.inner.lock().unwrap().put(k, v)
+    }
+
+    fn put_with_extra(&self, k: &ObjectMeta, v: Arc<dyn FileMetadata>, e: &Self::Extra) -> Option<Arc<dyn FileMetadata>> {
+        self.inner.lock().unwrap().put_with_extra(k, v, e)
+    }
+
+    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.inner.lock().unwrap().remove(k)
+    }
+
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.inner.lock().unwrap().contains_key(k)
+    }
+
+    fn len(&self) -> usize {
+        self.inner.lock().unwrap().len()
+    }
+
+    fn clear(&self) {
+        self.inner.lock().unwrap().clear()
+    }
+
+    fn name(&self) -> String {
+        self.inner.lock().unwrap().name()
+    }
+}
+
+impl FileMetadataCache for MutexFileMetadataCache {
+    fn cache_limit(&self) -> usize {
+        self.inner.lock().unwrap().cache_limit()
+    }
+
+    fn update_cache_limit(&self, limit: usize) {
+        self.inner.lock().unwrap().update_cache_limit(limit)
+    }
+
+    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
+        self.inner.lock().unwrap().list_entries()
+    }
+}
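Review note on cache.rs above (annotation, not part of the patch): the wrapper exists because `CacheAccessor::remove` takes `&mut self`, which is unreachable through the shared `Arc` handle that `CacheManagerConfig` stores; callers therefore go through the inner lock. A minimal sketch of the pattern, assuming the DataFusion 50 cache API:

```rust
// Hypothetical snippet exercising the wrapper the same way the JNI layer does.
use std::sync::Arc;

fn sketch(meta: &object_store::ObjectMeta) {
    let cache = Arc::new(MutexFileMetadataCache::new(
        DefaultFilesMetadataCache::new(64 * 1024 * 1024), // 64 MiB cap
    ));
    let _hit = cache.get(meta);                          // &self via CacheAccessor
    let _old = cache.inner.lock().unwrap().remove(meta); // &mut self via the lock
}
```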
diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs
index 7155154d14e75..8f65d1d573401 100644
--- a/plugins/engine-datafusion/jni/src/lib.rs
+++ b/plugins/engine-datafusion/jni/src/lib.rs
@@ -6,10 +6,11 @@
  * compatible open source license.
  */
 use std::ptr::addr_of_mut;
-use jni::objects::{JByteArray, JClass, JObject};
+use jni::objects::{JByteArray, JClass, JObject, JString};
 use jni::sys::{jbyteArray, jlong, jstring};
 use jni::JNIEnv;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use arrow_array::{Array, StructArray};
 use arrow_array::ffi::FFI_ArrowArray;
 use arrow_schema::DataType;
@@ -18,16 +19,10 @@ use arrow_schema::ffi::FFI_ArrowSchema;
 mod util;
 mod row_id_optimizer;
 mod listing_table;
+mod cache;

 use datafusion::execution::context::SessionContext;
-use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result_error, set_object_result_ok};
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::listing::{ListingTableUrl};
-use datafusion::execution::cache::cache_manager::CacheManagerConfig;
-use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
-use datafusion::execution::cache::CacheAccessor;
-use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
 use datafusion::prelude::SessionConfig;
 use datafusion::DATAFUSION_VERSION;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -41,6 +36,17 @@ use prost::Message;
 use tokio::runtime::Runtime;
 use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig};
 use crate::row_id_optimizer::FilterRowIdOptimizer;
+use crate::cache::MutexFileMetadataCache;
+use crate::util::{construct_file_metadata, create_object_meta_from_file, create_object_meta_from_filenames, parse_string_arr, set_object_result_error, set_object_result_ok};
+
+use datafusion::datasource::file_format::csv::CsvFormat;
+use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::execution::cache::cache_manager::{CacheManagerConfig, FileMetadataCache};
+use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
+use datafusion::datasource::physical_plan::FileMeta;
+use datafusion::execution::cache::cache_unit::{DefaultFilesMetadataCache, DefaultListFilesCache};
+use datafusion::execution::cache::CacheAccessor;
+use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};

 /// Create a new DataFusion session context
 #[no_mangle]
 pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntime(
@@ -121,6 +127,25 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntime(
     ctx
 }

+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntimev1(
+    _env: JNIEnv,
+    _class: JClass,
+    cache_config_ptr: jlong,
+) -> jlong {
+    // Take ownership of the CacheManagerConfig. The pointer is consumed here and
+    // must not be reused (or freed) by the Java side afterwards.
+    let cache_manager_config = unsafe { Box::from_raw(cache_config_ptr as *mut CacheManagerConfig) };
+
+    // Create a RuntimeEnv with the configured cache manager
+    let runtime_env = RuntimeEnvBuilder::default()
+        .with_cache_manager(*cache_manager_config)
+        .build()
+        .unwrap();
+
+    Box::into_raw(Box::new(runtime_env)) as jlong
+}
+
 #[no_mangle]
 pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createSessionContext(
     _env: JNIEnv,
@@ -193,7 +218,6 @@ impl ShardView {
     }
 }

-
 #[no_mangle]
 pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_executeSubstraitQuery(
     mut env: JNIEnv,
@@ -304,6 +328,112 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
     })
 }

+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_executeSubstraitQueryv1(
+    mut env: JNIEnv,
+    _class: JClass,
+    shard_view_ptr: jlong,
+    substrait_bytes: jbyteArray,
+    tokio_runtime_env_ptr: jlong,
+    global_runtime_env_ptr: jlong,
+    // callback: JObject,
+) -> jlong {
+    let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) };
+    let runtime_ptr = unsafe { &*(tokio_runtime_env_ptr as *const Runtime) };
+    let runtime_env = unsafe { &*(global_runtime_env_ptr as *const RuntimeEnv) };
+
+    let table_path = shard_view.table_path();
+    let files_meta = shard_view.files_meta();
+
+    println!("Table path: {}", table_path);
+    println!("Files: {:?}", files_meta);
+
+    // TODO: get config from CSV DataFormat
+    let config = SessionConfig::new();
+    // config.options_mut().execution.parquet.pushdown_filters = true;
+
+    let state = datafusion::execution::SessionStateBuilder::new()
+        .with_config(config)
+        .with_runtime_env(Arc::new(runtime_env.clone()))
+        .with_default_features()
+        // .with_optimizer_rule(Arc::new(OptimizeRowId))
+        // .with_physical_optimizer_rule(Arc::new(FilterRowIdOptimizer)) // TODO: enable only for query phase
+        .build();
+
+    let ctx = SessionContext::new_with_state(state);
+
+    // Create default parquet options
+    let file_format = ParquetFormat::new();
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        .with_file_extension(".parquet"); // TODO: take this as a parameter
+        // .with_table_partition_cols(vec![("row_base".to_string(), DataType::Int32)]); // TODO: enable only for query phase
+
+    // Ideally the executor will provide this
+    runtime_ptr.block_on(async {
+        let resolved_schema = listing_options
+            .infer_schema(&ctx.state(), &table_path.clone())
+            .await
+            .unwrap();
+
+        let config = ListingTableConfig::new(table_path.clone())
+            .with_listing_options(listing_options)
+            .with_schema(resolved_schema);
+
+        // Create a new TableProvider
+        let provider = Arc::new(ListingTable::try_new(config).unwrap());
+        let _shard_id = table_path.prefix().filename().expect("error fetching shard id from path");
+        // TODO: register under the shard id instead of a hardcoded table name
+        ctx.register_table("index-7", provider)
+            .expect("Failed to attach the Table");
+    });
+
+    // TODO: how to close ctx?
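+    // Review note (not part of the original patch): the SessionContext built above
+    // is dropped when this function returns; only the stream pointer returned below
+    // keeps query state alive. If the context must outlive the call, box it and
+    // return the pointer to Java, mirroring createGlobalRuntimev1.
+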
+    // Convert the Java byte array to a Rust Vec<u8>
+    let plan_bytes_obj = unsafe { JByteArray::from_raw(substrait_bytes) };
+    let plan_bytes_vec = match env.convert_byte_array(plan_bytes_obj) {
+        Ok(bytes) => bytes,
+        Err(e) => {
+            let error_msg = format!("Failed to convert plan bytes: {}", e);
+            let _ = env.throw_new("java/lang/Exception", error_msg);
+            return 0;
+        }
+    };
+
+    let substrait_plan = match Plan::decode(plan_bytes_vec.as_slice()) {
+        Ok(plan) => {
+            println!("SUBSTRAIT rust: Decoding is successful, Plan has {} relations", plan.relations.len());
+            plan
+        },
+        Err(e) => {
+            println!("SUBSTRAIT rust: Failed to decode Substrait plan: {}", e);
+            return 0;
+        }
+    };
+
+    runtime_ptr.block_on(async {
+        let logical_plan = match from_substrait_plan(&ctx.state(), &substrait_plan).await {
+            Ok(plan) => {
+                println!("SUBSTRAIT Rust: LogicalPlan: {:?}", plan);
+                plan
+            },
+            Err(e) => {
+                println!("SUBSTRAIT Rust: Failed to convert Substrait plan: {}", e);
+                return 0;
+            }
+        };
+
+        let dataframe = ctx.execute_logical_plan(logical_plan).await.unwrap();
+        let stream = dataframe.execute_stream().await.unwrap();
+        Box::into_raw(Box::new(stream)) as jlong
+    })
+}
+
 // If we need to create session context separately
 #[no_mangle]
 pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeCreateSessionContext(
     _env: JNIEnv,
@@ -432,3 +562,240 @@ pub extern "system" fn Java_org_opensearch_datafusion_RecordBatchStream_getSchema(
         }
     }
 }
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_initCacheManagerConfig(
+    _env: JNIEnv,
+    _class: JClass,
+) -> jlong {
+    let config = CacheManagerConfig::default();
+    Box::into_raw(Box::new(config)) as jlong
+}
+
+/// Create a metadata cache and add it to the CacheManagerConfig.
+/// The config_ptr stays valid; only the pointed-to config is updated.
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createMetadataCache(
+    _env: JNIEnv,
+    _class: JClass,
+    config_ptr: jlong,
+    size_limit: jlong,
+) -> jlong {
+    // Create the cache behind the wrapper that implements FileMetadataCache
+    let inner_cache = DefaultFilesMetadataCache::new(size_limit.try_into().unwrap());
+    let wrapped_cache = Arc::new(MutexFileMetadataCache::new(inner_cache));
+
+    // Update the CacheManagerConfig in place
+    if config_ptr != 0 {
+        let cache_manager_config = unsafe { &mut *(config_ptr as *mut CacheManagerConfig) };
+        *cache_manager_config = cache_manager_config.clone()
+            .with_file_metadata_cache(Some(wrapped_cache.clone()));
+    }
+
+    // Return the Arc pointer for subsequent JNI operations
+    Box::into_raw(Box::new(wrapped_cache)) as jlong
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCachePut(
+    mut env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+    file_path: JString,
+) -> bool {
+    let file_path: String = match env.get_string(&file_path) {
+        Ok(s) => s.into(),
+        Err(_) => return false,
+    };
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    let data_format = if file_path.to_lowercase().ends_with(".parquet") {
+        "parquet"
+    } else {
+        return false; // Skip unsupported formats
+    };
+
+    let object_meta = create_object_meta_from_file(&file_path);
+    let store = Arc::new(object_store::local::LocalFileSystem::new());
+
+    // Block on the async fetch; TODO: reuse the shared Tokio runtime instead of
+    // creating one per call
+    let metadata = Runtime::new()
+        .expect("Failed to create Tokio Runtime")
+        .block_on(async {
+            construct_file_metadata(store.as_ref(), &object_meta, data_format)
+                .await
+                .expect("Failed to construct file metadata")
+        });
+    // `put` returns the previously cached value (if any), so a successful first
+    // insert yields None; verify the entry actually landed instead.
+    cache.put(&object_meta, metadata);
+    if !cache.contains_key(&object_meta) {
+        println!("Failed to cache metadata for: {}", file_path);
+        return false;
+    }
+
+    println!("Cached metadata for: {}", file_path);
+    true
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheRemove(
+    mut env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+    file_path: JString,
+) -> bool {
+    let file_path: String = match env.get_string(&file_path) {
+        Ok(s) => s.into(),
+        Err(_) => return false,
+    };
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    let object_meta = create_object_meta_from_file(&file_path);
+
+    // Lock the inner cache and remove the entry
+    if cache.inner.lock().unwrap().remove(&object_meta).is_some() {
+        println!("Cache removed for: {}", file_path);
+        true
+    } else {
+        println!("Item not found in cache: {}", file_path);
+        false
+    }
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGet(
+    mut env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+    file_path: JString,
+) -> bool {
+    let file_path: String = match env.get_string(&file_path) {
+        Ok(s) => s.into(),
+        Err(_) => return false,
+    };
+
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    let object_meta = create_object_meta_from_file(&file_path);
+
+    match cache.get(&object_meta) {
+        Some(metadata) => {
+            println!("Retrieved metadata for: {} - size: {:?}", file_path, metadata.memory_size());
+            true
+        },
+        None => {
+            println!("No metadata found for: {}", file_path);
+            false
+        },
+    }
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGetEntries(
+    mut env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+) -> jni::sys::jobjectArray {
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+
+    // Snapshot all entries from the cache
+    let entries = cache.list_entries();
+
+    println!("Retrieved {} cache entries", entries.len());
+
+    let string_class = match env.find_class("java/lang/String") {
+        Ok(cls) => cls,
+        Err(e) => {
+            println!("Failed to find String class: {:?}", e);
+            return std::ptr::null_mut();
+        }
+    };
+
+    // Three consecutive elements per entry: path, size_bytes, hits
+    let array_size = (entries.len() * 3) as i32;
+    let result_array = match env.new_object_array(array_size, &string_class, JObject::null()) {
+        Ok(arr) => arr,
+        Err(e) => {
+            println!("Failed to create object array: {:?}", e);
+            return std::ptr::null_mut();
+        }
+    };
+
+    let mut index = 0;
+    for (path, entry) in entries.iter() {
+        if let Ok(path_str) = env.new_string(path.as_ref()) {
+            let _ = env.set_object_array_element(&result_array, index, path_str);
+        }
+        index += 1;
+
+        if let Ok(size_str) = env.new_string(entry.size_bytes.to_string()) {
+            let _ = env.set_object_array_element(&result_array, index, size_str);
+        }
+        index += 1;
+
+        if let Ok(hit_count_str) = env.new_string(entry.hits.to_string()) {
+            let _ = env.set_object_array_element(&result_array, index, hit_count_str);
+        }
+        index += 1;
+    }
+
+    result_array.as_raw()
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGetSize(
+    _env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+) -> jlong {
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    cache.inner.lock().unwrap().memory_used() as jlong
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheContainsFile(
+    mut env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+    file_path: JString,
+) -> bool {
+    let file_path: String = match env.get_string(&file_path) {
+        Ok(s) => s.into(),
+        Err(_) => return false,
+    };
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    let object_meta = create_object_meta_from_file(&file_path);
+    cache.inner.lock().unwrap().contains_key(&object_meta)
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheUpdateSizeLimit(
+    _env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+    new_size_limit: jlong,
+) -> bool {
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    let new_size_limit = new_size_limit as usize;
+    cache.inner.lock().unwrap().update_cache_limit(new_size_limit);
+    if cache.inner.lock().unwrap().cache_limit() == new_size_limit {
+        println!("Cache size limit updated to: {}", new_size_limit);
+        true
+    } else {
+        println!("Failed to update cache size limit to: {}", new_size_limit);
+        false
+    }
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheClear(
+    _env: JNIEnv,
+    _class: JClass,
+    cache_ptr: jlong,
+) {
+    let cache = unsafe { &*(cache_ptr as *const Arc<MutexFileMetadataCache>) };
+    cache.inner.lock().unwrap().clear();
+}
diff --git a/plugins/engine-datafusion/jni/src/listing_table.rs b/plugins/engine-datafusion/jni/src/listing_table.rs
index a28a6292ec3c1..b10e67325136b 100644
--- a/plugins/engine-datafusion/jni/src/listing_table.rs
+++ b/plugins/engine-datafusion/jni/src/listing_table.rs
@@ -53,7 +53,7 @@ use datafusion::execution::{
 use datafusion_expr::{
     dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
 };
-use datafusion::physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
+use datafusion::physical_expr_adapter::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion::physical_expr_common::sort_expr::LexOrdering;
 use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
 use futures::{future, stream, Stream, StreamExt, TryStreamExt};
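For orientation between the files above (annotation, not part of the patch): the cache plumbing composes as sketched below, assuming the DataFusion 50 builder API used elsewhere in this change.

```rust
// Sketch: config -> cache -> RuntimeEnv, mirroring initCacheManagerConfig,
// createMetadataCache and createGlobalRuntimev1 above.
use std::sync::Arc;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;

fn wire_cache() -> datafusion::error::Result<()> {
    // MutexFileMetadataCache comes from cache.rs in this PR
    let cache = Arc::new(crate::cache::MutexFileMetadataCache::new(
        DefaultFilesMetadataCache::new(50 * 1024 * 1024),
    ));
    let config = CacheManagerConfig::default().with_file_metadata_cache(Some(cache));
    let runtime_env = RuntimeEnvBuilder::default().with_cache_manager(config).build()?;
    // Sessions built on this RuntimeEnv consult the cache when reading Parquet footers.
    drop(runtime_env);
    Ok(())
}
```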
diff --git a/plugins/engine-datafusion/jni/src/util.rs b/plugins/engine-datafusion/jni/src/util.rs
index 1b6da12c49ede..e63ef226e5102 100644
--- a/plugins/engine-datafusion/jni/src/util.rs
+++ b/plugins/engine-datafusion/jni/src/util.rs
@@ -8,10 +8,14 @@
 use datafusion::arrow::array::RecordBatch;
 use jni::objects::{JObject, JObjectArray, JString};
 use jni::sys::jlong;
 use jni::JNIEnv;
-use object_store::{path::Path as ObjectPath, ObjectMeta};
+use object_store::{path::Path as ObjectPath, ObjectMeta, ObjectStore};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fs;
+use std::sync::Arc;
+use datafusion::datasource::physical_plan::parquet::CachedParquetMetaData;
+use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata;
+use datafusion::execution::cache::cache_manager::FileMetadata;

 /// Set error message from a result using a Consumer Java callback
@@ -160,7 +164,6 @@ pub fn throw_exception(env: &mut JNIEnv, message: &str) {
 pub fn create_object_meta_from_filenames(base_path: &str, filenames: Vec<String>) -> Vec<ObjectMeta> {
     filenames.into_iter().map(|filename| {
         let filename = filename.as_str();
-
         // Handle both full paths and relative filenames
         let full_path = if filename.starts_with('/') || filename.contains(base_path) {
             // Already a full path
@@ -168,20 +171,42 @@ pub fn create_object_meta_from_filenames(base_path: &str, filenames: Vec<String>
         } else {
             // Just a filename, needs base_path
             format!("{}/{}", base_path.trim_end_matches('/'), filename)
         };
+        create_object_meta_from_file(&full_path)
+    }).collect()
+}

-        let file_size = fs::metadata(&full_path).map(|m| m.len()).unwrap_or(0);
-        let modified = fs::metadata(&full_path)
-            .and_then(|m| m.modified())
-            .map(|t| DateTime::<Utc>::from(t))
-            .unwrap_or_else(|_| Utc::now());
+pub fn create_object_meta_from_file(file_path: &str) -> ObjectMeta {
+    let file_size = fs::metadata(file_path).map(|m| m.len()).unwrap_or(0);
+    let modified = fs::metadata(file_path)
+        .and_then(|m| m.modified())
+        .map(|t| DateTime::<Utc>::from(t))
+        .unwrap_or_else(|_| Utc::now());

     ObjectMeta {
-        location: ObjectPath::from(full_path),
+        location: ObjectPath::from(file_path),
         last_modified: modified,
         size: file_size,
         e_tag: None,
         version: None,
     }
-    }).collect()
+}
+
+pub async fn construct_file_metadata(
+    store: &dyn ObjectStore,
+    object_meta: &ObjectMeta,
+    data_format: &str,
+) -> Result<Arc<dyn FileMetadata>, Box<dyn Error>> {
+    match data_format.to_lowercase().as_str() {
+        "parquet" => {
+            let df_metadata = DFParquetMetadata::new(store, object_meta);
+            let parquet_metadata = df_metadata.fetch_metadata().await?;
+            Ok(Arc::new(CachedParquetMetaData::new(parquet_metadata)))
+        },
+        _ => Err(format!("Unsupported data format: {}", data_format).into()),
+    }
+}
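One behavior of the helper above worth pinning down: for a missing file, `fs::metadata` fails, so the result has size 0 and "now" as the modification time — such an `ObjectMeta` will never match a previously cached entry. A hypothetical unit test (not in the patch) capturing that:

```rust
// Hypothetical test module for util.rs.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn object_meta_for_missing_file_uses_defaults() {
        let meta = create_object_meta_from_file("/does/not/exist.parquet");
        assert_eq!(meta.size, 0); // fs::metadata failed, size falls back to 0
        assert!(meta.e_tag.is_none() && meta.version.is_none());
    }
}
```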
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
index 45a2da3e6afa3..9f570b7489e0b 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionPlugin.java
@@ -8,11 +8,17 @@
 package org.opensearch.datafusion;

+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.IndexScopedSettings;
+import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.settings.SettingsFilter;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
@@ -24,6 +30,7 @@
 import org.opensearch.datafusion.search.DatafusionQuery;
 import org.opensearch.datafusion.search.DatafusionReaderManager;
 import org.opensearch.datafusion.search.DatafusionSearcher;
+import org.opensearch.datafusion.search.cache.CacheSettings;
 import org.opensearch.env.Environment;
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.index.shard.ShardPath;
@@ -60,6 +71,8 @@ public class DataFusionPlugin extends Plugin implements ActionPlugin, SearchEngi
     private DataFusionService dataFusionService;
     private final boolean isDataFusionEnabled;

+    private static final Logger logger = LogManager.getLogger(DataFusionPlugin.class);
+
     /**
      * Constructor for DataFusionPlugin.
      * @param settings The settings for the DataFusionPlugin.
@@ -103,7 +116,7 @@ public Collection<Object> createComponents(
         if (!isDataFusionEnabled) {
             return Collections.emptyList();
         }
-        dataFusionService = new DataFusionService(dataSourceCodecs);
+        dataFusionService = new DataFusionService(dataSourceCodecs, clusterService.getClusterSettings());

         for(DataFormat format : this.getSupportedFormats()) {
             dataSourceCodecs.get(format);
@@ -167,4 +180,14 @@ public List<RestHandler> getRestHandlers(
         }
         return List.of(new ActionHandler<>(NodesDataFusionInfoAction.INSTANCE, TransportNodesDataFusionInfoAction.class));
     }
+
+    @Override
+    public List<Setting<?>> getSettings() {
+        return Stream.of(
+            CacheSettings.CACHE_SETTINGS,
+            CacheSettings.CACHE_ENABLED)
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    }
 }
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java
index 6cdc09bd040f7..af3de03838a53 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java
@@ -79,6 +79,8 @@ private static synchronized void loadNativeLibrary() {
      */
     public static native long createGlobalRuntime();

+    public static native long createGlobalRuntimev1(long cacheManagerPtr);
+
     public static native long createTokioRuntime();

     /**
@@ -113,7 +115,9 @@ private static synchronized void loadNativeLibrary() {
      * @param substraitPlan the serialized Substrait query plan
+     * @param tokioRuntimePtr pointer to the shared Tokio runtime
+     * @param runtimePtr pointer to the global DataFusion RuntimeEnv
      * @return stream pointer for result iteration
      */
-    public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan, long runtimePtr);
+    // Named to match the v1 native symbol; the legacy 3-arg entry point still exists
+    // in the native library under the old name, so the Java name must match exactly.
+    public static native long executeSubstraitQueryv1(long shardViewPtr, byte[] substraitPlan, long tokioRuntimePtr, long runtimePtr);

     public static native long createDatafusionReader(String path, String[] files);

@@ -148,4 +152,78 @@ private static synchronized void loadNativeLibrary() {
      * @param streamPtr the stream pointer to close
      */
     public static native void closeStream(long streamPtr);
+
+    public static native long initCacheManagerConfig();
+
+    // METADATA cache specific methods
+
+    /**
+     * Create the metadata cache and register it on the cache manager config.
+     * @param cacheConfigPtr cache configuration pointer
+     * @param sizeLimit size limit for the cache, in bytes
+     * @return cache pointer
+     */
+    public static native long createMetadataCache(long cacheConfigPtr, long sizeLimit);
+
+    /**
+     * Put a metadata entry into the cache.
+     * @param cachePtr the cache pointer
+     * @param filePath the file path
+     * @return true if the entry was cached
+     */
+    public static native boolean metadataCachePut(long cachePtr, String filePath);

+    /**
+     * Look up metadata in the cache.
+     * @param cachePtr the cache pointer
+     * @param filePath the file path
+     * @return true if metadata was found
+     */
+    // TODO: return the metadata itself rather than a boolean
+    public static native boolean metadataCacheGet(long cachePtr, String filePath);
+
+    /**
+     * Remove metadata from the cache.
+     * @param cachePtr the cache pointer
+     * @param filePath the file path
+     * @return true if an entry was removed
+     */
+    public static native boolean metadataCacheRemove(long cachePtr, String filePath);
+
+    /**
+     * Memory consumed by the metadata cache.
+     * @param cachePtr the cache pointer
+     * @return memory used, in bytes
+     */
+    public static native long metadataCacheGetSize(long cachePtr);
+
+    /**
+     * Update the size limit of the metadata cache.
+     * @param cachePtr the cache pointer
+     * @param newSizeLimit new size limit, in bytes
+     * @return true if the limit was applied
+     */
+    public static native boolean metadataCacheUpdateSizeLimit(long cachePtr, long newSizeLimit);
+
+    /**
+     * Check if a file has an entry in the metadata cache.
+     * @param cachePtr the cache pointer
+     * @param filePath the file path
+     * @return true if present
+     */
+    public static native boolean metadataCacheContainsFile(long cachePtr, String filePath);
+
+    /**
+     * Get all entries from the metadata cache.
+     * @param cachePtr the cache pointer
+     * @return String array containing cache entries in triplets: [path1, size1, hitCount1, path2, size2, hitCount2, ...]
+     *         Each entry consists of 3 consecutive elements: file path, size in bytes, and hit count
+     */
+    public static native String[] metadataCacheGetEntries(long cachePtr);
+
+    /**
+     * Clears all entries from the metadata cache.
+     * @param cachePtr the cache pointer
+     */
+    public static native void metadataCacheClear(long cachePtr);
 }
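The triplet layout documented above is easy to mis-index; an illustrative helper (not part of the patch) keeps the decoding in one place:

```java
// Hypothetical helper: decode the flat triplet array into readable lines.
static java.util.List<String> describeEntries(long cachePtr) {
    String[] flat = DataFusionQueryJNI.metadataCacheGetEntries(cachePtr);
    java.util.List<String> out = new java.util.ArrayList<>();
    for (int i = 0; i + 2 < flat.length; i += 3) {
        out.add(flat[i] + " (" + flat[i + 1] + " bytes, " + flat[i + 2] + " hits)");
    }
    return out;
}
```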
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java
index 9548ced599723..68b9fcd11095b 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java
@@ -11,9 +11,11 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
+import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.common.util.concurrent.ConcurrentMapLong;
 import org.opensearch.datafusion.core.GlobalRuntimeEnv;
+import org.opensearch.datafusion.search.cache.CacheManager;
 import org.opensearch.vectorized.execution.search.DataFormat;
 import org.opensearch.vectorized.execution.search.spi.DataSourceCodec;
 import org.opensearch.vectorized.execution.search.spi.RecordBatchStream;
@@ -31,17 +33,26 @@ public class DataFusionService extends AbstractLifecycleComponent {
     private final ConcurrentMapLong<SessionContext> sessionEngines = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
     private final DataSourceRegistry dataSourceRegistry;
+    private final Long tokioRuntimePtr;
     private final GlobalRuntimeEnv globalRuntimeEnv;
+    private final CacheManager cacheManager;

     /**
      * Creates a new DataFusion service instance.
+     * @param dataSourceCodecs codecs keyed by data format
+     * @param clusterSettings cluster settings used to configure the native caches
      */
-    public DataFusionService(Map<DataFormat, DataSourceCodec> dataSourceCodecs) {
+    public DataFusionService(Map<DataFormat, DataSourceCodec> dataSourceCodecs, ClusterSettings clusterSettings) {
         this.dataSourceRegistry = new DataSourceRegistry(dataSourceCodecs);
         // verify the JNI bridge loads
         String version = DataFusionQueryJNI.getVersionInfo();
-        this.globalRuntimeEnv = new GlobalRuntimeEnv();
+        // NOTE: GlobalRuntimeEnv creates its own Tokio runtime as well; only this
+        // one is handed out to queries.
+        this.tokioRuntimePtr = DataFusionQueryJNI.createTokioRuntime();
+        this.globalRuntimeEnv = new GlobalRuntimeEnv(clusterSettings);
+        this.cacheManager = globalRuntimeEnv.getCacheManager();
+    }
+
+    public Long getTokioRuntimePointer() {
+        return tokioRuntimePtr;
     }

     @Override
@@ -165,9 +176,9 @@ public long getRuntimePointer() {
         return globalRuntimeEnv.getPointer();
     }

-    public long getTokioRuntimePointer() {
-        return globalRuntimeEnv.getTokioRuntimePtr();
-    }
-
     /**
      * Close the session context and clean up resources
@@ -207,4 +218,8 @@ public String getVersion() {
         version.append("]}");
         return version.toString();
     }
+
+    public CacheManager getCacheManager() {
+        return cacheManager;
+    }
 }
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java
index 15ee31ca7663a..1aa39129bd207 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java
@@ -17,6 +17,7 @@
 import org.opensearch.action.search.SearchShardTask;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.util.BigArrays;
+import java.util.List;
 import org.opensearch.datafusion.core.DefaultRecordBatchStream;
 import org.opensearch.datafusion.search.DatafusionContext;
 import org.opensearch.datafusion.search.DatafusionQuery;
@@ -25,6 +26,8 @@
 import org.opensearch.datafusion.search.DatafusionReaderManager;
 import org.opensearch.datafusion.search.DatafusionSearcher;
 import org.opensearch.datafusion.search.DatafusionSearcherSupplier;
+import org.opensearch.datafusion.search.cache.CacheManager;
+import org.opensearch.datafusion.search.cache.CacheType;
 import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.EngineException;
@@ -58,12 +61,17 @@ public class DatafusionEngine extends SearchExecEngine
     private final DataFormat dataFormat;
+    private final CacheManager cacheManager;

     public DatafusionEngine(DataFormat dataFormat, Collection<WriterFileSet> formatCatalogSnapshot, DataFusionService dataFusionService, ShardPath shardPath) throws IOException {
         this.dataFormat = dataFormat;
-        this.datafusionReaderManager = new DatafusionReaderManager(shardPath.getDataPath().toString(), formatCatalogSnapshot, dataFormat.getName());
         this.datafusionService = dataFusionService;
+        this.datafusionReaderManager = new DatafusionReaderManager(shardPath.getDataPath().toString(), formatCatalogSnapshot, dataFormat.getName());
+        this.cacheManager = datafusionService.getCacheManager();
+        // Warm the metadata cache whenever a refresh makes new files searchable
+        datafusionReaderManager.setOnFilesAdded(files -> cacheManager.addToCache(files));
     }

     @Override
@@ -99,12 +107,14 @@ public EngineSearcherSupplier acquireSearcherSupplier(
             searcher = new DatafusionSearcherSupplier(null) {
                 @Override
                 protected DatafusionSearcher acquireSearcherInternal(String source) {
-                    return new DatafusionSearcher(source, reader, () -> {});
+                    return new DatafusionSearcher(source, reader, datafusionService.getTokioRuntimePointer(),
+                        datafusionService.getRuntimePointer(), () -> {});
                 }

                 @Override
                 protected void doClose() {
                     try {
+                        // TODO: native caches are keyed by file path, so this directory-level
+                        // removal only matches an exact key; per-file removal may be needed
+                        cacheManager.removeFilesByDirectory(reader.directoryPath);
                         reader.decRef();
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
                 }
             };
         } catch (Exception ex) {
+            logger.error("Failed to acquire searcher", ex);
             // TODO
         }
         return searcher;
@@ -137,6 +148,8 @@ public DatafusionSearcher acquireSearcher(String source, Engine.SearcherScope sc
             return new DatafusionSearcher(
                 source,
                 searcher.getReader(),
+                datafusionService.getTokioRuntimePointer(),
+                datafusionService.getRuntimePointer(),
                 () -> Releasables.close(searcher, searcherSupplier)
             );
         } finally {
@@ -164,7 +177,7 @@ public Map execute(DatafusionContext context) {
         Map finalRes = new HashMap<>();
         try {
             DatafusionSearcher datafusionSearcher = context.getEngineSearcher();
-            long streamPointer = datafusionSearcher.search(context.getDatafusionQuery(), datafusionService.getTokioRuntimePointer());
+            long streamPointer = datafusionSearcher.search(context.getDatafusionQuery());
             RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             RecordBatchStream stream = new RecordBatchStream(streamPointer, datafusionService.getTokioRuntimePointer(), allocator);
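One ownership subtlety to flag for the next file (review note): the Rust side of `createGlobalRuntimev1` calls `Box::from_raw` on the config pointer, so once the constructor below runs, `CacheManager.getCacheManagerPtr()` refers to memory the native side has consumed and must not be passed to native code again. A hypothetical guard makes the invariant explicit:

```java
// Hypothetical fragment inside GlobalRuntimeEnv(ClusterSettings):
long configPtr = cacheManager.getCacheManagerPtr();
this.ptr = createGlobalRuntimev1(configPtr); // Rust takes ownership of configPtr here
// From this point on, treat configPtr (and getCacheManagerPtr()) as stale;
// e.g. a cacheManager.invalidateConfigPtr() method could zero the stored value.
```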
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java
index 547539d5ff4d1..3428f200abfa2 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java
@@ -8,8 +8,12 @@
 package org.opensearch.datafusion.core;

+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.datafusion.search.cache.CacheManager;
+
 import static org.opensearch.datafusion.DataFusionQueryJNI.closeGlobalRuntime;
 import static org.opensearch.datafusion.DataFusionQueryJNI.createGlobalRuntime;
+import static org.opensearch.datafusion.DataFusionQueryJNI.createGlobalRuntimev1;
 import static org.opensearch.datafusion.DataFusionQueryJNI.createTokioRuntime;

 /**
@@ -20,6 +24,8 @@ public class GlobalRuntimeEnv implements AutoCloseable {
     // ptr to runtime environment in df
     private final long ptr;
     private final long tokio_runtime_ptr;
+    private final CacheManager cacheManager;
+
     /**
      * Creates a new global runtime environment.
@@ -27,6 +33,13 @@ public class GlobalRuntimeEnv implements AutoCloseable {
     public GlobalRuntimeEnv() {
         this.ptr = createGlobalRuntime();
         this.tokio_runtime_ptr = createTokioRuntime();
+        this.cacheManager = null;
+    }
+
+    /**
+     * Creates a global runtime environment whose cache manager is built from cluster settings.
+     * Note: createGlobalRuntimev1 takes ownership of the native config pointer.
+     */
+    public GlobalRuntimeEnv(ClusterSettings clusterSettings) {
+        this.cacheManager = CacheManager.fromConfig(clusterSettings);
+        this.ptr = createGlobalRuntimev1(cacheManager.getCacheManagerPtr());
+        this.tokio_runtime_ptr = createTokioRuntime();
     }

     /**
@@ -45,4 +58,8 @@ public long getTokioRuntimePtr() {
     public void close() {
         closeGlobalRuntime(this.ptr);
     }
+
+    public CacheManager getCacheManager() {
+        return cacheManager;
+    }
 }
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java
index 192be34625bd2..3bcf6f871cbc0 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java
@@ -8,11 +8,16 @@
 package org.opensearch.datafusion.search;

+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.lucene.search.ReferenceManager;
 import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener;
 import org.opensearch.index.engine.EngineReaderManager;
 import org.opensearch.index.engine.exec.DataFormat;
 import org.opensearch.index.engine.exec.FileMetadata;
+import org.opensearch.index.engine.exec.WriterFileSet;
 import org.opensearch.index.engine.exec.coord.CatalogSnapshot;

 import java.io.IOException;
@@ -27,6 +32,7 @@ public class DatafusionReaderManager implements EngineReaderManager
     private String path;
     private DatafusionReader current;
     private final String dataFormat;
+    private Consumer<List<String>> onFilesAdded;
     // private final Lock refreshLock = new ReentrantLock();
     // private final List refreshListeners = new CopyOnWriteArrayList();

@@ -36,6 +42,13 @@ public DatafusionReaderManager(String path, Collection<WriterFileSet> files, String dataFormat) {
         this.dataFormat = dataFormat;
     }

+    /**
+     * Set the callback invoked with files newly added during a refresh.
+     */
+    public void setOnFilesAdded(Consumer<List<String>> onFilesAdded) {
+        this.onFilesAdded = onFilesAdded;
+    }
+
     @Override
     public DatafusionReader acquire() throws IOException {
         if (current == null) {
@@ -61,11 +74,40 @@ public void beforeRefresh() throws IOException {
     public void afterRefresh(boolean didRefresh, CatalogSnapshot catalogSnapshot) throws IOException {
         if (didRefresh && catalogSnapshot != null) {
             DatafusionReader old = this.current;
+            Collection<WriterFileSet> newFiles = catalogSnapshot.getSearchableFiles(dataFormat);
             if (old != null) {
+                // Diff the file sets before releasing the old reader
+                processFileChanges(old.files, newFiles);
                 release(old);
+            } else {
+                processFileChanges(List.of(), newFiles);
             }
-            this.current = new DatafusionReader(this.path, catalogSnapshot.getSearchableFiles(dataFormat));
+            this.current = new DatafusionReader(this.path, newFiles);
             this.current.incRef();
         }
     }
+
+    private void processFileChanges(Collection<WriterFileSet> oldFiles, Collection<WriterFileSet> newFiles) {
+        Set<String> oldFilePaths = extractFilePaths(oldFiles);
+        Set<String> newFilePaths = extractFilePaths(newFiles);
+
+        Set<String> filesToAdd = new HashSet<>(newFilePaths);
+        filesToAdd.removeAll(oldFilePaths);
+
+        // TODO: either remove stale files periodically or let eviction handle them
+        Set<String> filesToRemove = new HashSet<>(oldFilePaths);
+        filesToRemove.removeAll(newFilePaths);
+
+        if (!filesToAdd.isEmpty() && onFilesAdded != null) {
+            onFilesAdded.accept(List.copyOf(filesToAdd));
+        }
+    }
+
+    private Set<String> extractFilePaths(Collection<WriterFileSet> files) {
+        return files.stream()
+            .flatMap(writerFileSet -> writerFileSet.getFiles().stream())
+            .collect(Collectors.toSet());
+    }
 }
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java
index 53b1db13bdd12..d53cdcec4ae51 100644
--- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java
@@ -25,10 +25,15 @@ public class DatafusionSearcher implements EngineSearcher {
     private final String source;
     private DatafusionReader reader;
+    private final Long tokioRuntimePtr;
+    private final Long globalRuntimeEnvId;
     private Closeable closeable;

-    public DatafusionSearcher(String source, DatafusionReader reader, Closeable close) {
+    public DatafusionSearcher(String source, DatafusionReader reader, Long tokioRuntimePtr, Long globalRuntimeEnvId, Closeable close) {
         this.source = source;
         this.reader = reader;
+        this.tokioRuntimePtr = tokioRuntimePtr;
+        this.globalRuntimeEnvId = globalRuntimeEnvId;
     }

     @Override
@@ -40,7 +45,7 @@ public String source() {
     public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector> collectors) throws IOException {
         // TODO : call search here to native
-        // TODO : change RunTimePtr
-        long nativeStreamPtr = DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), 0);
+        long nativeStreamPtr = DataFusionQueryJNI.executeSubstraitQueryv1(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), tokioRuntimePtr, globalRuntimeEnvId);
         RecordBatchStream stream = new DefaultRecordBatchStream(nativeStreamPtr);
         while (stream.hasNext()) {
             for (SearchResultsCollector collector : collectors) {
@@ -50,8 +55,8 @@ public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector
         }
     }

-    public long search(DatafusionQuery datafusionQuery, long runtimePtr) {
-        return DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), runtimePtr);
+    public long search(DatafusionQuery datafusionQuery) {
+        return DataFusionQueryJNI.executeSubstraitQueryv1(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), tokioRuntimePtr, globalRuntimeEnvId);
     }
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheAccessor.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheAccessor.java
new file mode 100644
--- /dev/null
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheAccessor.java
@@ -0,0 +1,58 @@
+package org.opensearch.datafusion.search.cache;
+
+import java.util.List;
+import java.util.Map;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.core.common.unit.ByteSizeValue;
+
+/**
+ * Base class for Java-side handles onto native DataFusion caches.
+ */
+public abstract class CacheAccessor {
+    protected CacheType name;
+    private Map<String, Object> properties;
+    protected long pointer;
+    protected long sizeLimit;
+
+    public void setSizeLimit(ByteSizeValue sizeLimit) {
+        this.sizeLimit = sizeLimit.getBytes();
+    }
+
+    public long getPointer() {
+        return pointer;
+    }
+
+    public long getConfiguredSizeLimit() {
+        return this.sizeLimit;
+    }
+
+    public String getName() {
+        return name.toString();
+    }
+
+    public CacheAccessor(long cacheManagerPointer, ClusterSettings cacheSettings, CacheType name) {
+        this.name = name;
+        this.properties = extractSettings(cacheSettings);
+        this.pointer = createCache(cacheManagerPointer, properties);
+    }
+
+    // Subclasses define which settings they consume
+    protected abstract Map<String, Object> extractSettings(ClusterSettings clusterSettings);
+
+    public abstract long createCache(long cacheManagerPointer, Map<String, Object> properties);
+
+    // Instance methods matching the native signatures
+    public abstract boolean put(String filePath);
+    public abstract Object get(String filePath);
+    public abstract boolean remove(String filePath);
+    public abstract void evict();
+    public abstract void clear();
+    public abstract long getMemoryConsumed();
+    public abstract boolean containsFile(String filePath);
+    public abstract List<String> getEntries();
+}
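A quick usage sketch for the manager defined just below (illustrative only, not part of the patch):

```java
// Hypothetical call site: warm the metadata cache for a shard's files and fall
// back to a full reset if the combined budget is exceeded.
CacheManager cacheManager = CacheManager.fromConfig(clusterSettings);
cacheManager.addToCache(List.of("/data/shard0/segment1.parquet"));
if (!cacheManager.withinTotalLimit()) {
    cacheManager.resetCache(); // crude backstop; native eviction normally handles this
}
```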
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheManager.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheManager.java
new file mode 100644
index 0000000000000..78bca9d942efe
--- /dev/null
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheManager.java
@@ -0,0 +1,117 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion.search.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.opensearch.common.settings.ClusterSettings;
+
+import static org.opensearch.datafusion.DataFusionQueryJNI.initCacheManagerConfig;
+
+/**
+ * Owns the native CacheManagerConfig pointer and the per-type cache accessors.
+ */
+public class CacheManager {
+
+    private final Map<CacheType, CacheAccessor> caches;
+    private final long cacheManagerPtr;
+    private final long totalSizeLimit;
+
+    public long getCacheManagerPtr() {
+        return this.cacheManagerPtr;
+    }
+
+    public long getTotalSizeLimit() {
+        return totalSizeLimit;
+    }
+
+    public CacheManager(long cacheManagerPtr, Map<CacheType, CacheAccessor> cacheMap) {
+        this.caches = new HashMap<>(cacheMap);
+        this.cacheManagerPtr = cacheManagerPtr;
+        this.totalSizeLimit = caches.values().stream()
+            .mapToLong(CacheAccessor::getConfiguredSizeLimit)
+            .sum();
+    }
+
+    // Factory method to create a CacheManager from config
+    public static CacheManager fromConfig(ClusterSettings clusterSettings) {
+        long cacheManagerPtr = initCacheManagerConfig();
+        Map<CacheType, CacheAccessor> cacheMap = new HashMap<>();
+        for (CacheType type : CacheType.values()) {
+            if (type.isEnabled(clusterSettings)) {
+                cacheMap.put(type, type.createCache(cacheManagerPtr, clusterSettings));
+            }
+        }
+        return new CacheManager(cacheManagerPtr, cacheMap);
+    }
+
+    public CacheAccessor getCacheAccessor(CacheType cacheType) {
+        return caches.get(cacheType);
+    }
+
+    public List<CacheAccessor> getAllCaches() {
+        return new ArrayList<>(caches.values());
+    }
+
+    public List<CacheAccessor> getCachesByType(CacheType... types) {
+        return caches.entrySet().stream()
+            .filter(entry -> List.of(types).contains(entry.getKey()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toList());
+    }
+
+    public boolean removeFiles(List<String> files) {
+        boolean allSuccessful = true;
+        for (CacheAccessor cache : getAllCaches()) {
+            for (String filename : files) {
+                allSuccessful &= cache.remove(filename);
+            }
+        }
+        return allSuccessful;
+    }
+
+    public boolean removeFilesByDirectory(String path) {
+        boolean allSuccessful = true;
+        for (CacheAccessor cache : getAllCaches()) {
+            allSuccessful &= cache.remove(path);
+        }
+        return allSuccessful;
+    }
+
+    public boolean addToCache(List<String> files) {
+        boolean allSuccessful = true;
+        for (CacheAccessor cache : getAllCaches()) {
+            for (String filename : files) {
+                allSuccessful &= cache.put(filename);
+            }
+        }
+        return allSuccessful;
+    }
+
+    public void resetCache() {
+        getAllCaches().forEach(CacheAccessor::clear);
+    }
+
+    public long getTotalUsedBytes() {
+        return getAllCaches().stream().mapToLong(CacheAccessor::getMemoryConsumed).sum();
+    }
+
+    public boolean withinCacheLimit(CacheType cacheType) {
+        CacheAccessor cacheAccessor = getCacheAccessor(cacheType);
+        return cacheAccessor.getMemoryConsumed() < cacheAccessor.getConfiguredSizeLimit();
+    }
+
+    public boolean withinTotalLimit() {
+        return getTotalUsedBytes() < totalSizeLimit;
+    }
+}
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CachePolicy.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CachePolicy.java
new file mode 100644
index 0000000000000..da83f83df241b
--- /dev/null
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CachePolicy.java
@@ -0,0 +1,15 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion.search.cache;
+
+/** Eviction policies selectable for native caches. */
+public enum CachePolicy {
+    LFU,
+    LRU
+}
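The size-limit setting defined in the next file parses human-readable values; for reference (illustrative snippet, not part of the patch):

```java
// Dynamic update payload an operator might send:
// PUT _cluster/settings
// { "persistent": { "datafusion.metadata.cache.size.limit": "128mb" } }
ByteSizeValue limit = clusterSettings.get(CacheSettings.METADATA_CACHE_SIZE_LIMIT);
long bytes = limit.getBytes(); // 134217728 for "128mb"
```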
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java
new file mode 100644
index 0000000000000..c2988d3738e52
--- /dev/null
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheSettings.java
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion.search.cache;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
+
+/** Cluster settings for the native DataFusion caches. */
+public class CacheSettings {
+
+    public static final String METADATA_CACHE_SIZE_LIMIT_KEY = "datafusion.metadata.cache.size.limit";
+    public static final Setting<ByteSizeValue> METADATA_CACHE_SIZE_LIMIT = new Setting<>(
+        METADATA_CACHE_SIZE_LIMIT_KEY,
+        "50mb",
+        (s) -> ByteSizeValue.parseBytesSizeValue(s, new ByteSizeValue(1000, ByteSizeUnit.KB), METADATA_CACHE_SIZE_LIMIT_KEY),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    public static final Setting<String> METADATA_CACHE_EVICTION_TYPE = new Setting<>(
+        "datafusion.metadata.cache.eviction.type",
+        "LFU",
+        Function.identity(),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    public static final String METADATA_CACHE_ENABLED_KEY = "datafusion.metadata.cache.enabled";
+    public static final Setting<Boolean> METADATA_CACHE_ENABLED =
+        Setting.boolSetting(METADATA_CACHE_ENABLED_KEY, true, Setting.Property.NodeScope, Setting.Property.Dynamic);
+
+    public static final List<Setting<?>> CACHE_SETTINGS = Arrays.asList(
+        METADATA_CACHE_SIZE_LIMIT,
+        METADATA_CACHE_EVICTION_TYPE
+    );
+
+    public static final List<Setting<?>> CACHE_ENABLED = Arrays.asList(
+        METADATA_CACHE_ENABLED
+    );
+}
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheType.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheType.java
new file mode 100644
index 0000000000000..8d204f0177a84
--- /dev/null
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/CacheType.java
@@ -0,0 +1,32 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion.search.cache;
+
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_ENABLED;
+
+/** Known native cache types and their factories. */
+public enum CacheType {
+    METADATA(MetadataCacheAccessor::new, METADATA_CACHE_ENABLED);
+    // STATS(StatsCacheAccessor::new, STATS_CACHE_ENABLED);
+
+    private final CacheFactory factory;
+    private final Setting<Boolean> enabledSetting;
+
+    CacheType(CacheFactory factory, Setting<Boolean> enabledSetting) {
+        this.factory = factory;
+        this.enabledSetting = enabledSetting;
+    }
+
+    public CacheAccessor createCache(long cacheManagerPointer, ClusterSettings clusterSettings) {
+        return factory.create(cacheManagerPointer, clusterSettings, this);
+    }
+
+    public boolean isEnabled(ClusterSettings clusterSettings) {
+        return clusterSettings.get(enabledSetting);
+    }
+
+    @FunctionalInterface
+    private interface CacheFactory {
+        CacheAccessor create(long cacheManagerPointer, ClusterSettings clusterSettings, CacheType type);
+    }
+}
diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/MetadataCacheAccessor.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/MetadataCacheAccessor.java
new file mode 100644
index 0000000000000..46c37cf8089b5
--- /dev/null
+++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/cache/MetadataCacheAccessor.java
@@ -0,0 +1,104 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion.search.cache;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.core.common.unit.ByteSizeValue;
+
+import static org.opensearch.datafusion.DataFusionQueryJNI.createMetadataCache;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheClear;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheContainsFile;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheGet;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheGetEntries;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheGetSize;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCachePut;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheRemove;
+import static org.opensearch.datafusion.DataFusionQueryJNI.metadataCacheUpdateSizeLimit;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_EVICTION_TYPE;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_SIZE_LIMIT;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_SIZE_LIMIT_KEY;
+
+/** Java-side handle for the native Parquet metadata cache. */
+public class MetadataCacheAccessor extends CacheAccessor {
+    private CachePolicy cachePolicy;
+
+    public MetadataCacheAccessor(long cacheManagerPointer, ClusterSettings settings, CacheType type) {
+        super(cacheManagerPointer, settings, type);
+    }
+
+    public void setCachePolicy(String cachePolicy) {
+        this.cachePolicy = CachePolicy.valueOf(cachePolicy);
+    }
+
+    @Override
+    protected Map<String, Object> extractSettings(ClusterSettings clusterSettings) {
+        Map<String, Object> properties = new HashMap<>();
+
+        clusterSettings.addSettingsUpdateConsumer(METADATA_CACHE_SIZE_LIMIT, this::setSizeLimit);
+        setSizeLimit(clusterSettings.get(METADATA_CACHE_SIZE_LIMIT));
+        properties.put(METADATA_CACHE_SIZE_LIMIT_KEY, this.sizeLimit);
+
+        // NOTE: the eviction type is recorded here but not yet propagated to the native cache
+        clusterSettings.addSettingsUpdateConsumer(METADATA_CACHE_EVICTION_TYPE, this::setCachePolicy);
+        setCachePolicy(clusterSettings.get(METADATA_CACHE_EVICTION_TYPE));
+        properties.put(METADATA_CACHE_EVICTION_TYPE.getKey(), this.cachePolicy);
+
+        return properties;
+    }
+
+    @Override
+    public long createCache(long cacheManagerPointer, Map<String, Object> properties) {
+        return createMetadataCache(cacheManagerPointer, this.sizeLimit);
+    }
+
+    @Override
+    public boolean put(String filePath) {
+        return metadataCachePut(this.getPointer(), filePath);
+    }
+
+    @Override
+    public Object get(String filePath) {
+        return metadataCacheGet(this.pointer, filePath);
+    }
+
+    @Override
+    public boolean remove(String filePath) {
+        return metadataCacheRemove(this.pointer, filePath);
+    }
+
+    @Override
+    public void evict() {
+        throw new UnsupportedOperationException("Explicit eviction not supported");
+    }
+
+    @Override
+    public void clear() {
+        metadataCacheClear(this.pointer);
+    }
+
+    @Override
+    public long getMemoryConsumed() {
+        return metadataCacheGetSize(this.pointer);
+    }
+
+    @Override
+    public boolean containsFile(String filePath) {
+        return metadataCacheContainsFile(this.pointer, filePath);
+    }
+
+    // TODO: distinguish the initial set-limit call from genuine updates more robustly
+    @Override
+    public void setSizeLimit(ByteSizeValue limit) {
+        if (this.sizeLimit == 0) {
+            // Called from extractSettings before the native cache exists
+            this.sizeLimit = limit.getBytes();
+        } else {
+            metadataCacheUpdateSizeLimit(this.pointer, limit.getBytes());
+            this.sizeLimit = limit.getBytes();
+        }
+    }
+
+    @Override
+    public List<String> getEntries() {
+        return List.of(metadataCacheGetEntries(this.pointer));
+    }
+}
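Because the accessor above registers consumers on ClusterSettings, a runtime settings change flows straight through to the native cache. A compact trace of that path (assumed behavior, matching the code above; `applySettings` is the standard OpenSearch test hook):

```java
// Hypothetical flow: a dynamic settings update propagates to the native cache.
clusterSettings.applySettings(
    Settings.builder().put("datafusion.metadata.cache.size.limit", "10mb").build()
);
// -> MetadataCacheAccessor.setSizeLimit -> metadataCacheUpdateSizeLimit (JNI)
// -> DefaultFilesMetadataCache::update_cache_limit, which evicts down to the new cap.
```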
diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionCacheManagerTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionCacheManagerTests.java
new file mode 100644
index 0000000000000..0e9990f409017
--- /dev/null
+++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionCacheManagerTests.java
@@ -0,0 +1,200 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.junit.Before;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.datafusion.search.cache.CacheAccessor;
+import org.opensearch.datafusion.search.cache.CacheManager;
+import org.opensearch.datafusion.search.cache.CacheType;
+import org.opensearch.env.Environment;
+import org.opensearch.test.OpenSearchTestCase;
+
+import static org.mockito.Mockito.when;
+import static org.opensearch.common.settings.ClusterSettings.BUILT_IN_CLUSTER_SETTINGS;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_ENABLED;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_EVICTION_TYPE;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_SIZE_LIMIT;
+
+public class DataFusionCacheManagerTests extends OpenSearchTestCase {
+    private DataFusionService service;
+
+    @Mock
+    private Environment mockEnvironment;
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.openMocks(this);
+        Settings mockSettings = Settings.builder().put("path.data", "/tmp/test-data").build();
+        when(mockEnvironment.settings()).thenReturn(mockSettings);
+
+        Set<Setting<?>> clusterSettingsToAdd = new HashSet<>(BUILT_IN_CLUSTER_SETTINGS);
+        clusterSettingsToAdd.add(METADATA_CACHE_ENABLED);
+        clusterSettingsToAdd.add(METADATA_CACHE_SIZE_LIMIT);
+        clusterSettingsToAdd.add(METADATA_CACHE_EVICTION_TYPE);
+
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, clusterSettingsToAdd);
+
+        service = new DataFusionService(Collections.emptyMap(), clusterSettings);
+        service.doStart();
+    }
+
+    public void testAddFileToCache() {
+        CacheManager cacheManager = service.getCacheManager();
+        CacheAccessor metadataCache = cacheManager.getCacheAccessor(CacheType.METADATA);
+        String fileName = getResourceFile("hits1.parquet").getPath();
+
+        cacheManager.addToCache(List.of(fileName));
+
+        assertTrue((Boolean) metadataCache.get(fileName));
+        assertTrue(metadataCache.containsFile(fileName));
+        assertTrue(metadataCache.getMemoryConsumed() > 0);
+    }
+
+    public void testRemoveFileFromCache() {
+        CacheManager cacheManager = service.getCacheManager();
+        CacheAccessor metadataCache = cacheManager.getCacheAccessor(CacheType.METADATA);
+        String fileName = getResourceFile("hits1.parquet").getPath();
+
+        cacheManager.addToCache(List.of(fileName));
+        assertTrue(metadataCache.containsFile(fileName));
+
+        boolean removed = cacheManager.removeFiles(List.of(fileName));
+
+        assertTrue(removed);
+        assertFalse(metadataCache.containsFile(fileName));
+    }
+    public void testCacheSizeLimitEviction() {
+        CacheAccessor metadataCache = service.getCacheManager().getCacheAccessor(CacheType.METADATA);
+        String fileName = getResourceFile("hits1.parquet").getPath();
+
+        metadataCache.put(fileName);
+        assertTrue(metadataCache.containsFile(fileName));
+
+        metadataCache.setSizeLimit(new ByteSizeValue(40));
+
+        assertFalse(metadataCache.containsFile(fileName));
+        assertEquals(0, metadataCache.getEntries().size());
+    }
+
+    public void testCachePutWithIncreasedSizeLimit() {
+        CacheAccessor metadataCache = service.getCacheManager().getCacheAccessor(CacheType.METADATA);
+        String fileName = getResourceFile("hits1.parquet").getPath();
+
+        metadataCache.setSizeLimit(new ByteSizeValue(500000));
+        metadataCache.put(fileName);
+
+        assertTrue(metadataCache.containsFile(fileName));
+        logger.info("Entries: {}", metadataCache.getEntries());
+        // getEntries() returns 3 elements per entry: filePath, memorySize, hitCount
+        assertEquals(1 * 3, metadataCache.getEntries().size());
+    }
+
+    public void testCacheClear() {
+        CacheAccessor metadataCache = service.getCacheManager().getCacheAccessor(CacheType.METADATA);
+        String fileName = getResourceFile("hits1.parquet").getPath();
+
+        metadataCache.put(fileName);
+        assertTrue(metadataCache.containsFile(fileName));
+
+        metadataCache.clear();
+
+        assertFalse(metadataCache.containsFile(fileName));
+        assertEquals(0, metadataCache.getEntries().size());
+    }
+
+    public void testAddMultipleFilesToCache() {
+        CacheManager cacheManager = service.getCacheManager();
+        CacheAccessor metadataCache = cacheManager.getCacheAccessor(CacheType.METADATA);
+        List<String> fileNames = List.of(
+            getResourceFile("hits1.parquet").getPath(),
+            getResourceFile("hits2.parquet").getPath()
+        );
+
+        cacheManager.addToCache(fileNames);
+        // getEntries() returns 3 elements per cache entry
+        assertEquals(2 * 3, metadataCache.getEntries().size());
+        fileNames.forEach(fileName -> assertTrue(metadataCache.containsFile(fileName)));
+    }
+
+    public void testRemoveNonExistentFile() {
+        CacheManager cacheManager = service.getCacheManager();
+        String nonExistentFile = "/path/nonexistent.parquet";
+
+        boolean removed = cacheManager.removeFiles(List.of(nonExistentFile));
+
+        assertFalse(removed);
+    }
+
+    public void testGetNonExistentFile() {
+        CacheAccessor metadataCache = service.getCacheManager().getCacheAccessor(CacheType.METADATA);
+        String nonExistentFile = "/path/nonexistent.parquet";
+
+        Object result = metadataCache.get(nonExistentFile);
+
+        // assertNull(result);
+        assertFalse(metadataCache.containsFile(nonExistentFile));
+    }
+
+    public void testAddEmptyFileList() {
+        CacheManager cacheManager = service.getCacheManager();
+        CacheAccessor metadataCache = cacheManager.getCacheAccessor(CacheType.METADATA);
+
+        cacheManager.addToCache(Collections.emptyList());
+
+        assertEquals(0, metadataCache.getEntries().size());
+    }
+
+    public void testCacheManagerTotalMemoryTracking() {
+        CacheManager cacheManager = service.getCacheManager();
+        String fileName = getResourceFile("hits1.parquet").getPath();
+
+        long initialMemory = cacheManager.getTotalUsedBytes();
+        cacheManager.addToCache(List.of(fileName));
+        long afterAddMemory = cacheManager.getTotalUsedBytes();
+
+        assertTrue(afterAddMemory > initialMemory);
+
+        cacheManager.removeFiles(List.of(fileName));
+        long afterRemoveMemory = cacheManager.getTotalUsedBytes();
+
+        assertEquals(initialMemory, afterRemoveMemory);
+    }
+
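+    // Sanity-check that both the per-cache and the manager-wide size limits
+    // are initialized to positive values.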
+    public void testCacheSizeLimits() {
+        CacheManager cacheManager = service.getCacheManager();
+        CacheAccessor metadataCache = cacheManager.getCacheAccessor(CacheType.METADATA);
+
+        long configuredLimit = metadataCache.getConfiguredSizeLimit();
+        long totalLimit = cacheManager.getTotalSizeLimit();
+
+        assertTrue(configuredLimit > 0);
+        assertTrue(totalLimit > 0);
+    }
+
+    private File getResourceFile(String fileName) {
+        URL resourceUrl = getClass().getClassLoader().getResource(fileName);
+        if (resourceUrl == null) {
+            throw new IllegalArgumentException("Resource not found: " + fileName);
+        }
+        return new File(resourceUrl.getPath());
+    }
+}
diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java
index 47547007a8e73..ff4f4442e902b 100644
--- a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java
+++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java
@@ -8,16 +8,24 @@
 package org.opensearch.datafusion;
 
+import java.io.File;
+import java.net.URL;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.lucene.search.Query;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.datafusion.core.SessionContext;
 import org.opensearch.datafusion.search.DatafusionQuery;
 import org.opensearch.datafusion.search.DatafusionSearcher;
+import org.opensearch.datafusion.search.cache.CacheAccessor;
+import org.opensearch.datafusion.search.cache.CacheManager;
+import org.opensearch.datafusion.search.cache.CacheType;
 import org.opensearch.env.Environment;
 import org.opensearch.index.engine.exec.FileMetadata;
 import org.opensearch.index.engine.exec.text.TextDF;
@@ -42,6 +50,11 @@
 import java.util.*;
 
 import static org.mockito.Mockito.when;
+import static org.opensearch.common.settings.ClusterSettings.BUILT_IN_CLUSTER_SETTINGS;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_ENABLED;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_EVICTION_TYPE;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_SIZE_LIMIT;
+
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Field;
@@ -65,7 +78,15 @@ public void setup() {
         Settings mockSettings = Settings.builder().put("path.data", "/tmp/test-data").build();
 
         when(mockEnvironment.settings()).thenReturn(mockSettings);
-        service = new DataFusionService(Map.of());
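+        // Register the metadata cache settings so the cache accessor can read
+        // them and subscribe to dynamic updates.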
+        Set<Setting<?>> clusterSettingsToAdd = new HashSet<>(BUILT_IN_CLUSTER_SETTINGS);
+        clusterSettingsToAdd.add(METADATA_CACHE_ENABLED);
+        clusterSettingsToAdd.add(METADATA_CACHE_SIZE_LIMIT);
+        clusterSettingsToAdd.add(METADATA_CACHE_EVICTION_TYPE);
+
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, clusterSettingsToAdd);
+
+        service = new DataFusionService(Collections.emptyMap(), clusterSettings);
         service.doStart();
     }
 
@@ -97,61 +118,70 @@ public void testGetVersion() {
     // TO run update proper directory path for generation-1-optimized.parquet file in
     // this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
-    public void testQueryPhaseExecutor() throws IOException {
-        Map<String, Object[]> finalRes = new HashMap<>();
-        DatafusionSearcher datafusionSearcher = null;
-        try {
-            DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits_data.parquet")), service);
-            datafusionSearcher = engine.acquireSearcher("Search");
-
-
-            byte[] protoContent;
-
-            try (InputStream is = getClass().getResourceAsStream("/substrait_plan.pb")) {
-                protoContent = is.readAllBytes();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            long streamPointer = datafusionSearcher.search(new DatafusionQuery(protoContent, new ArrayList<>()), service.getTokioRuntimePointer());
-            RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
-            RecordBatchStream stream = new RecordBatchStream(streamPointer, service.getTokioRuntimePointer(), allocator);
-
-            // We can have some collectors passed like this which can collect the results and convert to InternalAggregation
-            // Is the possible? need to check
-
-            SearchResultsCollector<RecordBatchStream> collector = new SearchResultsCollector<RecordBatchStream>() {
-                @Override
-                public void collect(RecordBatchStream value) {
-                    VectorSchemaRoot root = value.getVectorSchemaRoot();
-                    for (Field field : root.getSchema().getFields()) {
-                        String filedName = field.getName();
-                        FieldVector fieldVector = root.getVector(filedName);
-                        Object[] fieldValues = new Object[fieldVector.getValueCount()];
-                        for (int i = 0; i < fieldVector.getValueCount(); i++) {
-                            fieldValues[i] = fieldVector.getObject(i);
-                        }
-                        finalRes.put(filedName, fieldValues);
-                    }
-                }
-            };
-
-            while (stream.loadNextBatch().join()) {
-                collector.collect(stream);
-            }
-
-            logger.info("Final Results:");
-            for (Map.Entry<String, Object[]> entry : finalRes.entrySet()) {
-                logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
-            }
-
-        } catch (Exception exception) {
-            logger.error("Failed to execute Substrait query plan", exception);
-        }
-        finally {
-            if(datafusionSearcher != null) {
-                datafusionSearcher.close();
-            }
-        }
-    }
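+    // Disabled end-to-end Substrait query test, kept below for reference.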
+//    public void testQueryPhaseExecutor() throws IOException {
+//        Map<String, Object[]> finalRes = new HashMap<>();
+//        DatafusionSearcher datafusionSearcher = null;
+//        try {
+//            DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits2.parquet")), service);
+//            datafusionSearcher = engine.acquireSearcher("Search");
+//
+//            byte[] protoContent;
+//
+//            try (InputStream is = getClass().getResourceAsStream("/substrait_plan.pb")) {
+//                protoContent = is.readAllBytes();
+//            } catch (IOException e) {
+//                throw new RuntimeException(e);
+//            }
+//
+//            long streamPointer = datafusionSearcher.search(new DatafusionQuery(protoContent, new ArrayList<>()));
+//            RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+//            RecordBatchStream stream = new RecordBatchStream(streamPointer, service.getTokioRuntimePointer(), allocator);
+//
+//            // We can have some collectors passed like this which can collect the results and convert to InternalAggregation
+//            // Is this possible? need to check
+//
+//            SearchResultsCollector<RecordBatchStream> collector = new SearchResultsCollector<RecordBatchStream>() {
+//                @Override
+//                public void collect(RecordBatchStream value) {
+//                    VectorSchemaRoot root = value.getVectorSchemaRoot();
+//                    for (Field field : root.getSchema().getFields()) {
+//                        String fieldName = field.getName();
+//                        FieldVector fieldVector = root.getVector(fieldName);
+//                        Object[] fieldValues = new Object[fieldVector.getValueCount()];
+//                        for (int i = 0; i < fieldVector.getValueCount(); i++) {
+//                            fieldValues[i] = fieldVector.getObject(i);
+//                        }
+//                        finalRes.put(fieldName, fieldValues);
+//                    }
+//                }
+//            };
+//
+//            while (stream.loadNextBatch().join()) {
+//                collector.collect(stream);
+//            }
+//
+//            logger.info("Final Results:");
+//            for (Map.Entry<String, Object[]> entry : finalRes.entrySet()) {
+//                logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
+//            }
+//
+//        } catch (Exception exception) {
+//            logger.error("Failed to execute Substrait query plan", exception);
+//        } finally {
+//            if (datafusionSearcher != null) {
+//                datafusionSearcher.close();
+//            }
+//        }
+//    }
+
+    private File getResourceFile(String fileName) {
+        URL resourceUrl = getClass().getClassLoader().getResource(fileName);
+        if (resourceUrl == null) {
+            throw new IllegalArgumentException("Resource not found: " + fileName);
+        }
+        return new File(resourceUrl.getPath());
+    }
+
 }
diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java
index 395e2fae52e2f..49e35adc78721 100644
--- a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java
+++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/TestDataFusionServiceTests.java
@@ -8,11 +8,22 @@
 package org.opensearch.datafusion;
 
+import java.util.HashSet;
+import java.util.Set;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.util.Collections;
 import java.util.List;
 
+import static org.mockito.Mockito.when;
+import static org.opensearch.common.settings.ClusterSettings.BUILT_IN_CLUSTER_SETTINGS;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_ENABLED;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_EVICTION_TYPE;
+import static org.opensearch.datafusion.search.cache.CacheSettings.METADATA_CACHE_SIZE_LIMIT;
+
 /**
  * Unit tests for DataFusionService
  *
@@ -27,7 +38,14 @@ public class TestDataFusionServiceTests extends OpenSearchTestCase {
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        service = new DataFusionService(Collections.emptyMap());
+        Set<Setting<?>> clusterSettingsToAdd = new HashSet<>(BUILT_IN_CLUSTER_SETTINGS);
+        clusterSettingsToAdd.add(METADATA_CACHE_ENABLED);
+        clusterSettingsToAdd.add(METADATA_CACHE_SIZE_LIMIT);
+        clusterSettingsToAdd.add(METADATA_CACHE_EVICTION_TYPE);
+
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, clusterSettingsToAdd);
+
+        service = new DataFusionService(Collections.emptyMap(), clusterSettings);
         service.doStart();
     }
diff --git a/plugins/engine-datafusion/src/test/resources/hits1.parquet b/plugins/engine-datafusion/src/test/resources/hits1.parquet
new file mode 100644
index 0000000000000..647d8fb5235c2
Binary files /dev/null and b/plugins/engine-datafusion/src/test/resources/hits1.parquet differ
diff --git a/plugins/engine-datafusion/src/test/resources/hits2.parquet b/plugins/engine-datafusion/src/test/resources/hits2.parquet
new file mode 100644
index 0000000000000..581c7e502f18b
Binary files /dev/null and b/plugins/engine-datafusion/src/test/resources/hits2.parquet differ
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineSearcher.java b/server/src/main/java/org/opensearch/index/engine/EngineSearcher.java
index 7471fd3fbeb5f..3db82b9c069a3 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineSearcher.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineSearcher.java
@@ -32,7 +32,7 @@
     default void search(Q query, List<SearchResultsCollector<R>> collectors) throws IOException {
         throw new UnsupportedOperationException();
     }
 
-    default long search(Q query, Long runtimePtr) throws IOException {
+    default long search(Q query) throws IOException {
         throw new UnsupportedOperationException();
     }
 }