Skip to content

Commit 5359439

Browse files
author
Aniket Modak
committed
Fixed select * and added analyzer rules for projecting row_id in query phase
1 parent fd54b9a commit 5359439

File tree

15 files changed

+427
-138
lines changed

15 files changed

+427
-138
lines changed

plugins/engine-datafusion/jni/src/row_id_optimizer.rs renamed to plugins/engine-datafusion/jni/src/absolute_row_id_optimizer.rs

Lines changed: 34 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::Arc;
1111

1212
use arrow::datatypes::{DataType, Field, Fields, Schema};
1313
use arrow_schema::SchemaRef;
14-
use datafusion::physical_plan::projection::new_projections_for_columns;
14+
use datafusion::physical_plan::projection::{new_projections_for_columns, ProjectionExpr};
1515
use datafusion::{
1616
common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
1717
config::ConfigOptions,
@@ -21,15 +21,21 @@ use datafusion::{
2121
},
2222
error::DataFusionError,
2323
logical_expr::Operator,
24-
physical_expr::{PhysicalExpr, expressions::{BinaryExpr, Column}},
24+
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
25+
physical_expr::{
26+
expressions::{BinaryExpr, Column},
27+
PhysicalExpr,
28+
},
2529
physical_optimizer::PhysicalOptimizerRule,
26-
physical_plan::{ExecutionPlan, filter::FilterExec, projection::{ProjectionExec, ProjectionExpr}},
30+
physical_plan::{filter::FilterExec, projection::ProjectionExec, ExecutionPlan},
2731
};
2832

2933
#[derive(Debug)]
30-
pub struct ProjectRowIdOptimizer;
34+
pub struct AbsoluteRowIdOptimizer;
35+
pub const ROW_ID_FIELD_NAME: &'static str = "___row_id";
36+
pub const ROW_BASE_FIELD_NAME: &'static str = "row_base";
3137

32-
impl ProjectRowIdOptimizer {
38+
impl AbsoluteRowIdOptimizer {
3339
/// Helper to build new schema and projection info with added `row_base` column.
3440
fn build_updated_file_source_schema(
3541
&self,
@@ -53,27 +59,19 @@ impl ProjectRowIdOptimizer {
5359
// }
5460
// }
5561

56-
if !projections.contains(&file_source_schema.index_of("___row_id").unwrap()) {
57-
new_projections.push(file_source_schema.index_of("___row_id").unwrap());
62+
if !projections.contains(&file_source_schema.index_of(ROW_ID_FIELD_NAME).unwrap()) {
63+
new_projections.push(file_source_schema.index_of(ROW_ID_FIELD_NAME).unwrap());
5864

59-
// let field = file_source_schema.field_with_name(&*"___row_id").expect("Field ___row_id not found in file_source_schema");
60-
// fields.push(Arc::new(Field::new("___row_id", field.data_type().clone(), field.is_nullable())));
65+
// let field = file_source_schema.field_with_name(&*ROW_ID_FIELD_NAME).expect("Field ___row_id not found in file_source_schema");
66+
// fields.push(Arc::new(Field::new(ROW_ID_FIELD_NAME, field.data_type().clone(), field.is_nullable())));
6167
}
6268
new_projections.push(file_source_schema.fields.len());
63-
// fields.push(Arc::new(Field::new("row_base", file_source_schema.field_with_name("___row_id").unwrap().data_type().clone(), true)));
69+
// fields.push(Arc::new(Field::new("row_base", file_source_schema.field_with_name(ROW_ID_FIELD_NAME).unwrap().data_type().clone(), true)));
6470

6571
// Add row_base field to schema
6672

6773
let mut new_fields = file_source_schema.fields().clone().to_vec();
68-
new_fields.push(Arc::new(Field::new(
69-
"row_base",
70-
file_source_schema
71-
.field_with_name("___row_id")
72-
.unwrap()
73-
.data_type()
74-
.clone(),
75-
true,
76-
)));
74+
new_fields.push(Arc::new(Field::new(ROW_BASE_FIELD_NAME, file_source_schema.field_with_name(ROW_ID_FIELD_NAME).unwrap().data_type().clone(), true)));
7775

7876
let new_schema = Arc::new(Schema {
7977
metadata: file_source_schema.metadata().clone(),
@@ -84,31 +82,23 @@ impl ProjectRowIdOptimizer {
8482
}
8583

8684
/// Creates a projection expression that adds `row_base` to `___row_id`.
87-
fn build_projection_exprs(
88-
&self,
89-
new_schema: &SchemaRef,
90-
) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>, DataFusionError> {
91-
let row_id_idx = new_schema
92-
.index_of("___row_id")
93-
.expect("Field ___row_id missing");
94-
let row_base_idx = new_schema
95-
.index_of("row_base")
96-
.expect("Field row_base missing");
97-
85+
fn build_projection_exprs(&self, new_schema: &SchemaRef) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>, DataFusionError> {
86+
let row_id_idx = new_schema.index_of(ROW_ID_FIELD_NAME).expect("Field ___row_id missing");
87+
let row_base_idx = new_schema.index_of(ROW_BASE_FIELD_NAME).expect("Field row_base missing");
9888
let sum_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
99-
Arc::new(Column::new("___row_id", row_id_idx)),
89+
Arc::new(Column::new(ROW_ID_FIELD_NAME, row_id_idx)),
10090
Operator::Plus,
101-
Arc::new(Column::new("row_base", row_base_idx)),
91+
Arc::new(Column::new(ROW_BASE_FIELD_NAME, row_base_idx)),
10292
));
10393

10494
let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
10595

10696
let mut has_row_id = false;
10797
for field_name in new_schema.fields().to_vec() {
108-
if field_name.name() == "___row_id" {
98+
if field_name.name() == ROW_ID_FIELD_NAME {
10999
projection_exprs.push((sum_expr.clone(), field_name.name().clone()));
110100
has_row_id = true;
111-
} else if (field_name.name() != "row_base") {
101+
} else if(field_name.name() != ROW_BASE_FIELD_NAME) {
112102
// Match the column by name from new_schema
113103
let idx = new_schema
114104
.index_of(&*field_name.name().clone())
@@ -120,7 +110,7 @@ impl ProjectRowIdOptimizer {
120110
}
121111
}
122112
if !has_row_id {
123-
projection_exprs.push((sum_expr.clone(), "___row_id".parse().unwrap()));
113+
projection_exprs.push((sum_expr.clone(), ROW_ID_FIELD_NAME.parse().unwrap()));
124114
}
125115
Ok(projection_exprs)
126116
}
@@ -147,7 +137,7 @@ impl ProjectRowIdOptimizer {
147137
}
148138
}
149139

150-
impl PhysicalOptimizerRule for ProjectRowIdOptimizer {
140+
impl PhysicalOptimizerRule for AbsoluteRowIdOptimizer {
151141
fn optimize(
152142
&self,
153143
plan: Arc<dyn ExecutionPlan>,
@@ -162,34 +152,16 @@ impl PhysicalOptimizerRule for ProjectRowIdOptimizer {
162152
.downcast_ref::<FileScanConfig>()
163153
.expect("DataSource not found");
164154
let schema = datasource.file_schema.clone();
165-
schema
166-
.field_with_name("___row_id")
167-
.expect("Field ___row_id missing");
168-
let projection = self
169-
.create_datasource_projection(datasource, datasource_exec.schema())
170-
.expect("Failed to create ProjectionExec from datasource");
171-
return Ok(Transformed::new(
172-
Arc::new(projection),
173-
true,
174-
TreeNodeRecursion::Continue,
175-
));
155+
schema.field_with_name(ROW_ID_FIELD_NAME).expect("Field ___row_id missing");
156+
let projection = self.create_datasource_projection(datasource, datasource_exec.schema()).expect("Failed to create ProjectionExec from datasource");
157+
return Ok(Transformed::new(Arc::new(projection), true, TreeNodeRecursion::Continue));
158+
176159
} else if let Some(projection_exec) = node.as_any().downcast_ref::<ProjectionExec>() {
177-
if !projection_exec
178-
.schema()
179-
.field_with_name("___row_id")
180-
.is_ok()
181-
{
160+
if !projection_exec.schema().field_with_name(ROW_ID_FIELD_NAME).is_ok() {
161+
182162
let mut projection_exprs = projection_exec.expr().to_vec();
183-
if (projection_exec
184-
.input()
185-
.schema()
186-
.index_of("___row_id")
187-
.is_ok())
188-
{
189-
if projection_exec.input().schema().index_of("___row_id").is_ok() {
190-
let row_id_col: Arc<dyn PhysicalExpr> = Arc::new(Column::new("___row_id", projection_exec.input().schema().index_of("___row_id").unwrap()));
191-
projection_exprs.push(ProjectionExpr::new(row_id_col, "___row_id".to_string()));
192-
}
163+
if(projection_exec.input().schema().index_of(ROW_ID_FIELD_NAME).is_ok()) {
164+
projection_exprs.push(ProjectionExpr::new(Arc::new(Column::new(ROW_ID_FIELD_NAME, projection_exec.input().schema().index_of(ROW_ID_FIELD_NAME).unwrap())), ROW_ID_FIELD_NAME.to_string()));
193165
}
194166

195167
let projection =

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::num::NonZeroUsize;
1010
use std::ptr::addr_of_mut;
1111
use jni::objects::{JByteArray, JClass, JObject};
1212
use jni::objects::JLongArray;
13-
use jni::sys::{jbyteArray, jint, jlong, jstring};
13+
use jni::sys::{jboolean, jbyteArray, jint, jlong, jstring};
1414
use jni::{JNIEnv, JavaVM};
1515
use std::sync::{Arc, OnceLock};
1616
use arrow_array::{Array, StructArray};
@@ -32,7 +32,7 @@ use std::default::Default;
3232
use std::time::{Duration, Instant};
3333

3434
mod util;
35-
mod row_id_optimizer;
35+
mod absolute_row_id_optimizer;
3636
mod listing_table;
3737
mod cache;
3838
mod custom_cache_manager;
@@ -44,6 +44,7 @@ mod runtime_manager;
4444
mod cache_jni;
4545
mod partial_agg_optimizer;
4646
mod query_executor;
47+
mod project_row_id_analyzer;
4748

4849
use crate::custom_cache_manager::CustomCacheManager;
4950
use crate::util::{create_file_meta_from_filenames, parse_string_arr, set_action_listener_error, set_action_listener_error_global, set_action_listener_ok, set_action_listener_ok_global};
@@ -330,7 +331,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createDat
330331
}
331332
};
332333

333-
let files: Vec<String> = match parse_string_arr(&mut env, files) {
334+
let mut files: Vec<String> = match parse_string_arr(&mut env, files) {
334335
Ok(files) => files,
335336
Err(e) => {
336337
let _ = env.throw_new(
@@ -341,6 +342,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_createDat
341342
}
342343
};
343344

345+
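// TODO: Sorting the file names lexicographically yields the intended file order only because the files are named similarly and end with an incremental generation count; preferably move the ordering up to DatafusionReaderManager so that file order is kept explicitly. NOTE(review): lexicographic order differs from numeric order once generation counts differ in digit length (e.g. "10" sorts before "9") unless the counts are zero-padded — confirm the naming scheme.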
346+
files.sort();
344347
let files_metadata = match create_file_meta_from_filenames(&table_path, files.clone()) {
345348
Ok(metadata) => metadata,
346349
Err(err) => {
@@ -450,6 +453,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
450453
shard_view_ptr: jlong,
451454
table_name: JString,
452455
substrait_bytes: jbyteArray,
456+
is_aggregation_query: jboolean,
453457
runtime_ptr: jlong,
454458
listener: JObject,
455459
) {
@@ -458,7 +462,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
458462
None => {
459463
error!("Runtime manager not initialized");
460464
set_action_listener_error(&mut env, listener,
461-
&DataFusionError::Execution("Runtime manager not initialized".to_string()));
465+
&DataFusionError::Execution("Runtime manager not initialized".to_string()));
462466
return;
463467
}
464468
};
@@ -469,18 +473,20 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
469473
Err(e) => {
470474
error!("Failed to get table name: {}", e);
471475
set_action_listener_error(&mut env, listener,
472-
&DataFusionError::Execution(format!("Failed to get table name: {}", e)));
476+
&DataFusionError::Execution(format!("Failed to get table name: {}", e)));
473477
return;
474478
}
475479
};
476480

481+
let is_aggregation_query: bool = is_aggregation_query !=0;
482+
477483
let plan_bytes_obj = unsafe { JByteArray::from_raw(substrait_bytes) };
478484
let plan_bytes_vec = match env.convert_byte_array(plan_bytes_obj) {
479485
Ok(bytes) => bytes,
480486
Err(e) => {
481487
error!("Failed to convert plan bytes: {}", e);
482488
set_action_listener_error(&mut env, listener,
483-
&DataFusionError::Execution(format!("Failed to convert plan bytes: {}", e)));
489+
&DataFusionError::Execution(format!("Failed to convert plan bytes: {}", e)));
484490
return;
485491
}
486492
};
@@ -491,7 +497,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
491497
Err(e) => {
492498
error!("Failed to create global ref: {}", e);
493499
set_action_listener_error(&mut env, listener,
494-
&DataFusionError::Execution(format!("Failed to create global ref: {}", e)));
500+
&DataFusionError::Execution(format!("Failed to create global ref: {}", e)));
495501
return;
496502
}
497503
};
@@ -511,6 +517,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
511517
files_meta,
512518
table_name,
513519
plan_bytes_vec,
520+
is_aggregation_query,
514521
runtime,
515522
cpu_executor,
516523
).await;
@@ -559,7 +566,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_streamNex
559566
Err(e) => {
560567
error!("Failed to create global ref: {}", e);
561568
set_action_listener_error(&mut env, listener,
562-
&DataFusionError::Execution(format!("Failed to create global ref: {}", e)));
569+
&DataFusionError::Execution(format!("Failed to create global ref: {}", e)));
563570
return;
564571
}
565572
};
@@ -644,7 +651,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeFe
644651
_class: JClass,
645652
shard_view_ptr: jlong,
646653
values: JLongArray,
647-
projections: JObjectArray,
654+
include_fields: JObjectArray,
655+
exclude_fields: JObjectArray,
648656
runtime_ptr: jlong,
649657
callback: JObject,
650658
) -> jlong {
@@ -654,8 +662,10 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeFe
654662
let table_path = shard_view.table_path();
655663
let files_metadata = shard_view.files_metadata();
656664

657-
let projections: Vec<String> =
658-
parse_string_arr(&mut env, projections).expect("Expected list of files");
665+
let include_fields: Vec<String> =
666+
parse_string_arr(&mut env, include_fields).expect("Expected list of files");
667+
let exclude_fields: Vec<String> =
668+
parse_string_arr(&mut env, exclude_fields).expect("Expected list of files");
659669

660670
// Safety checks first
661671
if values.is_null() {
@@ -697,7 +707,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeFe
697707
None => {
698708
error!("Runtime manager not initialized");
699709
set_action_listener_error(&mut env, callback,
700-
&DataFusionError::Execution("Runtime manager not initialized".to_string()));
710+
&DataFusionError::Execution("Runtime manager not initialized".to_string()));
701711
return 0;
702712
}
703713
};
@@ -710,7 +720,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeFe
710720
table_path,
711721
files_metadata,
712722
row_ids,
713-
projections,
723+
include_fields,
724+
exclude_fields,
714725
runtime,
715726
cpu_executor,
716727
).await {

plugins/engine-datafusion/jni/src/listing_table.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use futures::{future, stream, Stream, StreamExt, TryStreamExt};
6464
use itertools::Itertools;
6565
use object_store::ObjectStore;
6666
use regex::Regex;
67+
use crate::absolute_row_id_optimizer::ROW_ID_FIELD_NAME;
6768
use std::fs::File;
6869
use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
6970

@@ -302,7 +303,7 @@ impl ListingTableConfig {
302303
/// # Errors
303304
/// * if `self.options` is not set. See [`Self::with_listing_options`]
304305
pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
305-
match self.options {
306+
match self.options {
306307
Some(options) => {
307308
let ListingTableConfig {
308309
table_paths,
@@ -1144,7 +1145,7 @@ impl ListingTable {
11441145
}
11451146
let row_id_field_datatype = self
11461147
.file_schema
1147-
.field_with_name("___row_id")
1148+
.field_with_name(ROW_ID_FIELD_NAME)
11481149
.expect("Field ___row_id not found")
11491150
.data_type();
11501151
if !(row_id_field_datatype.equals_datatype(&DataType::Int32)

0 commit comments

Comments
 (0)