Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions plugins/engine-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ members = [
datafusion = "52.1.0"
datafusion-expr = "52.1.0"
datafusion-datasource = "52.1.0"
datafusion-datasource-parquet = "52.1.0"
datafusion-common = "52.1.0"
datafusion-execution = "52.1.0"
datafusion-physical-expr = "52.1.0"
arrow-json = "57.3.0"
arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] }
arrow-array = "57.3.0"
Expand Down Expand Up @@ -86,3 +90,23 @@ codegen-units = 1 # More parallel compilation
incremental = true # Enable incremental compilation
debug = "full"
strip = false

# Patch arrow/parquet to local arrow-rs with sparse column chunk fix
# Required by indexed-table crate for correct page index handling
# https://github.com/apache/arrow-rs/pull/9301/changes
#[patch.crates-io]
#parquet = { path = "../../../arrow-rs/parquet" }
#arrow = { path = "../../../arrow-rs/arrow" }
#arrow-array = { path = "../../../arrow-rs/arrow-array" }
#arrow-buffer = { path = "../../../arrow-rs/arrow-buffer" }
#arrow-data = { path = "../../../arrow-rs/arrow-data" }
#arrow-schema = { path = "../../../arrow-rs/arrow-schema" }
#arrow-select = { path = "../../../arrow-rs/arrow-select" }
#arrow-ipc = { path = "../../../arrow-rs/arrow-ipc" }
#arrow-cast = { path = "../../../arrow-rs/arrow-cast" }
#arrow-ord = { path = "../../../arrow-rs/arrow-ord" }
#arrow-string = { path = "../../../arrow-rs/arrow-string" }
#arrow-row = { path = "../../../arrow-rs/arrow-row" }
#arrow-csv = { path = "../../../arrow-rs/arrow-csv" }
#arrow-json = { path = "../../../arrow-rs/arrow-json" }
#arrow-arith = { path = "../../../arrow-rs/arrow-arith" }
5 changes: 5 additions & 0 deletions plugins/engine-datafusion/jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ crate-type = ["cdylib"]
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-datasource-parquet = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
arrow-json = { workspace = true }
arrow = { workspace = true }
#arrow = "55.2.0"
Expand Down Expand Up @@ -50,6 +53,7 @@ log ={ workspace = true }

# Shared OpenSearch utilities
vectorized-exec-spi = { path = "../../../libs/vectorized-exec-spi/rust" }

# Parquet support
parquet = { workspace = true }

Expand All @@ -72,6 +76,7 @@ tempfile ={ workspace = true }
chrono = { workspace = true }

async-trait = { workspace = true }
bytes = "1.5"
itertools = { workspace = true }
rstest = { workspace = true }
regex = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,18 @@ impl AbsoluteRowIdOptimizer {
let (new_schema, new_projections) =
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());

// Build a new TableSchema with partition columns from the original source
let table_partition_cols = datasource.table_partition_cols().clone();
let new_table_schema = TableSchema::new(new_schema.clone(), table_partition_cols);

// Create a new ParquetSource with the updated schema
use datafusion::datasource::physical_plan::ParquetSource;
let new_file_source = Arc::new(ParquetSource::new(new_table_schema));

let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
.with_source(new_file_source)
.with_projection_indices(Some(new_projections))
.expect("Failed to set projection indices")
.with_statistics(datasource.statistics().add_column_statistics(ColumnStatistics::new_unknown()))
.build();

let new_datasource = DataSourceExec::from_data_source(file_scan_config);
Expand Down
126 changes: 126 additions & 0 deletions plugins/engine-datafusion/jni/src/indexed_query_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

//! Indexed query executor — bridges indexed-table into engine-datafusion.
//!
//! Registers an IndexedTableProvider (Lucene index + parquet) as the table,
//! then executes the substrait plan against it — same as the normal DF path
//! but with index-accelerated reads instead of full parquet scans.

use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionConfig;
use jni::sys::jlong;
use prost::Message;

use crate::indexed_table::index::BitsetMode;
use crate::indexed_table::table_provider::{IndexedTableConfig, IndexedTableProvider};
use crate::indexed_table::jni_helpers::build_segments;

use crate::executor::DedicatedExecutor;
use crate::query_executor::get_cross_rt_stream;
use crate::DataFusionRuntime;

use datafusion::common::DataFusionError;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::substrait::proto::Plan;
use datafusion_substrait::substrait::proto::extensions::simple_extension_declaration::MappingType;
use vectorized_exec_spi::log_error;

/// Execute an indexed query with a substrait plan and return a CrossRtStream pointer.
pub async fn execute_indexed_query_stream(
weight_ptr: i64,
segment_max_docs: Vec<i64>,
parquet_paths: Vec<String>,
table_name: String,
plan_bytes: Vec<u8>,
num_partitions: usize,
bitset_mode: BitsetMode,
is_query_plan_explain_enabled: bool,
jvm: Arc<jni::JavaVM>,
searcher_class_ref: jni::objects::GlobalRef,
runtime: &DataFusionRuntime,
cpu_executor: DedicatedExecutor,
) -> Result<jlong, DataFusionError> {
let t0 = std::time::Instant::now();

let searcher = Arc::new(crate::indexed_table::JniShardSearcher::new(
Arc::clone(&jvm),
weight_ptr,
searcher_class_ref,
segment_max_docs.clone(),
)) as Arc<dyn crate::indexed_table::index::ShardSearcher>;

let (segments, schema) = build_segments(&parquet_paths, &segment_max_docs)
.map_err(|e| DataFusionError::Execution(format!("build_segments: {}", e)))?;

let provider = IndexedTableProvider::try_new(
IndexedTableConfig::new(searcher, segments, schema)
.with_bitset_mode(bitset_mode)
.with_partitions(num_partitions),
)?;

let config = SessionConfig::new()
.with_target_partitions(num_partitions);

let ctx = SessionContext::new_with_config(config);
ctx.register_table(&table_name, Arc::new(provider))
.map_err(|e| DataFusionError::Execution(format!("register_table: {}", e)))?;

let t_setup = t0.elapsed();

let substrait_plan = Plan::decode(plan_bytes.as_slice())
.map_err(|e| DataFusionError::Execution(format!("Failed to decode Substrait: {}", e)))?;

let mut modified_plan = substrait_plan.clone();
for ext in modified_plan.extensions.iter_mut() {
if let Some(mapping_type) = &mut ext.mapping_type {
if let MappingType::ExtensionFunction(func) = mapping_type {
if func.name == "approx_count_distinct:any" {
func.name = "approx_distinct:any".to_string();
}
}
}
}

let logical_plan = from_substrait_plan(&ctx.state(), &modified_plan).await
.map_err(|e| { eprintln!("Failed to convert Substrait plan: {}", e); e })?;

let mut dataframe = ctx.execute_logical_plan(logical_plan).await
.map_err(|e| { eprintln!("Failed to execute logical plan: {}", e); e })?;

let physical_plan = dataframe.clone().create_physical_plan().await?;

let t_plan = t0.elapsed();

// if is_query_plan_explain_enabled {
// log_error!("---- Indexed Query Explain Plan ----");
// let plan_str = format!("{}", datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(true));
// for line in plan_str.lines() {
// log_error!(" {}", line);
// }
// }

if is_query_plan_explain_enabled {
println!("---- Explain plan ----");
let clone_df = dataframe.clone().explain(false, true).expect("Failed to explain plan");
clone_df.show().await?;
}

let df_stream = execute_stream(physical_plan, ctx.task_ctx())?;

let t_exec = t0.elapsed();
eprintln!("[INDEXED-TIMING] setup={}ms, plan={}ms, execute_stream={}ms, explain={}",
t_setup.as_millis(), (t_plan - t_setup).as_millis(), (t_exec - t_plan).as_millis(), is_query_plan_explain_enabled);

Ok(get_cross_rt_stream(cpu_executor, df_stream))
}
81 changes: 81 additions & 0 deletions plugins/engine-datafusion/jni/src/indexed_table/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Core index traits — the contract between the index and the query engine.
//!
//! These two traits are all the core crate needs. How the searcher was created
//! (Lucene JNI, Tantivy, test stub, Java-side Weight pointer) is irrelevant here.
//!
//! ShardSearcher (Equivalent of Lucene's Weight — once per shard)
//! └── SegmentCollector (Equivalent of Lucene's Scorer — once per segment)
//!     └── collect(range) → Vec<u64> (Collects matching doc IDs as bitsets within the given doc range)

use std::fmt::Debug;
use std::sync::Arc;

/// A collector that retrieves matching doc IDs as a bitset for a row group docs
/// range within a segment.
///
/// The `collect` method returns a bitset (Vec<u64>) where each set bit represents
/// a matching doc ID relative to `min_doc`.
pub trait RowGroupDocsCollector: Send + Sync + Debug {
    /// Collect matching doc IDs in `[min_doc, max_doc)` as a bitset.
    ///
    /// TODO : change this to direct native memory bitset
    ///
    /// Each u64 word represents 64 consecutive doc IDs starting from `min_doc`.
    /// Bit `i` in word `j` represents doc ID `min_doc + j*64 + i`.
    ///
    /// The collector may be called multiple times with increasing ranges
    /// (forward-only iteration).
    ///
    /// # Errors
    /// Returns `Err(String)` describing the failure when the underlying index
    /// search cannot produce a bitset for the requested range.
    fn collect(&self, min_doc: i32, max_doc: i32) -> Result<Vec<u64>, String>;
}

/// A searcher scoped to a single shard (index), created once per query.
///
/// Analogous to Lucene's `Weight` — expensive to create (parses query, builds
/// automaton for wildcards, etc.), but creating segment collectors from it is cheap.
///
/// Right now : Java walks the query's boolean tree, builds a single Lucene BooleanQuery
/// that covers all indexed columns, and produces one `ShardSearcher`. The bitset
/// it returns is a superset of all rows matching the indexed portion of the query.
///
/// TODO : Handle complex nested boolean queries which will need multiple scorers.
pub trait ShardSearcher: Send + Sync + Debug {
    /// Number of segments in this shard.
    fn segment_count(&self) -> usize;

    /// Max doc ID for a specific segment.
    ///
    /// # Errors
    /// Returns `Err(String)` when `segment_ord` is out of range or the backing
    /// index cannot be queried.
    fn segment_max_doc(&self, segment_ord: usize) -> Result<i64, String>;

    /// Create a collector for a specific segment and doc ID range.
    ///
    /// The collector will only return docs in `[doc_min, doc_max)` as we stream through the row groups
    /// This is cheap — analogous to `Weight.scorer(leafCtx, doc_min, doc_max)` in Lucene.
    ///
    /// # Errors
    /// Returns `Err(String)` when a collector cannot be created for the given
    /// segment and range.
    fn collector(
        &self,
        segment_ord: usize,
        doc_min: i32,
        doc_max: i32,
    ) -> Result<Arc<dyn RowGroupDocsCollector>, String>;
}


/// How the index bitset relates to the parquet (non-indexed) filters.
///
/// Java decides this based on the top-level query structure.
///
/// In both cases, the result is a superset of the true answer. DataFusion's
/// residual filter cleans up false positives after parquet reads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BitsetMode {
    /// Intersect bitset with page pruner ranges (default).
    #[default]
    And,
    /// Union bitset with page pruner candidate rows.
    Or,
}
68 changes: 68 additions & 0 deletions plugins/engine-datafusion/jni/src/indexed_table/jni_helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! Helpers for building indexed table segments from parquet file paths.

use super::parquet_bridge;
use super::stream::RowGroupInfo;
use super::table_provider::{coerce_binary_to_string, SegmentFileInfo};

/// Build `SegmentFileInfo` from parquet paths + segment max docs.
///
/// Each parquet file is one segment. Reads parquet metadata to discover
/// row groups (count + row counts).
pub fn build_segments(
parquet_paths: &[String],
segment_max_docs: &[i64],
) -> Result<(Vec<SegmentFileInfo>, arrow::datatypes::SchemaRef), String> {
use std::path::PathBuf;

if parquet_paths.len() != segment_max_docs.len() {
return Err(format!(
"parquet_paths.len()={} != segment_max_docs.len()={}",
parquet_paths.len(),
segment_max_docs.len()
));
}

let mut segments = Vec::with_capacity(parquet_paths.len());
let mut schema: Option<arrow::datatypes::SchemaRef> = None;

for (seg_ord, (path, &max_doc)) in parquet_paths.iter().zip(segment_max_docs).enumerate() {
let file = std::fs::File::open(path).map_err(|e| format!("open {}: {}", path, e))?;
let file_size = file
.metadata()
.map_err(|e| format!("stat {}: {}", path, e))?
.len();

let (file_schema, pq_meta) = parquet_bridge::load_parquet_metadata(&file)
.map_err(|e| format!("parquet metadata {}: {}", path, e))?;

if schema.is_none() {
schema = Some(coerce_binary_to_string(file_schema));
}

let mut row_groups = Vec::new();
let mut offset: i64 = 0;
for rg_idx in 0..pq_meta.num_row_groups() {
let num_rows = pq_meta.row_group(rg_idx).num_rows();
row_groups.push(RowGroupInfo {
index: rg_idx,
first_row: offset,
num_rows,
});
offset += num_rows;
}

segments.push(SegmentFileInfo {
segment_ord: seg_ord as i32,
max_doc,
parquet_path: PathBuf::from(path),
parquet_size: file_size,
row_groups,
metadata: pq_meta,
});
}

let schema = schema.ok_or_else(|| "No parquet files provided".to_string())?;
Ok((segments, schema))
}
Loading
Loading