Skip to content

Commit 361b2b3

Browse files
Indexed table integration for Lucene + parquet (#20954)
* adding ability to switch to lucene engine Signed-off-by: bharath-techie <bharath78910@gmail.com> * fixing query Signed-off-by: bharath-techie <bharath78910@gmail.com> * indexed lucene changes Signed-off-by: bharath-techie <bharath78910@gmail.com> * adding rest test for Lucene + parquet with a hardcoded plan Signed-off-by: bharath-techie <bharath78910@gmail.com> --------- Signed-off-by: bharath-techie <bharath78910@gmail.com>
1 parent 60bf40d commit 361b2b3

File tree

40 files changed

+4192
-99
lines changed

40 files changed

+4192
-99
lines changed

plugins/engine-datafusion/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ members = [
99
datafusion = "52.1.0"
1010
datafusion-expr = "52.1.0"
1111
datafusion-datasource = "52.1.0"
12+
datafusion-datasource-parquet = "52.1.0"
13+
datafusion-common = "52.1.0"
14+
datafusion-execution = "52.1.0"
15+
datafusion-physical-expr = "52.1.0"
1216
arrow-json = "57.3.0"
1317
arrow = { version = "57.3.0", features = ["ffi", "ipc_compression"] }
1418
arrow-array = "57.3.0"
@@ -86,3 +90,23 @@ codegen-units = 1 # More parallel compilation
8690
incremental = true # Enable incremental compilation
8791
debug = "full"
8892
strip = false
93+
94+
# Patch arrow/parquet to local arrow-rs with sparse column chunk fix
95+
# Required by indexed-table crate for correct page index handling
96+
# https://github.com/apache/arrow-rs/pull/9301/changes
97+
#[patch.crates-io]
98+
#parquet = { path = "../../../arrow-rs/parquet" }
99+
#arrow = { path = "../../../arrow-rs/arrow" }
100+
#arrow-array = { path = "../../../arrow-rs/arrow-array" }
101+
#arrow-buffer = { path = "../../../arrow-rs/arrow-buffer" }
102+
#arrow-data = { path = "../../../arrow-rs/arrow-data" }
103+
#arrow-schema = { path = "../../../arrow-rs/arrow-schema" }
104+
#arrow-select = { path = "../../../arrow-rs/arrow-select" }
105+
#arrow-ipc = { path = "../../../arrow-rs/arrow-ipc" }
106+
#arrow-cast = { path = "../../../arrow-rs/arrow-cast" }
107+
#arrow-ord = { path = "../../../arrow-rs/arrow-ord" }
108+
#arrow-string = { path = "../../../arrow-rs/arrow-string" }
109+
#arrow-row = { path = "../../../arrow-rs/arrow-row" }
110+
#arrow-csv = { path = "../../../arrow-rs/arrow-csv" }
111+
#arrow-json = { path = "../../../arrow-rs/arrow-json" }
112+
#arrow-arith = { path = "../../../arrow-rs/arrow-arith" }

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ crate-type = ["cdylib"]
1414
datafusion = { workspace = true }
1515
datafusion-expr = { workspace = true }
1616
datafusion-datasource = { workspace = true }
17+
datafusion-datasource-parquet = { workspace = true }
18+
datafusion-common = { workspace = true }
19+
datafusion-execution = { workspace = true }
1720
arrow-json = { workspace = true }
1821
arrow = { workspace = true }
1922
#arrow = "55.2.0"
@@ -50,6 +53,7 @@ log ={ workspace = true }
5053

5154
# Shared OpenSearch utilities
5255
vectorized-exec-spi = { path = "../../../libs/vectorized-exec-spi/rust" }
56+
5357
# Parquet support
5458
parquet = { workspace = true }
5559

@@ -72,6 +76,7 @@ tempfile ={ workspace = true }
7276
chrono = { workspace = true }
7377

7478
async-trait = { workspace = true }
79+
bytes = "1.5"
7580
itertools = { workspace = true }
7681
rstest = { workspace = true }
7782
regex = { workspace = true }

plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,18 @@ impl AbsoluteRowIdOptimizer {
111111
let (new_schema, new_projections) =
112112
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());
113113

114+
// Build a new TableSchema with partition columns from the original source
114115
let table_partition_cols = datasource.table_partition_cols().clone();
115116
let new_table_schema = TableSchema::new(new_schema.clone(), table_partition_cols);
116117

118+
// Create a new ParquetSource with the updated schema
117119
use datafusion::datasource::physical_plan::ParquetSource;
118120
let new_file_source = Arc::new(ParquetSource::new(new_table_schema));
119121

120122
let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
121123
.with_source(new_file_source)
122124
.with_projection_indices(Some(new_projections))
123125
.expect("Failed to set projection indices")
124-
.with_statistics(datasource.statistics().add_column_statistics(ColumnStatistics::new_unknown()))
125126
.build();
126127

127128
let new_datasource = DataSourceExec::from_data_source(file_scan_config);
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
Indexed query executor — bridges indexed-table into engine-datafusion.
11+
12+
Registers an IndexedTableProvider (Lucene index + parquet) as the table,
13+
then executes the substrait plan against it — same as the normal DF path
14+
but with index-accelerated reads instead of full parquet scans.
15+
**/
16+
17+
use std::sync::Arc;
18+
19+
use datafusion::execution::context::SessionContext;
20+
use datafusion::physical_plan::execute_stream;
21+
use datafusion::prelude::SessionConfig;
22+
use jni::sys::jlong;
23+
use prost::Message;
24+
25+
use crate::indexed_table::index::BitsetMode;
26+
use crate::indexed_table::table_provider::{IndexedTableConfig, IndexedTableProvider};
27+
use crate::indexed_table::jni_helpers::build_segments;
28+
29+
use crate::executor::DedicatedExecutor;
30+
use crate::query_executor::get_cross_rt_stream;
31+
use crate::DataFusionRuntime;
32+
33+
use datafusion::common::DataFusionError;
34+
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
35+
use datafusion_substrait::substrait::proto::Plan;
36+
use datafusion_substrait::substrait::proto::extensions::simple_extension_declaration::MappingType;
37+
use vectorized_exec_spi::log_error;
38+
39+
/// Execute an indexed query with a substrait plan and return a CrossRtStream pointer.
40+
pub async fn execute_indexed_query_stream(
41+
weight_ptr: i64,
42+
segment_max_docs: Vec<i64>,
43+
parquet_paths: Vec<String>,
44+
table_name: String,
45+
plan_bytes: Vec<u8>,
46+
num_partitions: usize,
47+
bitset_mode: BitsetMode,
48+
is_query_plan_explain_enabled: bool,
49+
jvm: Arc<jni::JavaVM>,
50+
searcher_class_ref: jni::objects::GlobalRef,
51+
runtime: &DataFusionRuntime,
52+
cpu_executor: DedicatedExecutor,
53+
) -> Result<jlong, DataFusionError> {
54+
let t0 = std::time::Instant::now();
55+
56+
let searcher = Arc::new(crate::indexed_table::JniShardSearcher::new(
57+
Arc::clone(&jvm),
58+
weight_ptr,
59+
searcher_class_ref,
60+
segment_max_docs.clone(),
61+
)) as Arc<dyn crate::indexed_table::index::ShardSearcher>;
62+
63+
let (segments, schema) = build_segments(&parquet_paths, &segment_max_docs)
64+
.map_err(|e| DataFusionError::Execution(format!("build_segments: {}", e)))?;
65+
66+
let provider = IndexedTableProvider::try_new(
67+
IndexedTableConfig::new(searcher, segments, schema)
68+
.with_bitset_mode(bitset_mode)
69+
.with_partitions(num_partitions),
70+
)?;
71+
72+
let config = SessionConfig::new()
73+
.with_target_partitions(num_partitions);
74+
75+
let ctx = SessionContext::new_with_config(config);
76+
ctx.register_table(&table_name, Arc::new(provider))
77+
.map_err(|e| DataFusionError::Execution(format!("register_table: {}", e)))?;
78+
79+
let t_setup = t0.elapsed();
80+
81+
let substrait_plan = Plan::decode(plan_bytes.as_slice())
82+
.map_err(|e| DataFusionError::Execution(format!("Failed to decode Substrait: {}", e)))?;
83+
84+
let mut modified_plan = substrait_plan.clone();
85+
for ext in modified_plan.extensions.iter_mut() {
86+
if let Some(mapping_type) = &mut ext.mapping_type {
87+
if let MappingType::ExtensionFunction(func) = mapping_type {
88+
if func.name == "approx_count_distinct:any" {
89+
func.name = "approx_distinct:any".to_string();
90+
}
91+
}
92+
}
93+
}
94+
95+
let logical_plan = from_substrait_plan(&ctx.state(), &modified_plan).await
96+
.map_err(|e| { eprintln!("Failed to convert Substrait plan: {}", e); e })?;
97+
98+
let mut dataframe = ctx.execute_logical_plan(logical_plan).await
99+
.map_err(|e| { eprintln!("Failed to execute logical plan: {}", e); e })?;
100+
101+
let physical_plan = dataframe.clone().create_physical_plan().await?;
102+
103+
let t_plan = t0.elapsed();
104+
105+
// if is_query_plan_explain_enabled {
106+
// log_error!("---- Indexed Query Explain Plan ----");
107+
// let plan_str = format!("{}", datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(true));
108+
// for line in plan_str.lines() {
109+
// log_error!(" {}", line);
110+
// }
111+
// }
112+
113+
if is_query_plan_explain_enabled {
114+
println!("---- Explain plan ----");
115+
let clone_df = dataframe.clone().explain(false, true).expect("Failed to explain plan");
116+
clone_df.show().await?;
117+
}
118+
119+
let df_stream = execute_stream(physical_plan, ctx.task_ctx())?;
120+
121+
let t_exec = t0.elapsed();
122+
eprintln!("[INDEXED-TIMING] setup={}ms, plan={}ms, execute_stream={}ms, explain={}",
123+
t_setup.as_millis(), (t_plan - t_setup).as_millis(), (t_exec - t_plan).as_millis(), is_query_plan_explain_enabled);
124+
125+
Ok(get_cross_rt_stream(cpu_executor, df_stream))
126+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
Core index traits — the contract between the index and the query engine.
3+
4+
These two traits are all the core crate needs. How the searcher was created
5+
(Lucene JNI, Tantivy, test stub, Java-side Weight pointer) is irrelevant here.
6+
7+
ShardSearcher (Equivalent of Lucene's Weight — once per shard)
8+
└── SegmentCollector (Equivalent of Lucene's Scorer — once per segment)
9+
└── collect(range) → Vec<u64> (Collects matching doc IDs as bitsets within the given doc range)
10+
**/
11+
12+
use std::fmt::Debug;
13+
use std::sync::Arc;
14+
15+
/// A collector that retrieves matching doc IDs as a bitset for a row group docs
/// range within a segment.
///
/// The `collect` method returns a bitset (Vec<u64>) where each set bit represents
/// a matching doc ID relative to `min_doc`.
pub trait RowGroupDocsCollector: Send + Sync + Debug {
    /// Collect matching doc IDs in `[min_doc, max_doc)` as a bitset.
    ///
    /// TODO : change this to direct native memory bitset
    ///
    /// Each u64 word represents 64 consecutive doc IDs starting from `min_doc`.
    /// Bit `i` in word `j` represents doc ID `min_doc + j*64 + i`.
    ///
    /// The collector may be called multiple times with increasing ranges
    /// (forward-only iteration).
    ///
    /// # Errors
    /// Returns a human-readable message when the underlying index cannot
    /// produce the bitset for this range.
    fn collect(&self, min_doc: i32, max_doc: i32) -> Result<Vec<u64>, String>;
}
32+
33+
/// A searcher scoped to a single shard (index), created once per query.
///
/// Analogous to Lucene's `Weight` — expensive to create (parses query, builds
/// automaton for wildcards, etc.), but creating segment collectors from it is cheap.
///
/// Right now : Java walks the query's boolean tree, builds a single Lucene BooleanQuery
/// that covers all indexed columns, and produces one `ShardSearcher`. The bitset
/// it returns is a superset of all rows matching the indexed portion of the query.
///
/// TODO : Handle complex nested boolean queries which will need multiple scorers.
pub trait ShardSearcher: Send + Sync + Debug {
    /// Number of segments in this shard.
    fn segment_count(&self) -> usize;

    /// Max doc ID for a specific segment.
    ///
    /// # Errors
    /// Returns a message when `segment_ord` is out of range or the backing
    /// searcher cannot report the segment size.
    fn segment_max_doc(&self, segment_ord: usize) -> Result<i64, String>;

    /// Create a collector for a specific segment and doc ID range.
    ///
    /// The collector will only return docs in `[doc_min, doc_max)` as we stream through the row groups
    /// This is cheap — analogous to `Weight.scorer(leafCtx, doc_min, doc_max)` in Lucene.
    ///
    /// # Errors
    /// Returns a message when a collector cannot be created for the given
    /// segment/range.
    fn collector(
        &self,
        segment_ord: usize,
        doc_min: i32,
        doc_max: i32,
    ) -> Result<Arc<dyn RowGroupDocsCollector>, String>;
}
61+
62+
63+
/// How the index bitset relates to the parquet (non-indexed) filters.
///
/// Java decides this based on the top-level query structure:
///
/// In both cases, the result is a superset of the true answer. DataFusion's
/// residual filter cleans up false positives after parquet reads.
//
// The manual `impl Default` is replaced by `#[derive(Default)]` with a
// `#[default]` variant attribute — same behavior, less code to keep in sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BitsetMode {
    /// Intersect bitset with page pruner ranges (default).
    #[default]
    And,
    /// Union bitset with page pruner candidate rows.
    Or,
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
Helpers for building indexed table segments from parquet file paths.
3+
**/
4+
5+
use super::parquet_bridge;
6+
use super::stream::RowGroupInfo;
7+
use super::table_provider::{coerce_binary_to_string, SegmentFileInfo};
8+
9+
/// Build `SegmentFileInfo` from parquet paths + segment max docs.
10+
///
11+
/// Each parquet file is one segment. Reads parquet metadata to discover
12+
/// row groups (count + row counts).
13+
pub fn build_segments(
14+
parquet_paths: &[String],
15+
segment_max_docs: &[i64],
16+
) -> Result<(Vec<SegmentFileInfo>, arrow::datatypes::SchemaRef), String> {
17+
use std::path::PathBuf;
18+
19+
if parquet_paths.len() != segment_max_docs.len() {
20+
return Err(format!(
21+
"parquet_paths.len()={} != segment_max_docs.len()={}",
22+
parquet_paths.len(),
23+
segment_max_docs.len()
24+
));
25+
}
26+
27+
let mut segments = Vec::with_capacity(parquet_paths.len());
28+
let mut schema: Option<arrow::datatypes::SchemaRef> = None;
29+
30+
for (seg_ord, (path, &max_doc)) in parquet_paths.iter().zip(segment_max_docs).enumerate() {
31+
let file = std::fs::File::open(path).map_err(|e| format!("open {}: {}", path, e))?;
32+
let file_size = file
33+
.metadata()
34+
.map_err(|e| format!("stat {}: {}", path, e))?
35+
.len();
36+
37+
let (file_schema, pq_meta) = parquet_bridge::load_parquet_metadata(&file)
38+
.map_err(|e| format!("parquet metadata {}: {}", path, e))?;
39+
40+
if schema.is_none() {
41+
schema = Some(coerce_binary_to_string(file_schema));
42+
}
43+
44+
let mut row_groups = Vec::new();
45+
let mut offset: i64 = 0;
46+
for rg_idx in 0..pq_meta.num_row_groups() {
47+
let num_rows = pq_meta.row_group(rg_idx).num_rows();
48+
row_groups.push(RowGroupInfo {
49+
index: rg_idx,
50+
first_row: offset,
51+
num_rows,
52+
});
53+
offset += num_rows;
54+
}
55+
56+
segments.push(SegmentFileInfo {
57+
segment_ord: seg_ord as i32,
58+
max_doc,
59+
parquet_path: PathBuf::from(path),
60+
parquet_size: file_size,
61+
row_groups,
62+
metadata: pq_meta,
63+
});
64+
}
65+
66+
let schema = schema.ok_or_else(|| "No parquet files provided".to_string())?;
67+
Ok((segments, schema))
68+
}

0 commit comments

Comments
 (0)