Skip to content

Commit 13e10d5

Browse files
author
Tanvir Alam
committed
Upgrade DataFusion to 52.1.0 and add liquid-cache-datafusion-local dependency
1 parent e669187 commit 13e10d5

File tree

8 files changed

+69
-33
lines changed

plugins/engine-datafusion/Cargo.toml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@ members = [
66

77
[workspace.dependencies]
88
# DataFusion dependencies
9-
datafusion = "51.0.0"
10-
datafusion-expr = "51.0.0"
11-
datafusion-datasource = "51.0.0"
12-
arrow-json = "57.1.0"
13-
arrow = { version = "57.1.0", features = ["ffi", "ipc_compression"] }
14-
arrow-array = "57.1.0"
15-
arrow-schema = "57.1.0"
16-
arrow-buffer = "57.1.0"
9+
datafusion = "52.1.0"
10+
datafusion-expr = "52.1.0"
11+
datafusion-datasource = "52.1.0"
12+
arrow-json = "57.3.0"
13+
arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] }
14+
arrow-array = "57.3.0"
15+
arrow-schema = "57.3.0"
16+
arrow-buffer = "57.3.0"
1717
downcast-rs = "1.2"
1818

1919

2020
# JNI dependencies
2121
jni = "0.21"
2222

2323
# Substrait support
24-
datafusion-substrait = "51.0.0"
24+
datafusion-substrait = "52.1.0"
2525
prost = "0.14"
2626

2727

@@ -42,10 +42,10 @@ thiserror = "1.0"
4242
# Logging
4343
log = "0.4"
4444
# Parquet support
45-
parquet = "57.1.0"
45+
parquet = "57.3.0"
4646

4747
# Object store for file access
48-
object_store = "=0.12.4"
48+
object_store = "0.12.5"
4949
url = "2.0"
5050

5151
# Substrait support

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ num_cpus = "1.16"
6161
object_store = { workspace = true }
6262
url = { workspace = true }
6363

64+
# Liquid Cache for byte-level caching
65+
liquid-cache-datafusion-local = "0.1.12"
66+
6467
# Substrait support
6568
substrait = { workspace = true }
6669

plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl AbsoluteRowIdOptimizer {
4141
datasource_exec_schema: SchemaRef,
4242
) -> (SchemaRef, Vec<usize>) {
4343
// Clone projection and add new field index
44-
let mut projected_schema = datasource.projected_schema().clone();
44+
let projected_schema = datasource.projected_schema().expect("projected_schema failed");
4545
let file_source_schema = datasource.file_schema().clone();
4646

4747
let mut new_projections = vec![];
@@ -109,12 +109,21 @@ impl AbsoluteRowIdOptimizer {
109109
) -> Result<ProjectionExec, DataFusionError> {
110110
let (new_schema, new_projections) =
111111
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());
112+
113+
let table_partition_cols = datasource.table_partition_cols().clone();
114+
let new_table_schema = TableSchema::new(new_schema.clone(), table_partition_cols);
115+
116+
use datafusion::datasource::physical_plan::ParquetSource;
117+
let new_file_source = Arc::new(ParquetSource::new(new_table_schema));
118+
112119
let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
113-
.with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone())))
120+
.with_source(new_file_source)
114121
.with_projection_indices(Some(new_projections))
122+
.expect("Failed to set projection indices")
115123
.build();
116124

117125
let new_datasource = DataSourceExec::from_data_source(file_scan_config);
126+
118127
let projection_exprs = self
119128
.build_projection_exprs(&new_datasource.schema())
120129
.expect("Failed to build projection expressions");

plugins/engine-datafusion/jni/src/cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn datafusion::execution::cache::cache_manag
110110
}
111111
}
112112

113-
fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
113+
fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
114114
match self.inner.lock() {
115115
Ok(mut cache) => cache.remove(k),
116116
Err(e) => {

plugins/engine-datafusion/jni/src/custom_cache_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl CustomCacheManager {
6363

6464
// Add statistics cache if available - use CustomStatisticsCache directly
6565
if let Some(stats_cache) = &self.statistics_cache {
66-
config = config.with_files_statistics_cache(Some(stats_cache.clone() as FileStatisticsCache));
66+
config = config.with_files_statistics_cache(Some(stats_cache.clone() as Arc<dyn FileStatisticsCache>));
6767
} else {
6868
// Default statistics cache if none set
6969
let default_stats = Arc::new(DefaultFileStatisticsCache::default());

plugins/engine-datafusion/jni/src/listing_table.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -948,7 +948,7 @@ pub struct ListingTable {
948948
schema_source: SchemaSource,
949949
options: ListingOptions,
950950
definition: Option<String>,
951-
collected_statistics: FileStatisticsCache,
951+
collected_statistics: Arc<dyn FileStatisticsCache>,
952952
constraints: Constraints,
953953
column_defaults: HashMap<String, Expr>,
954954
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
@@ -1020,7 +1020,7 @@ impl ListingTable {
10201020
/// multiple times in the same session.
10211021
///
10221022
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
1023-
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
1023+
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
10241024
self.collected_statistics =
10251025
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
10261026
self
@@ -1098,7 +1098,15 @@ impl ListingTable {
10981098

10991099
/// Creates a file source and applies schema adapter factory if available
11001100
fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn FileSource>> {
1101-
let mut source = self.options.format.file_source();
1101+
let table_schema = datafusion_datasource::table_schema::TableSchema::new(
1102+
self.file_schema.clone(),
1103+
self.options
1104+
.table_partition_cols
1105+
.iter()
1106+
.map(|(name, dt)| Arc::new(Field::new(name, dt.clone(), false)) as _)
1107+
.collect(),
1108+
);
1109+
let mut source = self.options.format.file_source(table_schema);
11021110
// Apply schema adapter to source if available
11031111
//
11041112
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
@@ -1309,16 +1317,14 @@ impl TableProvider for ListingTable {
13091317
state,
13101318
FileScanConfigBuilder::new(
13111319
object_store_url,
1312-
Arc::clone(&self.file_schema),
13131320
file_source,
13141321
)
13151322
.with_file_groups(partitioned_file_lists)
13161323
.with_constraints(self.constraints.clone())
13171324
.with_statistics(statistics)
1318-
.with_projection_indices(projection.cloned())
1325+
.with_projection_indices(projection.cloned())?
13191326
.with_limit(limit)
13201327
.with_output_ordering(output_ordering)
1321-
.with_table_partition_cols(table_partition_cols)
13221328
.with_expr_adapter(self.expr_adapter_factory.clone())
13231329
.build(),
13241330
)

plugins/engine-datafusion/jni/src/query_executor.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use datafusion::{
1818
datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess},
1919
datasource::physical_plan::ParquetSource,
2020
execution::cache::cache_manager::CacheManagerConfig,
21-
execution::cache::cache_unit::DefaultListFilesCache,
21+
execution::cache::DefaultListFilesCache,
2222
execution::cache::CacheAccessor,
2323
execution::context::SessionContext,
2424
execution::runtime_env::RuntimeEnvBuilder,
@@ -31,6 +31,7 @@ use datafusion_datasource::PartitionedFile;
3131
use datafusion_datasource::file_groups::FileGroup;
3232
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3333
use datafusion_datasource::source::DataSourceExec;
34+
use datafusion_datasource::TableSchema;
3435
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
3536
use datafusion_substrait::substrait::proto::{Plan, extensions::simple_extension_declaration::MappingType};
3637
use object_store::ObjectMeta;
@@ -120,7 +121,11 @@ pub async fn execute_query_with_cross_rt_stream(
120121
);
121122

122123
let list_file_cache = Arc::new(DefaultListFilesCache::default());
123-
list_file_cache.put(table_path.prefix(), object_meta);
124+
let table_scoped_path = datafusion::execution::cache::TableScopedPath {
125+
table: None,
126+
path: table_path.prefix().clone(),
127+
};
128+
list_file_cache.put(&table_scoped_path, object_meta);
124129

125130
let runtimeEnv = &runtime.runtime_env;
126131

@@ -142,7 +147,7 @@ pub async fn execute_query_with_cross_rt_stream(
142147

143148
let mut config = SessionConfig::new();
144149
config.options_mut().execution.parquet.pushdown_filters = false;
145-
config.options_mut().execution.target_partitions = 1;
150+
config.options_mut().execution.target_partitions = 4;
146151
config.options_mut().execution.batch_size = 8192;
147152

148153
let state = datafusion::execution::SessionStateBuilder::new()
@@ -360,15 +365,18 @@ pub async fn execute_fetch_phase(
360365
);
361366

362367
let list_file_cache = Arc::new(DefaultListFilesCache::default());
363-
list_file_cache.put(table_path.prefix(), object_meta);
368+
let table_scoped_path = datafusion::execution::cache::TableScopedPath {
369+
table: None,
370+
path: table_path.prefix().clone(),
371+
};
372+
list_file_cache.put(&table_scoped_path, object_meta);
364373

365374
let runtime_env = RuntimeEnvBuilder::new()
366375
.with_cache_manager(
367376
CacheManagerConfig::default().with_list_files_cache(Some(list_file_cache))
368-
.with_metadata_cache_limit(runtime.runtime_env.cache_manager.get_file_metadata_cache().cache_limit())
377+
.with_metadata_cache_limit(runtime.runtime_env.cache_manager.get_file_metadata_cache().cache_limit())
369378
.with_file_metadata_cache(Some(runtime.runtime_env.cache_manager.get_file_metadata_cache().clone()))
370379
.with_files_statistics_cache(runtime.runtime_env.cache_manager.get_file_statistic_cache()),
371-
372380
)
373381
.build()?;
374382

@@ -413,7 +421,12 @@ pub async fn execute_fetch_phase(
413421
.collect();
414422

415423
let file_group = FileGroup::new(partitioned_files);
416-
let file_source = Arc::new(ParquetSource::default());
424+
425+
let table_schema = datafusion_datasource::table_schema::TableSchema::new(
426+
parquet_schema.clone(),
427+
vec![Arc::new(Field::new(ROW_BASE_FIELD_NAME, DataType::Int64, false))],
428+
);
429+
let file_source = Arc::new(ParquetSource::new(table_schema));
417430

418431
let mut projection_index = vec![];
419432
for field_name in projections.iter() {
@@ -434,17 +447,15 @@ pub async fn execute_fetch_phase(
434447

435448
let file_scan_config = FileScanConfigBuilder::new(
436449
ObjectStoreUrl::local_filesystem(),
437-
parquet_schema.clone(),
438450
file_source,
439451
)
440-
.with_table_partition_cols(vec![Field::new(ROW_BASE_FIELD_NAME, DataType::Int64, false)])
441-
.with_projection_indices(Some(projection_index.clone()))
452+
.with_projection_indices(Some(projection_index.clone()))?
442453
.with_file_group(file_group)
443454
.build();
444455

445456
let parquet_exec = DataSourceExec::from_data_source(file_scan_config.clone());
446457

447-
let projection_exprs = build_projection_exprs(file_scan_config.projected_schema())
458+
let projection_exprs = build_projection_exprs(file_scan_config.projected_schema()?)
448459
.expect("Failed to build projection expressions");
449460

450461
let projection_exec = Arc::new(ProjectionExec::try_new(projection_exprs, parquet_exec)

plugins/engine-datafusion/jni/src/statistics_cache.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ impl CacheAccessor<Path, Arc<Statistics>> for CustomStatisticsCache {
433433
result
434434
}
435435

436-
fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
436+
fn remove(&self, k: &Path) -> Option<Arc<Statistics>> {
437437
let key = k.to_string();
438438

439439
// Actually remove from the underlying cache
@@ -497,6 +497,13 @@ impl CacheAccessor<Path, Arc<Statistics>> for CustomStatisticsCache {
497497
}
498498
}
499499

500+
/// Implements DataFusion's `FileStatisticsCache` trait for `CustomStatisticsCache`.
impl datafusion::execution::cache::cache_manager::FileStatisticsCache for CustomStatisticsCache {
501+
/// Returns a map from object-store path to `FileStatisticsCacheEntry`.
///
/// This implementation always returns a freshly created, empty map; it does
/// not read any state from `self`.
// NOTE(review): any caller that uses `list_entries` to inspect this cache will
// see no entries — confirm that is acceptable.
fn list_entries(&self) -> std::collections::HashMap<object_store::path::Path, datafusion::execution::cache::cache_manager::FileStatisticsCacheEntry> {
502+
// Return empty map — this is used for introspection only
503+
std::collections::HashMap::new()
504+
}
505+
}
506+
500507
impl Default for CustomStatisticsCache {
501508
fn default() -> Self {
502509
Self::with_default_config()

0 commit comments

Comments
 (0)