Skip to content

Commit 6e5699d

Browse files
cocosz and Tanvir Alam
authored
Upgrade DataFusion to 52.1.0 and Add Liquid Cache Support (#20740)
* Upgrade DataFusion to 52.1.0 and add liquid-cache-datafusion-local dependency

  Signed-off-by: Tanvir Alam <tanvralm@amazon.com>

* Fix cache manager tests and DataFusionServiceTests

  Signed-off-by: Tanvir Alam <tanvralm@amazon.com>

* Refactor DataFusionServiceTests to use format-specific subdirectory structure

  Signed-off-by: Tanvir Alam <tanvralm@amazon.com>

* Fix index out-of-bounds panic in AbsoluteRowIdOptimizer statistics projection

  Signed-off-by: Tanvir Alam <tanvralm@amazon.com>

---------

Signed-off-by: Tanvir Alam <tanvralm@amazon.com>
Co-authored-by: Tanvir Alam <tanvralm@amazon.com>
1 parent: 3e6aef2 · commit: 6e5699d

File tree

12 files changed

+213
-115
lines changed

12 files changed

+213
-115
lines changed

plugins/engine-datafusion/Cargo.toml

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@ members = [
66

77
[workspace.dependencies]
88
# DataFusion dependencies
9-
datafusion = "51.0.0"
10-
datafusion-expr = "51.0.0"
11-
datafusion-datasource = "51.0.0"
12-
arrow-json = "57.1.0"
13-
arrow = { version = "57.1.0", features = ["ffi", "ipc_compression"] }
14-
arrow-array = "57.1.0"
15-
arrow-schema = "57.1.0"
16-
arrow-buffer = "57.1.0"
9+
datafusion = "52.1.0"
10+
datafusion-expr = "52.1.0"
11+
datafusion-datasource = "52.1.0"
12+
arrow-json = "57.3.0"
13+
arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] }
14+
arrow-array = "57.3.0"
15+
arrow-schema = "57.3.0"
16+
arrow-buffer = "57.3.0"
1717
downcast-rs = "1.2"
1818

1919

2020
# JNI dependencies
2121
jni = "0.21"
2222

2323
# Substrait support
24-
datafusion-substrait = "51.0.0"
24+
datafusion-substrait = "52.1.0"
2525
prost = "0.14"
2626

2727

@@ -42,12 +42,15 @@ thiserror = "1.0"
4242
# Logging
4343
log = "0.4"
4444
# Parquet support
45-
parquet = "57.1.0"
45+
parquet = "57.3.0"
4646

4747
# Object store for file access
48-
object_store = "=0.12.4"
48+
object_store = "0.12.5"
4949
url = "2.0"
5050

51+
# Liquid Cache for byte-level caching
52+
liquid-cache-datafusion-local = "0.1.12"
53+
5154
# Substrait support
5255
substrait = "=0.62.0"
5356

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ num_cpus = "1.16"
6161
object_store = { workspace = true }
6262
url = { workspace = true }
6363

64+
# Liquid Cache for byte-level caching
65+
liquid-cache-datafusion-local = { workspace = true }
66+
6467
# Substrait support
6568
substrait = { workspace = true }
6669

plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use datafusion::{
2626
physical_optimizer::PhysicalOptimizerRule,
2727
physical_plan::{projection::ProjectionExec, ExecutionPlan},
2828
};
29+
use datafusion::physical_plan::ColumnStatistics;
2930
use datafusion_datasource::TableSchema;
3031

3132
#[derive(Debug)]
@@ -41,7 +42,7 @@ impl AbsoluteRowIdOptimizer {
4142
datasource_exec_schema: SchemaRef,
4243
) -> (SchemaRef, Vec<usize>) {
4344
// Clone projection and add new field index
44-
let mut projected_schema = datasource.projected_schema().clone();
45+
let projected_schema = datasource.projected_schema().expect("projected_schema failed");
4546
let file_source_schema = datasource.file_schema().clone();
4647

4748
let mut new_projections = vec![];
@@ -109,12 +110,22 @@ impl AbsoluteRowIdOptimizer {
109110
) -> Result<ProjectionExec, DataFusionError> {
110111
let (new_schema, new_projections) =
111112
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());
113+
114+
let table_partition_cols = datasource.table_partition_cols().clone();
115+
let new_table_schema = TableSchema::new(new_schema.clone(), table_partition_cols);
116+
117+
use datafusion::datasource::physical_plan::ParquetSource;
118+
let new_file_source = Arc::new(ParquetSource::new(new_table_schema));
119+
112120
let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
113-
.with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone())))
121+
.with_source(new_file_source)
114122
.with_projection_indices(Some(new_projections))
123+
.expect("Failed to set projection indices")
124+
.with_statistics(datasource.statistics().add_column_statistics(ColumnStatistics::new_unknown()))
115125
.build();
116126

117127
let new_datasource = DataSourceExec::from_data_source(file_scan_config);
128+
118129
let projection_exprs = self
119130
.build_projection_exprs(&new_datasource.schema())
120131
.expect("Failed to build projection expressions");
@@ -144,7 +155,7 @@ impl PhysicalOptimizerRule for AbsoluteRowIdOptimizer {
144155
return Ok(Transformed::new(Arc::new(projection), true, TreeNodeRecursion::Continue));
145156

146157
}
147-
158+
148159
Ok(Transformed::no(node))
149160
})?;
150161

plugins/engine-datafusion/jni/src/cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn datafusion::execution::cache::cache_manag
110110
}
111111
}
112112

113-
fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
113+
fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
114114
match self.inner.lock() {
115115
Ok(mut cache) => cache.remove(k),
116116
Err(e) => {

plugins/engine-datafusion/jni/src/cache_jni.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -434,17 +434,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_cacheMana
434434

435435
match &runtime_env.custom_cache_manager {
436436
Some(manager) => {
437-
match cache_type.as_str() {
438-
cache::CACHE_TYPE_METADATA => {
439-
manager.contains_file(&file_path)
440-
}
441-
_ => {
442-
let msg = format!("Unknown cache type: {}", cache_type);
443-
log_debug!("{}", msg);
444-
let _ = env.throw_new("org/opensearch/datafusion/DataFusionException", &msg);
445-
false
446-
}
447-
}
437+
manager.contains_file_by_type(&file_path, &cache_type)
448438
}
449439
None => {
450440
let msg = "No custom cache manager available";

plugins/engine-datafusion/jni/src/custom_cache_manager.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl CustomCacheManager {
6363

6464
// Add statistics cache if available - use CustomStatisticsCache directly
6565
if let Some(stats_cache) = &self.statistics_cache {
66-
config = config.with_files_statistics_cache(Some(stats_cache.clone() as FileStatisticsCache));
66+
config = config.with_files_statistics_cache(Some(stats_cache.clone() as Arc<dyn FileStatisticsCache>));
6767
} else {
6868
// Default statistics cache if none set
6969
let default_stats = Arc::new(DefaultFileStatisticsCache::default());
@@ -161,13 +161,9 @@ impl CustomCacheManager {
161161
// Remove from statistics cache
162162
if let Some(cache) = &self.statistics_cache {
163163
let path = Path::from(file_path.clone());
164-
// Use contains_key to check if the entry exists before attempting removal
165-
if cache.contains_key(&path) {
166-
// Since we can't call remove directly on Arc<CustomStatisticsCache>,
167-
// we need to use the thread-safe DashMap operations
168-
if cache.inner().remove(&path).is_some() {
169-
any_removed = true;
170-
}
164+
// Use the CacheAccessor remove method to properly update memory tracking
165+
if cache.remove(&path).is_some() {
166+
any_removed = true;
171167
}
172168
}
173169

@@ -219,6 +215,25 @@ impl CustomCacheManager {
219215
found
220216
}
221217

218+
/// Check if a file exists in a specific cache type
219+
pub fn contains_file_by_type(&self, file_path: &str, cache_type: &str) -> bool {
220+
match cache_type {
221+
crate::cache::CACHE_TYPE_METADATA => {
222+
create_object_meta_from_file(file_path)
223+
.ok()
224+
.and_then(|metas| metas.first().cloned())
225+
.and_then(|meta| self.file_metadata_cache.as_ref()?.get(&meta))
226+
.is_some()
227+
}
228+
crate::cache::CACHE_TYPE_STATS => {
229+
self.statistics_cache
230+
.as_ref()
231+
.map_or(false, |cache| cache.contains_key(&Path::from(file_path)))
232+
}
233+
_ => false
234+
}
235+
}
236+
222237
/// Update the file metadata cache size limit
223238
pub fn update_metadata_cache_limit(&self, new_limit: usize) {
224239
if let Some(cache) = &self.file_metadata_cache {

plugins/engine-datafusion/jni/src/listing_table.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use datafusion_datasource::{
5656
file::FileSource,
5757
file_groups::FileGroup,
5858
file_scan_config::{FileScanConfig, FileScanConfigBuilder},
59-
schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
59+
schema_adapter::SchemaAdapterFactory,
6060
};
6161
use datafusion_expr::{dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType};
6262
use futures::future::err;
@@ -948,7 +948,7 @@ pub struct ListingTable {
948948
schema_source: SchemaSource,
949949
options: ListingOptions,
950950
definition: Option<String>,
951-
collected_statistics: FileStatisticsCache,
951+
collected_statistics: Arc<dyn FileStatisticsCache>,
952952
constraints: Constraints,
953953
column_defaults: HashMap<String, Expr>,
954954
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
@@ -1020,7 +1020,7 @@ impl ListingTable {
10201020
/// multiple times in the same session.
10211021
///
10221022
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
1023-
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
1023+
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
10241024
self.collected_statistics =
10251025
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
10261026
self
@@ -1085,20 +1085,17 @@ impl ListingTable {
10851085
}
10861086

10871087
/// Creates a schema adapter for mapping between file and table schemas
1088-
///
1089-
/// Uses the configured schema adapter factory if available, otherwise falls back
1090-
/// to the default implementation.
1091-
fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
1092-
let table_schema = self.schema();
1093-
match &self.schema_adapter_factory {
1094-
Some(factory) => factory.create_with_projected_schema(Arc::clone(&table_schema)),
1095-
None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
1096-
}
1097-
}
1098-
10991088
/// Creates a file source and applies schema adapter factory if available
11001089
fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn FileSource>> {
1101-
let mut source = self.options.format.file_source();
1090+
let table_schema = datafusion_datasource::table_schema::TableSchema::new(
1091+
self.file_schema.clone(),
1092+
self.options
1093+
.table_partition_cols
1094+
.iter()
1095+
.map(|(name, dt)| Arc::new(Field::new(name, dt.clone(), false)) as _)
1096+
.collect(),
1097+
);
1098+
let mut source = self.options.format.file_source(table_schema);
11021099
// Apply schema adapter to source if available
11031100
//
11041101
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
@@ -1309,16 +1306,14 @@ impl TableProvider for ListingTable {
13091306
state,
13101307
FileScanConfigBuilder::new(
13111308
object_store_url,
1312-
Arc::clone(&self.file_schema),
13131309
file_source,
13141310
)
13151311
.with_file_groups(partitioned_file_lists)
13161312
.with_constraints(self.constraints.clone())
13171313
.with_statistics(statistics)
1318-
.with_projection_indices(projection.cloned())
1314+
.with_projection_indices(projection.cloned())?
13191315
.with_limit(limit)
13201316
.with_output_ordering(output_ordering)
1321-
.with_table_partition_cols(table_partition_cols)
13221317
.with_expr_adapter(self.expr_adapter_factory.clone())
13231318
.build(),
13241319
)
@@ -1469,17 +1464,22 @@ impl ListingTable {
14691464
inexact_stats,
14701465
)?;
14711466

1472-
let schema_adapter = self.create_schema_adapter();
1473-
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
1474-
1475-
stats.column_statistics = schema_mapper.map_column_statistics(&stats.column_statistics)?;
1476-
file_groups.iter_mut().try_for_each(|file_group| {
1477-
if let Some(stat) = file_group.statistics_mut() {
1478-
stat.column_statistics =
1479-
schema_mapper.map_column_statistics(&stat.column_statistics)?;
1480-
}
1481-
Ok::<_, DataFusionError>(())
1482-
})?;
1467+
// Only map statistics if schema_adapter_factory is explicitly set
1468+
// In DataFusion 52.1, SchemaAdapter has been removed and replaced with PhysicalExprAdapterFactory
1469+
// Statistics mapping is now optional and only done when explicitly configured
1470+
if let Some(factory) = &self.schema_adapter_factory {
1471+
let schema_adapter = factory.create_with_projected_schema(self.schema());
1472+
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
1473+
1474+
stats.column_statistics = schema_mapper.map_column_statistics(&stats.column_statistics)?;
1475+
file_groups.iter_mut().try_for_each(|file_group| {
1476+
if let Some(stat) = file_group.statistics_mut() {
1477+
stat.column_statistics = schema_mapper.map_column_statistics(&stat.column_statistics)?;
1478+
}
1479+
Ok::<_, DataFusionError>(())
1480+
})?;
1481+
}
1482+
14831483
Ok((file_groups, stats))
14841484
}
14851485

plugins/engine-datafusion/jni/src/query_executor.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use datafusion::{
1818
datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess},
1919
datasource::physical_plan::ParquetSource,
2020
execution::cache::cache_manager::CacheManagerConfig,
21-
execution::cache::cache_unit::DefaultListFilesCache,
21+
execution::cache::DefaultListFilesCache,
2222
execution::cache::CacheAccessor,
2323
execution::context::SessionContext,
2424
execution::runtime_env::RuntimeEnvBuilder,
@@ -31,6 +31,7 @@ use datafusion_datasource::PartitionedFile;
3131
use datafusion_datasource::file_groups::FileGroup;
3232
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3333
use datafusion_datasource::source::DataSourceExec;
34+
use datafusion_datasource::TableSchema;
3435
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
3536
use datafusion_substrait::substrait::proto::{Plan, extensions::simple_extension_declaration::MappingType};
3637
use object_store::ObjectMeta;
@@ -121,7 +122,11 @@ pub async fn execute_query_with_cross_rt_stream(
121122
);
122123

123124
let list_file_cache = Arc::new(DefaultListFilesCache::default());
124-
list_file_cache.put(table_path.prefix(), object_meta);
125+
let table_scoped_path = datafusion::execution::cache::TableScopedPath {
126+
table: None,
127+
path: table_path.prefix().clone(),
128+
};
129+
list_file_cache.put(&table_scoped_path, object_meta);
125130

126131
let runtimeEnv = &runtime.runtime_env;
127132

@@ -361,15 +366,18 @@ pub async fn execute_fetch_phase(
361366
);
362367

363368
let list_file_cache = Arc::new(DefaultListFilesCache::default());
364-
list_file_cache.put(table_path.prefix(), object_meta);
369+
let table_scoped_path = datafusion::execution::cache::TableScopedPath {
370+
table: None,
371+
path: table_path.prefix().clone(),
372+
};
373+
list_file_cache.put(&table_scoped_path, object_meta);
365374

366375
let runtime_env = RuntimeEnvBuilder::new()
367376
.with_cache_manager(
368377
CacheManagerConfig::default().with_list_files_cache(Some(list_file_cache))
369-
.with_metadata_cache_limit(runtime.runtime_env.cache_manager.get_file_metadata_cache().cache_limit())
378+
.with_metadata_cache_limit(runtime.runtime_env.cache_manager.get_file_metadata_cache().cache_limit())
370379
.with_file_metadata_cache(Some(runtime.runtime_env.cache_manager.get_file_metadata_cache().clone()))
371380
.with_files_statistics_cache(runtime.runtime_env.cache_manager.get_file_statistic_cache()),
372-
373381
)
374382
.build()?;
375383

@@ -414,7 +422,12 @@ pub async fn execute_fetch_phase(
414422
.collect();
415423

416424
let file_group = FileGroup::new(partitioned_files);
417-
let file_source = Arc::new(ParquetSource::default());
425+
426+
let table_schema = datafusion_datasource::table_schema::TableSchema::new(
427+
parquet_schema.clone(),
428+
vec![Arc::new(Field::new(ROW_BASE_FIELD_NAME, DataType::Int64, false))],
429+
);
430+
let file_source = Arc::new(ParquetSource::new(table_schema));
418431

419432
let mut projection_index = vec![];
420433
for field_name in projections.iter() {
@@ -435,17 +448,15 @@ pub async fn execute_fetch_phase(
435448

436449
let file_scan_config = FileScanConfigBuilder::new(
437450
ObjectStoreUrl::local_filesystem(),
438-
parquet_schema.clone(),
439451
file_source,
440452
)
441-
.with_table_partition_cols(vec![Field::new(ROW_BASE_FIELD_NAME, DataType::Int64, false)])
442-
.with_projection_indices(Some(projection_index.clone()))
453+
.with_projection_indices(Some(projection_index.clone()))?
443454
.with_file_group(file_group)
444455
.build();
445456

446457
let parquet_exec = DataSourceExec::from_data_source(file_scan_config.clone());
447458

448-
let projection_exprs = build_projection_exprs(file_scan_config.projected_schema())
459+
let projection_exprs = build_projection_exprs(file_scan_config.projected_schema()?)
449460
.expect("Failed to build projection expressions");
450461

451462
let projection_exec = Arc::new(ProjectionExec::try_new(projection_exprs, parquet_exec)

0 commit comments

Comments (0)