Skip to content

Commit 776c709

Browse files
author
Tanvir Alam
committed
Integrate Liquid Cache with DataFusion 52.1 for byte-level Parquet caching with comprehensive logging
1 parent e669187 commit 776c709

File tree

10 files changed

+240
-75
lines changed

10 files changed

+240
-75
lines changed

plugins/engine-datafusion/Cargo.toml

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -6,22 +6,22 @@ members = [
66

77
[workspace.dependencies]
88
# DataFusion dependencies
9-
datafusion = "51.0.0"
10-
datafusion-expr = "51.0.0"
11-
datafusion-datasource = "51.0.0"
12-
arrow-json = "57.1.0"
13-
arrow = { version = "57.1.0", features = ["ffi", "ipc_compression"] }
14-
arrow-array = "57.1.0"
15-
arrow-schema = "57.1.0"
16-
arrow-buffer = "57.1.0"
9+
datafusion = "52.1.0"
10+
datafusion-expr = "52.1.0"
11+
datafusion-datasource = "52.1.0"
12+
arrow-json = "57.3.0"
13+
arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] }
14+
arrow-array = "57.3.0"
15+
arrow-schema = "57.3.0"
16+
arrow-buffer = "57.3.0"
1717
downcast-rs = "1.2"
1818

1919

2020
# JNI dependencies
2121
jni = "0.21"
2222

2323
# Substrait support
24-
datafusion-substrait = "51.0.0"
24+
datafusion-substrait = "52.1.0"
2525
prost = "0.14"
2626

2727

@@ -42,10 +42,10 @@ thiserror = "1.0"
4242
# Logging
4343
log = "0.4"
4444
# Parquet support
45-
parquet = "57.1.0"
45+
parquet = "57.3.0"
4646

4747
# Object store for file access
48-
object_store = "=0.12.4"
48+
object_store = "0.12.5"
4949
url = "2.0"
5050

5151
# Substrait support

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,9 @@ num_cpus = "1.16"
6161
object_store = { workspace = true }
6262
url = { workspace = true }
6363

64+
# Liquid Cache for byte-level caching
65+
liquid-cache-datafusion-local = "0.1.12"
66+
6467
# Substrait support
6568
substrait = { workspace = true }
6669

plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ use datafusion::{
1414
common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
1515
config::ConfigOptions,
1616
datasource::{
17-
physical_plan::{FileScanConfig, FileScanConfigBuilder},
17+
physical_plan::{FileScanConfig, FileScanConfigBuilder, ParquetSource},
1818
source::DataSourceExec,
1919
},
2020
error::DataFusionError,
@@ -41,7 +41,7 @@ impl AbsoluteRowIdOptimizer {
4141
datasource_exec_schema: SchemaRef,
4242
) -> (SchemaRef, Vec<usize>) {
4343
// Clone projection and add new field index
44-
let mut projected_schema = datasource.projected_schema().clone();
44+
let projected_schema = datasource.projected_schema().expect("Failed to get projected schema");
4545
let file_source_schema = datasource.file_schema().clone();
4646

4747
let mut new_projections = vec![];
@@ -109,9 +109,13 @@ impl AbsoluteRowIdOptimizer {
109109
) -> Result<ProjectionExec, DataFusionError> {
110110
let (new_schema, new_projections) =
111111
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());
112+
113+
// Create a new file source with the updated schema
114+
let new_file_source = ParquetSource::new(new_schema.clone());
115+
112116
let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
113-
.with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone())))
114-
.with_projection_indices(Some(new_projections))
117+
.with_source(Arc::new(new_file_source))
118+
.with_projection_indices(Some(new_projections))?
115119
.build();
116120

117121
let new_datasource = DataSourceExec::from_data_source(file_scan_config);

plugins/engine-datafusion/jni/src/cache.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,7 @@ impl CacheAccessor<ObjectMeta, Arc<dyn datafusion::execution::cache::cache_manag
110110
}
111111
}
112112

113-
fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
113+
fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn datafusion::execution::cache::cache_manager::FileMetadata>> {
114114
match self.inner.lock() {
115115
Ok(mut cache) => cache.remove(k),
116116
Err(e) => {

plugins/engine-datafusion/jni/src/custom_cache_manager.rs

Lines changed: 3 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -61,14 +61,9 @@ impl CustomCacheManager {
6161
.with_metadata_cache_limit(cache.cache_limit());
6262
}
6363

64-
// Add statistics cache if available - use CustomStatisticsCache directly
65-
if let Some(stats_cache) = &self.statistics_cache {
66-
config = config.with_files_statistics_cache(Some(stats_cache.clone() as FileStatisticsCache));
67-
} else {
68-
// Default statistics cache if none set
69-
let default_stats = Arc::new(DefaultFileStatisticsCache::default());
70-
config = config.with_files_statistics_cache(Some(default_stats));
71-
}
64+
// Add statistics cache if available - use default since CustomStatisticsCache doesn't implement FileStatisticsCache trait
65+
let default_stats = Arc::new(DefaultFileStatisticsCache::default());
66+
config = config.with_files_statistics_cache(Some(default_stats));
7267

7368
config
7469
}

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -80,11 +80,15 @@ use crate::runtime_manager::RuntimeManager;
8080

8181
mod statistics_cache;
8282
mod eviction_policy;
83+
mod liquid_cache_runtime;
84+
85+
use liquid_cache_runtime::LiquidOnlyRuntime;
8386

8487
struct DataFusionRuntime {
8588
runtime_env: RuntimeEnv,
8689
custom_cache_manager: Option<CustomCacheManager>,
8790
monitor: Arc<Monitor>,
91+
liquid_cache_runtime_env: Arc<RuntimeEnv>,
8892
}
8993

9094
// TASK monitorint metrics
@@ -291,10 +295,25 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createGlo
291295
.with_disk_manager_builder(builder)
292296
.build().unwrap();
293297

298+
log_info!("[LiquidCache] Initializing global Liquid Cache (1GB max)");
299+
let liquid_runtime = match LiquidOnlyRuntime::init(1024 * 1024 * 1024) {
300+
Ok(runtime) => {
301+
log_info!("[LiquidCache] ✓ Liquid Cache initialized successfully");
302+
runtime
303+
},
304+
Err(e) => {
305+
log_error!("[LiquidCache] ERROR: Failed to initialize Liquid Cache: {}", e);
306+
panic!("Failed to initialize Liquid Cache: {}", e);
307+
}
308+
};
309+
310+
let liquid_cache_runtime_env = liquid_runtime.runtime_env();
311+
294312
let runtime = DataFusionRuntime {
295313
runtime_env,
296314
custom_cache_manager,
297315
monitor,
316+
liquid_cache_runtime_env,
298317
};
299318

300319
Box::into_raw(Box::new(runtime)) as jlong
Lines changed: 99 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
use std::{fs, path::PathBuf, sync::{Arc, OnceLock}};
10+
11+
use datafusion::{
12+
common::DataFusionError,
13+
execution::runtime_env::RuntimeEnv,
14+
prelude::SessionConfig,
15+
};
16+
17+
use liquid_cache_datafusion_local::LiquidCacheLocalBuilder;
18+
19+
type LiquidCacheRef = std::sync::Arc<dyn std::any::Any + Send + Sync>;
20+
use vectorized_exec_spi::{log_info, log_debug};
21+
22+
pub const LIQUID_CACHE_DIR: &str = "/var/lib/opensearch/liquid_cache";
23+
24+
pub struct LiquidOnlyRuntime {
25+
runtime_env: Arc<RuntimeEnv>,
26+
liquid_cache: LiquidCacheRef,
27+
}
28+
29+
static LIQUID_ONLY: OnceLock<Result<LiquidOnlyRuntime, String>> = OnceLock::new();
30+
31+
impl LiquidOnlyRuntime {
32+
pub fn init(max_cache_bytes: u64) -> Result<&'static LiquidOnlyRuntime, DataFusionError> {
33+
log_info!("[LiquidCache] Initializing Liquid Cache with max_cache_bytes={}", max_cache_bytes);
34+
35+
let result = LIQUID_ONLY.get_or_init(|| {
36+
log_info!("[LiquidCache] Creating new Liquid Cache instance");
37+
38+
let cache_dir = PathBuf::from(LIQUID_CACHE_DIR);
39+
log_info!("[LiquidCache] Cache directory: {:?}", cache_dir);
40+
41+
match fs::create_dir_all(&cache_dir) {
42+
Ok(_) => log_info!("[LiquidCache] Cache directory created/verified successfully"),
43+
Err(e) => {
44+
log_info!("[LiquidCache] ERROR: Failed to create cache directory: {}", e);
45+
return Err(format!(
46+
"Failed to create liquid cache dir {:?}: {}",
47+
cache_dir, e
48+
));
49+
}
50+
}
51+
52+
let bootstrap_cfg = SessionConfig::new();
53+
log_info!("[LiquidCache] Building Liquid Cache with LiquidCacheLocalBuilder");
54+
55+
let (liquid_ctx, cache_ref) = match LiquidCacheLocalBuilder::new()
56+
.with_max_cache_bytes(max_cache_bytes as usize)
57+
.with_cache_dir(cache_dir.clone())
58+
.build(bootstrap_cfg) {
59+
Ok(result) => {
60+
log_info!("[LiquidCache] ✓ Liquid Cache built successfully");
61+
log_info!("[LiquidCache] Cache directory: {:?}", cache_dir);
62+
log_info!("[LiquidCache] Max cache size: {} bytes ({} MB)",
63+
max_cache_bytes, max_cache_bytes / 1024 / 1024);
64+
result
65+
},
66+
Err(e) => {
67+
log_info!("[LiquidCache] ERROR: Failed to build Liquid Cache: {}", e);
68+
return Err(format!("Liquid cache build failed: {e}"));
69+
}
70+
};
71+
72+
log_info!("[LiquidCache] ✓ Liquid Cache initialization complete");
73+
74+
Ok(LiquidOnlyRuntime {
75+
runtime_env: liquid_ctx.runtime_env(),
76+
liquid_cache: cache_ref,
77+
})
78+
});
79+
80+
match result.as_ref() {
81+
Ok(_) => {
82+
log_info!("[LiquidCache] ✓ Liquid Cache ready for use");
83+
Ok(result.as_ref().unwrap())
84+
},
85+
Err(e) => {
86+
log_info!("[LiquidCache] ERROR: Liquid Cache initialization failed: {}", e);
87+
Err(DataFusionError::Execution(e.clone()))
88+
}
89+
}
90+
}
91+
92+
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
93+
self.runtime_env.clone()
94+
}
95+
96+
pub fn cache(&self) -> &LiquidCacheRef {
97+
&self.liquid_cache
98+
}
99+
}

plugins/engine-datafusion/jni/src/listing_table.rs

Lines changed: 11 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -948,7 +948,7 @@ pub struct ListingTable {
948948
schema_source: SchemaSource,
949949
options: ListingOptions,
950950
definition: Option<String>,
951-
collected_statistics: FileStatisticsCache,
951+
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
952952
constraints: Constraints,
953953
column_defaults: HashMap<String, Expr>,
954954
/// Optional [`SchemaAdapterFactory`] for creating schema adapters
@@ -992,7 +992,7 @@ impl ListingTable {
992992
schema_source,
993993
options,
994994
definition: None,
995-
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
995+
collected_statistics: Some(Arc::new(DefaultFileStatisticsCache::default())),
996996
constraints: Constraints::default(),
997997
column_defaults: HashMap::new(),
998998
schema_adapter_factory: config.schema_adapter_factory,
@@ -1020,9 +1020,9 @@ impl ListingTable {
10201020
/// multiple times in the same session.
10211021
///
10221022
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
1023-
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
1023+
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
10241024
self.collected_statistics =
1025-
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
1025+
Some(cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default())));
10261026
self
10271027
}
10281028

@@ -1098,7 +1098,7 @@ impl ListingTable {
10981098

10991099
/// Creates a file source and applies schema adapter factory if available
11001100
fn create_file_source_with_schema_adapter(&self) -> Result<Arc<dyn FileSource>> {
1101-
let mut source = self.options.format.file_source();
1101+
let mut source = self.options.format.file_source(self.file_schema.clone().into());
11021102
// Apply schema adapter to source if available
11031103
//
11041104
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
@@ -1309,16 +1309,14 @@ impl TableProvider for ListingTable {
13091309
state,
13101310
FileScanConfigBuilder::new(
13111311
object_store_url,
1312-
Arc::clone(&self.file_schema),
13131312
file_source,
13141313
)
13151314
.with_file_groups(partitioned_file_lists)
13161315
.with_constraints(self.constraints.clone())
13171316
.with_statistics(statistics)
1318-
.with_projection_indices(projection.cloned())
1317+
.with_projection_indices(projection.cloned())?
13191318
.with_limit(limit)
13201319
.with_output_ordering(output_ordering)
1321-
.with_table_partition_cols(table_partition_cols)
13221320
.with_expr_adapter(self.expr_adapter_factory.clone())
13231321
.build(),
13241322
)
@@ -1496,7 +1494,8 @@ impl ListingTable {
14961494
) -> Result<Arc<Statistics>> {
14971495
match self
14981496
.collected_statistics
1499-
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
1497+
.as_ref()
1498+
.and_then(|cache| cache.get_with_extra(&part_file.object_meta.location, &part_file.object_meta))
15001499
{
15011500
Some(statistics) => Ok(statistics),
15021501
None => {
@@ -1511,11 +1510,13 @@ impl ListingTable {
15111510
)
15121511
.await?;
15131512
let statistics = Arc::new(statistics);
1514-
self.collected_statistics.put_with_extra(
1513+
if let Some(cache) = &self.collected_statistics {
1514+
cache.put_with_extra(
15151515
&part_file.object_meta.location,
15161516
Arc::clone(&statistics),
15171517
&part_file.object_meta,
15181518
);
1519+
}
15191520
Ok(statistics)
15201521
}
15211522
}

0 commit comments

Comments
 (0)