Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions plugins/engine-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ members = [

[workspace.dependencies]
# DataFusion dependencies
datafusion = "51.0.0"
datafusion-expr = "51.0.0"
datafusion-datasource = "51.0.0"
arrow-json = "57.1.0"
arrow = { version = "57.1.0", features = ["ffi", "ipc_compression"] }
arrow-array = "57.1.0"
arrow-schema = "57.1.0"
arrow-buffer = "57.1.0"
datafusion = "52.1.0"
datafusion-expr = "52.1.0"
datafusion-datasource = "52.1.0"
arrow-json = "57.3.0"
arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] }
arrow-array = "57.3.0"
arrow-schema = "57.3.0"
arrow-buffer = "57.3.0"
downcast-rs = "1.2"


# JNI dependencies
jni = "0.21"

# Substrait support
datafusion-substrait = "51.0.0"
datafusion-substrait = "52.1.0"
prost = "0.14"


Expand All @@ -42,12 +42,15 @@ thiserror = "1.0"
# Logging
log = "0.4"
# Parquet support
parquet = "57.1.0"
parquet = "57.3.0"

# Object store for file access
object_store = "=0.12.4"
object_store = "0.12.5"
url = "2.0"

# Liquid Cache for byte-level caching
liquid-cache-datafusion-local = "0.1.12"

# Substrait support
substrait = "0.47"

Expand Down
3 changes: 3 additions & 0 deletions plugins/engine-datafusion/jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ num_cpus = "1.16"
object_store = { workspace = true }
url = { workspace = true }

# Liquid Cache for byte-level caching
liquid-cache-datafusion-local = { workspace = true }

# Substrait support
substrait = { workspace = true }

Expand Down
13 changes: 11 additions & 2 deletions plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl AbsoluteRowIdOptimizer {
datasource_exec_schema: SchemaRef,
) -> (SchemaRef, Vec<usize>) {
// Clone projection and add new field index
let mut projected_schema = datasource.projected_schema().clone();
let projected_schema = datasource.projected_schema().expect("projected_schema failed");
let file_source_schema = datasource.file_schema().clone();

let mut new_projections = vec![];
Expand Down Expand Up @@ -109,12 +109,21 @@ impl AbsoluteRowIdOptimizer {
) -> Result<ProjectionExec, DataFusionError> {
let (new_schema, new_projections) =
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());

let table_partition_cols = datasource.table_partition_cols().clone();
let new_table_schema = TableSchema::new(new_schema.clone(), table_partition_cols);

use datafusion::datasource::physical_plan::ParquetSource;
let new_file_source = Arc::new(ParquetSource::new(new_table_schema));

let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
.with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone())))
.with_source(new_file_source)
.with_projection_indices(Some(new_projections))
.expect("Failed to set projection indices")
.build();

let new_datasource = DataSourceExec::from_data_source(file_scan_config);

let projection_exprs = self
.build_projection_exprs(&new_datasource.schema())
.expect("Failed to build projection expressions");
Expand Down
2 changes: 1 addition & 1 deletion plugins/engine-datafusion/jni/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn datafusion::execution::cache::cache_manag
}
}

fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
match self.inner.lock() {
Ok(mut cache) => cache.remove(k),
Err(e) => {
Expand Down
12 changes: 1 addition & 11 deletions plugins/engine-datafusion/jni/src/cache_jni.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,17 +434,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_cacheMana

match &runtime_env.custom_cache_manager {
Some(manager) => {
match cache_type.as_str() {
cache::CACHE_TYPE_METADATA => {
manager.contains_file(&file_path)
}
_ => {
let msg = format!("Unknown cache type: {}", cache_type);
log_debug!("{}", msg);
let _ = env.throw_new("org/opensearch/datafusion/DataFusionException", &msg);
false
}
}
manager.contains_file_by_type(&file_path, &cache_type)
}
None => {
let msg = "No custom cache manager available";
Expand Down
31 changes: 23 additions & 8 deletions plugins/engine-datafusion/jni/src/custom_cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl CustomCacheManager {

// Add statistics cache if available - use CustomStatisticsCache directly
if let Some(stats_cache) = &self.statistics_cache {
config = config.with_files_statistics_cache(Some(stats_cache.clone() as FileStatisticsCache));
config = config.with_files_statistics_cache(Some(stats_cache.clone() as Arc<dyn FileStatisticsCache>));
} else {
// Default statistics cache if none set
let default_stats = Arc::new(DefaultFileStatisticsCache::default());
Expand Down Expand Up @@ -161,13 +161,9 @@ impl CustomCacheManager {
// Remove from statistics cache
if let Some(cache) = &self.statistics_cache {
let path = Path::from(file_path.clone());
// Use contains_key to check if the entry exists before attempting removal
if cache.contains_key(&path) {
// Since we can't call remove directly on Arc<CustomStatisticsCache>,
// we need to use the thread-safe DashMap operations
if cache.inner().remove(&path).is_some() {
any_removed = true;
}
// Use the CacheAccessor remove method to properly update memory tracking
if cache.remove(&path).is_some() {
any_removed = true;
}
}

Expand Down Expand Up @@ -219,6 +215,25 @@ impl CustomCacheManager {
found
}

/// Reports whether `file_path` has an entry in the cache selected by `cache_type`.
///
/// * `crate::cache::CACHE_TYPE_METADATA` — builds object metadata for the path with
///   `create_object_meta_from_file` and probes the file metadata cache with the first result.
/// * `crate::cache::CACHE_TYPE_STATS` — probes the statistics cache with the path as key.
///
/// Returns `false` when the selected cache is not configured, when object metadata cannot be
/// created (or the result is empty), and for any other `cache_type` value.
pub fn contains_file_by_type(&self, file_path: &str, cache_type: &str) -> bool {
    match cache_type {
        crate::cache::CACHE_TYPE_METADATA => {
            // Resolve the object metadata first, then consult the cache.
            let Ok(object_metas) = create_object_meta_from_file(file_path) else {
                return false;
            };
            let Some(first_meta) = object_metas.first() else {
                return false;
            };
            match &self.file_metadata_cache {
                Some(metadata_cache) => metadata_cache.get(first_meta).is_some(),
                None => false,
            }
        }
        crate::cache::CACHE_TYPE_STATS => match &self.statistics_cache {
            Some(stats_cache) => stats_cache.contains_key(&Path::from(file_path)),
            None => false,
        },
        // Unrecognised cache types are reported as "not present".
        _ => false,
    }
}

/// Update the file metadata cache size limit
pub fn update_metadata_cache_limit(&self, new_limit: usize) {
if let Some(cache) = &self.file_metadata_cache {
Expand Down
58 changes: 29 additions & 29 deletions plugins/engine-datafusion/jni/src/listing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_datasource::{
file::FileSource,
file_groups::FileGroup,
file_scan_config::{FileScanConfig, FileScanConfigBuilder},
schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
schema_adapter::SchemaAdapterFactory,
};
use datafusion_expr::{dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType};
use futures::future::err;
Expand Down Expand Up @@ -948,7 +948,7 @@ pub struct ListingTable {
schema_source: SchemaSource,
options: ListingOptions,
definition: Option<String>,
collected_statistics: FileStatisticsCache,
collected_statistics: Arc<dyn FileStatisticsCache>,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
Expand Down Expand Up @@ -1020,7 +1020,7 @@ impl ListingTable {
/// multiple times in the same session.
///
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self
Expand Down Expand Up @@ -1085,20 +1085,17 @@ impl ListingTable {
}

/// Creates a schema adapter for mapping between file and table schemas
///
/// Uses the configured schema adapter factory if available, otherwise falls back
/// to the default implementation.
fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
let table_schema = self.schema();
match &self.schema_adapter_factory {
Some(factory) => factory.create_with_projected_schema(Arc::clone(&table_schema)),
None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
}
}

/// Creates a file source and applies schema adapter factory if available
fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn FileSource>> {
let mut source = self.options.format.file_source();
let table_schema = datafusion_datasource::table_schema::TableSchema::new(
self.file_schema.clone(),
self.options
.table_partition_cols
.iter()
.map(|(name, dt)| Arc::new(Field::new(name, dt.clone(), false)) as _)
.collect(),
);
let mut source = self.options.format.file_source(table_schema);
// Apply schema adapter to source if available
//
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
Expand Down Expand Up @@ -1309,16 +1306,14 @@ impl TableProvider for ListingTable {
state,
FileScanConfigBuilder::new(
object_store_url,
Arc::clone(&self.file_schema),
file_source,
)
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection_indices(projection.cloned())
.with_projection_indices(projection.cloned())?
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.build(),
)
Expand Down Expand Up @@ -1469,17 +1464,22 @@ impl ListingTable {
inexact_stats,
)?;

let schema_adapter = self.create_schema_adapter();
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;

stats.column_statistics = schema_mapper.map_column_statistics(&stats.column_statistics)?;
file_groups.iter_mut().try_for_each(|file_group| {
if let Some(stat) = file_group.statistics_mut() {
stat.column_statistics =
schema_mapper.map_column_statistics(&stat.column_statistics)?;
}
Ok::<_, DataFusionError>(())
})?;
// Only map statistics if schema_adapter_factory is explicitly set
// In DataFusion 52.1, SchemaAdapter has been removed and replaced with PhysicalExprAdapterFactory
// Statistics mapping is now optional and only done when explicitly configured
if let Some(factory) = &self.schema_adapter_factory {
let schema_adapter = factory.create_with_projected_schema(self.schema());
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;

stats.column_statistics = schema_mapper.map_column_statistics(&stats.column_statistics)?;
file_groups.iter_mut().try_for_each(|file_group| {
if let Some(stat) = file_group.statistics_mut() {
stat.column_statistics = schema_mapper.map_column_statistics(&stat.column_statistics)?;
}
Ok::<_, DataFusionError>(())
})?;
}

Ok((file_groups, stats))
}

Expand Down
31 changes: 21 additions & 10 deletions plugins/engine-datafusion/jni/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use datafusion::{
datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess},
datasource::physical_plan::ParquetSource,
execution::cache::cache_manager::CacheManagerConfig,
execution::cache::cache_unit::DefaultListFilesCache,
execution::cache::DefaultListFilesCache,
execution::cache::CacheAccessor,
execution::context::SessionContext,
execution::runtime_env::RuntimeEnvBuilder,
Expand All @@ -31,6 +31,7 @@ use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::TableSchema;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::substrait::proto::{Plan, extensions::simple_extension_declaration::MappingType};
use object_store::ObjectMeta;
Expand Down Expand Up @@ -121,7 +122,11 @@ pub async fn execute_query_with_cross_rt_stream(
);

let list_file_cache = Arc::new(DefaultListFilesCache::default());
list_file_cache.put(table_path.prefix(), object_meta);
let table_scoped_path = datafusion::execution::cache::TableScopedPath {
table: None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to define the table here as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TableScopedPath is required by DataFusion's cache API signature - the put() method expects this struct type, not a raw path.

path: table_path.prefix().clone(),
};
list_file_cache.put(&table_scoped_path, object_meta);

let runtimeEnv = &runtime.runtime_env;

Expand Down Expand Up @@ -361,15 +366,18 @@ pub async fn execute_fetch_phase(
);

let list_file_cache = Arc::new(DefaultListFilesCache::default());
list_file_cache.put(table_path.prefix(), object_meta);
let table_scoped_path = datafusion::execution::cache::TableScopedPath {
table: None,
path: table_path.prefix().clone(),
};
list_file_cache.put(&table_scoped_path, object_meta);

let runtime_env = RuntimeEnvBuilder::new()
.with_cache_manager(
CacheManagerConfig::default().with_list_files_cache(Some(list_file_cache))
.with_metadata_cache_limit(runtime.runtime_env.cache_manager.get_file_metadata_cache().cache_limit())
.with_metadata_cache_limit(runtime.runtime_env.cache_manager.get_file_metadata_cache().cache_limit())
.with_file_metadata_cache(Some(runtime.runtime_env.cache_manager.get_file_metadata_cache().clone()))
.with_files_statistics_cache(runtime.runtime_env.cache_manager.get_file_statistic_cache()),

)
.build()?;

Expand Down Expand Up @@ -414,7 +422,12 @@ pub async fn execute_fetch_phase(
.collect();

let file_group = FileGroup::new(partitioned_files);
let file_source = Arc::new(ParquetSource::default());

let table_schema = datafusion_datasource::table_schema::TableSchema::new(
parquet_schema.clone(),
vec![Arc::new(Field::new(ROW_BASE_FIELD_NAME, DataType::Int64, false))],
);
let file_source = Arc::new(ParquetSource::new(table_schema));

let mut projection_index = vec![];
for field_name in projections.iter() {
Expand All @@ -435,17 +448,15 @@ pub async fn execute_fetch_phase(

let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
parquet_schema.clone(),
file_source,
)
.with_table_partition_cols(vec![Field::new(ROW_BASE_FIELD_NAME, DataType::Int64, false)])
.with_projection_indices(Some(projection_index.clone()))
.with_projection_indices(Some(projection_index.clone()))?
.with_file_group(file_group)
.build();

let parquet_exec = DataSourceExec::from_data_source(file_scan_config.clone());

let projection_exprs = build_projection_exprs(file_scan_config.projected_schema())
let projection_exprs = build_projection_exprs(file_scan_config.projected_schema()?)
.expect("Failed to build projection expressions");

let projection_exec = Arc::new(ProjectionExec::try_new(projection_exprs, parquet_exec)
Expand Down
9 changes: 8 additions & 1 deletion plugins/engine-datafusion/jni/src/statistics_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl CacheAccessor<Path, Arc<Statistics>> for CustomStatisticsCache {
result
}

fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
fn remove(&self, k: &Path) -> Option<Arc<Statistics>> {
let key = k.to_string();

// Actually remove from the underlying cache
Expand Down Expand Up @@ -497,6 +497,13 @@ impl CacheAccessor<Path, Arc<Statistics>> for CustomStatisticsCache {
}
}

/// `FileStatisticsCache` implementation for the custom statistics cache.
impl datafusion::execution::cache::cache_manager::FileStatisticsCache for CustomStatisticsCache {
    /// Lists the cache entries for introspection; this implementation always yields an empty map.
    // NOTE(review): anything that lists entries will see none even when the cache is populated —
    // confirm this is acceptable for introspection consumers.
    fn list_entries(
        &self,
    ) -> std::collections::HashMap<
        object_store::path::Path,
        datafusion::execution::cache::cache_manager::FileStatisticsCacheEntry,
    > {
        Default::default()
    }
}

impl Default for CustomStatisticsCache {
fn default() -> Self {
Self::with_default_config()
Expand Down
Loading
Loading