diff --git a/plugins/engine-datafusion/Cargo.toml b/plugins/engine-datafusion/Cargo.toml index cead877ac0d2e..eeda4ae4cbefa 100644 --- a/plugins/engine-datafusion/Cargo.toml +++ b/plugins/engine-datafusion/Cargo.toml @@ -9,6 +9,10 @@ members = [ datafusion = "52.1.0" datafusion-expr = "52.1.0" datafusion-datasource = "52.1.0" +datafusion-datasource-parquet = "52.1.0" +datafusion-common = "52.1.0" +datafusion-execution = "52.1.0" +datafusion-physical-expr = "52.1.0" arrow-json = "57.3.0" arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] } arrow-array = "57.3.0" @@ -86,3 +90,23 @@ codegen-units = 1 # More parallel compilation incremental = true # Enable incremental compilation debug = "full" strip = false + +# Patch arrow/parquet to local arrow-rs with sparse column chunk fix +# Required by indexed-table crate for correct page index handling +# https://github.com/apache/arrow-rs/pull/9301/changes +#[patch.crates-io] +#parquet = { path = "../../../arrow-rs/parquet" } +#arrow = { path = "../../../arrow-rs/arrow" } +#arrow-array = { path = "../../../arrow-rs/arrow-array" } +#arrow-buffer = { path = "../../../arrow-rs/arrow-buffer" } +#arrow-data = { path = "../../../arrow-rs/arrow-data" } +#arrow-schema = { path = "../../../arrow-rs/arrow-schema" } +#arrow-select = { path = "../../../arrow-rs/arrow-select" } +#arrow-ipc = { path = "../../../arrow-rs/arrow-ipc" } +#arrow-cast = { path = "../../../arrow-rs/arrow-cast" } +#arrow-ord = { path = "../../../arrow-rs/arrow-ord" } +#arrow-string = { path = "../../../arrow-rs/arrow-string" } +#arrow-row = { path = "../../../arrow-rs/arrow-row" } +#arrow-csv = { path = "../../../arrow-rs/arrow-csv" } +#arrow-json = { path = "../../../arrow-rs/arrow-json" } +#arrow-arith = { path = "../../../arrow-rs/arrow-arith" } diff --git a/plugins/engine-datafusion/jni/Cargo.toml b/plugins/engine-datafusion/jni/Cargo.toml index 011498ed186a4..d7998381c5ae1 100644 --- a/plugins/engine-datafusion/jni/Cargo.toml +++ 
b/plugins/engine-datafusion/jni/Cargo.toml @@ -14,6 +14,9 @@ crate-type = ["cdylib"] datafusion = { workspace = true } datafusion-expr = { workspace = true } datafusion-datasource = { workspace = true } +datafusion-datasource-parquet = { workspace = true } +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } arrow-json = { workspace = true } arrow = { workspace = true } #arrow = "55.2.0" @@ -50,6 +53,7 @@ log ={ workspace = true } # Shared OpenSearch utilities vectorized-exec-spi = { path = "../../../libs/vectorized-exec-spi/rust" } + # Parquet support parquet = { workspace = true } @@ -72,6 +76,7 @@ tempfile ={ workspace = true } chrono = { workspace = true } async-trait = { workspace = true } +bytes = "1.5" itertools = { workspace = true } rstest = { workspace = true } regex = { workspace = true } diff --git a/plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs b/plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs index a29981bb8d3cf..6db31bffcfb59 100644 --- a/plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs +++ b/plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs @@ -111,9 +111,11 @@ impl AbsoluteRowIdOptimizer { let (new_schema, new_projections) = self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone()); + // Build a new TableSchema with partition columns from the original source let table_partition_cols = datasource.table_partition_cols().clone(); let new_table_schema = TableSchema::new(new_schema.clone(), table_partition_cols); + // Create a new ParquetSource with the updated schema use datafusion::datasource::physical_plan::ParquetSource; let new_file_source = Arc::new(ParquetSource::new(new_table_schema)); @@ -121,7 +123,6 @@ impl AbsoluteRowIdOptimizer { .with_source(new_file_source) .with_projection_indices(Some(new_projections)) .expect("Failed to set projection indices") - 
.with_statistics(datasource.statistics().add_column_statistics(ColumnStatistics::new_unknown())) .build(); let new_datasource = DataSourceExec::from_data_source(file_scan_config); diff --git a/plugins/engine-datafusion/jni/src/indexed_query_executor.rs b/plugins/engine-datafusion/jni/src/indexed_query_executor.rs new file mode 100644 index 0000000000000..ceabbb34ec5b2 --- /dev/null +++ b/plugins/engine-datafusion/jni/src/indexed_query_executor.rs @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** +Indexed query executor — bridges indexed-table into engine-datafusion. + +Registers an IndexedTableProvider (Lucene index + parquet) as the table, +then executes the substrait plan against it — same as the normal DF path +but with index-accelerated reads instead of full parquet scans. +**/ + +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; +use datafusion::physical_plan::execute_stream; +use datafusion::prelude::SessionConfig; +use jni::sys::jlong; +use prost::Message; + +use crate::indexed_table::index::BitsetMode; +use crate::indexed_table::table_provider::{IndexedTableConfig, IndexedTableProvider}; +use crate::indexed_table::jni_helpers::build_segments; + +use crate::executor::DedicatedExecutor; +use crate::query_executor::get_cross_rt_stream; +use crate::DataFusionRuntime; + +use datafusion::common::DataFusionError; +use datafusion_substrait::logical_plan::consumer::from_substrait_plan; +use datafusion_substrait::substrait::proto::Plan; +use datafusion_substrait::substrait::proto::extensions::simple_extension_declaration::MappingType; +use vectorized_exec_spi::log_error; + +/// Execute an indexed query with a substrait plan and return a CrossRtStream pointer. 
+pub async fn execute_indexed_query_stream( + weight_ptr: i64, + segment_max_docs: Vec, + parquet_paths: Vec, + table_name: String, + plan_bytes: Vec, + num_partitions: usize, + bitset_mode: BitsetMode, + is_query_plan_explain_enabled: bool, + jvm: Arc, + searcher_class_ref: jni::objects::GlobalRef, + runtime: &DataFusionRuntime, + cpu_executor: DedicatedExecutor, +) -> Result { + let t0 = std::time::Instant::now(); + + let searcher = Arc::new(crate::indexed_table::JniShardSearcher::new( + Arc::clone(&jvm), + weight_ptr, + searcher_class_ref, + segment_max_docs.clone(), + )) as Arc; + + let (segments, schema) = build_segments(&parquet_paths, &segment_max_docs) + .map_err(|e| DataFusionError::Execution(format!("build_segments: {}", e)))?; + + let provider = IndexedTableProvider::try_new( + IndexedTableConfig::new(searcher, segments, schema) + .with_bitset_mode(bitset_mode) + .with_partitions(num_partitions), + )?; + + let config = SessionConfig::new() + .with_target_partitions(num_partitions); + + let ctx = SessionContext::new_with_config(config); + ctx.register_table(&table_name, Arc::new(provider)) + .map_err(|e| DataFusionError::Execution(format!("register_table: {}", e)))?; + + let t_setup = t0.elapsed(); + + let substrait_plan = Plan::decode(plan_bytes.as_slice()) + .map_err(|e| DataFusionError::Execution(format!("Failed to decode Substrait: {}", e)))?; + + let mut modified_plan = substrait_plan.clone(); + for ext in modified_plan.extensions.iter_mut() { + if let Some(mapping_type) = &mut ext.mapping_type { + if let MappingType::ExtensionFunction(func) = mapping_type { + if func.name == "approx_count_distinct:any" { + func.name = "approx_distinct:any".to_string(); + } + } + } + } + + let logical_plan = from_substrait_plan(&ctx.state(), &modified_plan).await + .map_err(|e| { eprintln!("Failed to convert Substrait plan: {}", e); e })?; + + let mut dataframe = ctx.execute_logical_plan(logical_plan).await + .map_err(|e| { eprintln!("Failed to execute logical plan: 
{}", e); e })?; + + let physical_plan = dataframe.clone().create_physical_plan().await?; + + let t_plan = t0.elapsed(); + + // if is_query_plan_explain_enabled { + // log_error!("---- Indexed Query Explain Plan ----"); + // let plan_str = format!("{}", datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(true)); + // for line in plan_str.lines() { + // log_error!(" {}", line); + // } + // } + + if is_query_plan_explain_enabled { + println!("---- Explain plan ----"); + let clone_df = dataframe.clone().explain(false, true).expect("Failed to explain plan"); + clone_df.show().await?; + } + + let df_stream = execute_stream(physical_plan, ctx.task_ctx())?; + + let t_exec = t0.elapsed(); + eprintln!("[INDEXED-TIMING] setup={}ms, plan={}ms, execute_stream={}ms, explain={}", + t_setup.as_millis(), (t_plan - t_setup).as_millis(), (t_exec - t_plan).as_millis(), is_query_plan_explain_enabled); + + Ok(get_cross_rt_stream(cpu_executor, df_stream)) +} diff --git a/plugins/engine-datafusion/jni/src/indexed_table/index.rs b/plugins/engine-datafusion/jni/src/indexed_table/index.rs new file mode 100644 index 0000000000000..c96ee7b138044 --- /dev/null +++ b/plugins/engine-datafusion/jni/src/indexed_table/index.rs @@ -0,0 +1,81 @@ +/** +Core index traits — the contract between the index and the query engine. + +These two traits are all the core crate needs. How the searcher was created +(Lucene JNI, Tantivy, test stub, Java-side Weight pointer) is irrelevant here. + +ShardSearcher (Equivalent of Lucene's Weight — once per shard) + └── RowGroupDocsCollector (Equivalent of Lucene's Scorer — once per segment) + └── collect(range) → Vec<u64> (Collects matching doc IDs as bitsets within the given doc range) +**/ + +use std::fmt::Debug; +use std::sync::Arc; + +/// A collector that retrieves matching doc IDs as a bitset for a row group docs +/// range within a segment. 
+/// +/// The `collect` method returns a bitset (Vec) where each set bit represents +/// a matching doc ID relative to `min_doc`. +pub trait RowGroupDocsCollector: Send + Sync + Debug { + /// Collect matching doc IDs in `[min_doc, max_doc)` as a bitset. + /// + /// TODO : change this to direct native memory bitset + /// + /// Each u64 word represents 64 consecutive doc IDs starting from `min_doc`. + /// Bit `i` in word `j` represents doc ID `min_doc + j*64 + i`. + /// + /// The collector may be called multiple times with increasing ranges + /// (forward-only iteration). + fn collect(&self, min_doc: i32, max_doc: i32) -> Result, String>; +} + +/// A searcher scoped to a single shard (index), created once per query. +/// +/// Analogous to Lucene's `Weight` — expensive to create (parses query, builds +/// automaton for wildcards, etc.), but creating segment collectors from it is cheap. +/// +/// Right now : Java walks the query's boolean tree, builds a single Lucene BooleanQuery +/// that covers all indexed columns, and produces one `ShardSearcher`. The bitset +/// it returns is a superset of all rows matching the indexed portion of the query. +/// +/// TODO : Handle complex nested boolean queries which will need multiple scorers. +pub trait ShardSearcher: Send + Sync + Debug { + /// Number of segments in this shard. + fn segment_count(&self) -> usize; + + /// Max doc ID for a specific segment. + fn segment_max_doc(&self, segment_ord: usize) -> Result; + + /// Create a collector for a specific segment and doc ID range. + /// + /// The collector will only return docs in `[doc_min, doc_max)` as we stream through the row groups + /// This is cheap — analogous to `Weight.scorer(leafCtx, doc_min, doc_max)` in Lucene. + fn collector( + &self, + segment_ord: usize, + doc_min: i32, + doc_max: i32, + ) -> Result, String>; +} + + +/// How the index bitset relates to the parquet (non-indexed) filters. 
+/// +/// Java decides this based on the top-level query structure. +/// +/// In both modes, the result is a superset of the true answer. DataFusion's +/// residual filter cleans up false positives after parquet reads. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BitsetMode { + /// Intersect bitset with page pruner ranges (default). + And, + /// Union bitset with page pruner candidate rows. + Or, +} + +impl Default for BitsetMode { + fn default() -> Self { + BitsetMode::And + } +} diff --git a/plugins/engine-datafusion/jni/src/indexed_table/jni_helpers.rs b/plugins/engine-datafusion/jni/src/indexed_table/jni_helpers.rs new file mode 100644 index 0000000000000..23ee0c899d645 --- /dev/null +++ b/plugins/engine-datafusion/jni/src/indexed_table/jni_helpers.rs @@ -0,0 +1,68 @@ +/** +Helpers for building indexed table segments from parquet file paths. +**/ + +use super::parquet_bridge; +use super::stream::RowGroupInfo; +use super::table_provider::{coerce_binary_to_string, SegmentFileInfo}; + +/// Build `SegmentFileInfo` from parquet paths + segment max docs. +/// +/// Each parquet file is one segment. Reads parquet metadata to discover +/// row groups (count + row counts). +pub fn build_segments( + parquet_paths: &[String], + segment_max_docs: &[i64], +) -> Result<(Vec, arrow::datatypes::SchemaRef), String> { + use std::path::PathBuf; + + if parquet_paths.len() != segment_max_docs.len() { + return Err(format!( + "parquet_paths.len()={} != segment_max_docs.len()={}", + parquet_paths.len(), + segment_max_docs.len() + )); + } + + let mut segments = Vec::with_capacity(parquet_paths.len()); + let mut schema: Option = None; + + for (seg_ord, (path, &max_doc)) in parquet_paths.iter().zip(segment_max_docs).enumerate() { + let file = std::fs::File::open(path).map_err(|e| format!("open {}: {}", path, e))?; + let file_size = file + .metadata() + .map_err(|e| format!("stat {}: {}", path, e))? 
+ .len(); + + let (file_schema, pq_meta) = parquet_bridge::load_parquet_metadata(&file) + .map_err(|e| format!("parquet metadata {}: {}", path, e))?; + + if schema.is_none() { + schema = Some(coerce_binary_to_string(file_schema)); + } + + let mut row_groups = Vec::new(); + let mut offset: i64 = 0; + for rg_idx in 0..pq_meta.num_row_groups() { + let num_rows = pq_meta.row_group(rg_idx).num_rows(); + row_groups.push(RowGroupInfo { + index: rg_idx, + first_row: offset, + num_rows, + }); + offset += num_rows; + } + + segments.push(SegmentFileInfo { + segment_ord: seg_ord as i32, + max_doc, + parquet_path: PathBuf::from(path), + parquet_size: file_size, + row_groups, + metadata: pq_meta, + }); + } + + let schema = schema.ok_or_else(|| "No parquet files provided".to_string())?; + Ok((segments, schema)) +} diff --git a/plugins/engine-datafusion/jni/src/indexed_table/jni_searcher.rs b/plugins/engine-datafusion/jni/src/indexed_table/jni_searcher.rs new file mode 100644 index 0000000000000..1cb954ea7552e --- /dev/null +++ b/plugins/engine-datafusion/jni/src/indexed_table/jni_searcher.rs @@ -0,0 +1,208 @@ +/** +JNI-backed `ShardSearcher` — wraps a Java-side Weight pointer. + +This is the "flipped" version of `LuceneIndexProvider`: +- **Before**: Rust creates the Weight by calling into Java (`create_shard_weight`) +- **Now**: Java creates the Weight and passes the raw pointer to Rust + +The `JniShardSearcher` implements `ShardSearcher` by calling back into Java +for scorer creation and doc collection — same JNI methods, just driven from +the Java side. 
+ +```text +Java: Weight weight = searcher.createWeight(query); + long weightPtr = storeWeight(weight); + nativeExecuteIndexedQuery(weightPtr, segmentInfos, ...); + +Rust: JniShardSearcher { weight_ptr, java_class } + .collector(seg, min, max) → calls Java createPartitionScorerFromShard + .collect(min, max) → calls Java getNextRowGroupDocs +``` +**/ + +use std::sync::Arc; + +use jni::objects::{GlobalRef, JClass, JValue}; +use jni::JavaVM; + +use super::index::{RowGroupDocsCollector, ShardSearcher}; + + +/// A `ShardSearcher` backed by a Java-side Weight pointer. +/// +/// Created by the JNI entry point when Java passes a pre-built Weight. +/// The Weight pointer is NOT owned — Java manages its lifecycle. +pub struct JniShardSearcher { + jvm: Arc, + weight_ptr: i64, + class_ref: GlobalRef, + segment_count: usize, + segment_max_docs: Vec, +} + +impl JniShardSearcher { + /// Create from a Java-side Weight pointer. + /// + /// `segment_max_docs` is passed from Java (one entry per segment). + /// The Weight pointer is borrowed — caller (Java) owns it. 
+ pub fn new( + jvm: Arc, + weight_ptr: i64, + class_ref: GlobalRef, + segment_max_docs: Vec, + ) -> Self { + let segment_count = segment_max_docs.len(); + Self { + jvm, + weight_ptr, + class_ref, + segment_count, + segment_max_docs, + } + } +} + +impl std::fmt::Debug for JniShardSearcher { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JniShardSearcher") + .field("weight_ptr", &self.weight_ptr) + .field("segment_count", &self.segment_count) + .finish() + } +} + +impl ShardSearcher for JniShardSearcher { + fn segment_count(&self) -> usize { + self.segment_count + } + + fn segment_max_doc(&self, segment_ord: usize) -> Result { + self.segment_max_docs + .get(segment_ord) + .copied() + .ok_or_else(|| { + format!( + "segment_ord {} out of range ({})", + segment_ord, self.segment_count + ) + }) + } + + fn collector( + &self, + segment_ord: usize, + doc_min: i32, + doc_max: i32, + ) -> Result, String> { + let mut env = self + .jvm + .attach_current_thread() + .map_err(|e| format!("Failed to attach thread: {}", e))?; + + let class: &JClass = self.class_ref.as_obj().into(); + + let scorer_ptr = env + .call_static_method( + class, + "createPartitionScorerFromShard", + "(JIII)J", + &[ + JValue::Long(self.weight_ptr), + JValue::Int(segment_ord as i32), + JValue::Int(doc_min), + JValue::Int(doc_max), + ], + ) + .map_err(|e| format!("createPartitionScorerFromShard failed: {}", e))? + .j() + .map_err(|e| format!("Failed to get long: {}", e))?; + + if scorer_ptr < 0 { + return Err("No matches in segment (scorer_ptr < 0)".to_string()); + } + + let scorer_class_ref = env + .new_global_ref(class) + .map_err(|e| format!("Failed to create global ref: {}", e))?; + + Ok(Arc::new(JniSegmentCollector { + jvm: Arc::clone(&self.jvm), + scorer_ptr, + class_ref: scorer_class_ref, + })) + } +} + +/// A `RowGroupDocsCollector` backed by a Java-side PartitionScorer pointer. +/// +/// The scorer pointer IS owned — dropped via `releasePartitionScorer`. 
+struct JniSegmentCollector { + jvm: Arc, + scorer_ptr: i64, + class_ref: GlobalRef, +} + +impl std::fmt::Debug for JniSegmentCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JniSegmentCollector") + .field("scorer_ptr", &self.scorer_ptr) + .finish() + } +} + +impl Drop for JniSegmentCollector { + fn drop(&mut self) { + if let Ok(mut env) = self.jvm.attach_current_thread() { + let class: &JClass = self.class_ref.as_obj().into(); + let _ = env.call_static_method( + class, + "releasePartitionScorer", + "(J)V", + &[JValue::Long(self.scorer_ptr)], + ); + } + } +} + +impl RowGroupDocsCollector for JniSegmentCollector { + fn collect(&self, min_doc: i32, max_doc: i32) -> Result, String> { + let mut env = self + .jvm + .attach_current_thread() + .map_err(|e| format!("Failed to attach thread: {}", e))?; + + let class: &JClass = self.class_ref.as_obj().into(); + + let result = env + .call_static_method( + class, + "getNextRowGroupDocs", + "(JII)[J", + &[ + JValue::Long(self.scorer_ptr), + JValue::Int(min_doc), + JValue::Int(max_doc), + ], + ) + .map_err(|e| format!("getNextRowGroupDocs failed: {}", e))?; + + let array_obj = result + .l() + .map_err(|e| format!("Failed to get array: {}", e))?; + + let long_array = unsafe { jni::objects::JLongArray::from_raw(array_obj.as_raw()) }; + let len = env + .get_array_length(&long_array) + .map_err(|e| format!("Failed to get array length: {}", e))? 
as usize; + + if len == 0 { + return Ok(Vec::new()); + } + + let mut buf = vec![0i64; len]; + env.get_long_array_region(&long_array, 0, &mut buf) + .map_err(|e| format!("Failed to get array region: {}", e))?; + + Ok(buf.iter().map(|&v| v as u64).collect()) + } +} diff --git a/plugins/engine-datafusion/jni/src/indexed_table/metrics.rs b/plugins/engine-datafusion/jni/src/indexed_table/metrics.rs new file mode 100644 index 0000000000000..40e434689d713 --- /dev/null +++ b/plugins/engine-datafusion/jni/src/indexed_table/metrics.rs @@ -0,0 +1,106 @@ +/** +Metrics for indexed search execution plans. + +- `PartitionMetrics` — registered against the parent ExecutionPlan, visible in EXPLAIN ANALYZE +- `StreamMetrics` — lightweight handles passed to the stream for recording +**/ + +use std::sync::Arc; + +use datafusion::physical_plan::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, +}; + +/// Lightweight metric handles passed from the execution plan to the stream. +/// +/// All fields are `Option` because standalone `IndexedExec` (not under a +/// multi-segment parent) has no shared parent metrics to update. +#[derive(Clone)] +pub struct StreamMetrics { + pub output_rows: Option, + pub elapsed_compute: Option