
Commit c79e554

Aniket Modak and bharath-techie authored and committed
added global row id optimizer and tests for query phase
1 parent 1276efd commit c79e554

File tree

10 files changed: +2061 -35 lines changed


plugins/engine-datafusion/build.gradle

Lines changed: 4 additions & 2 deletions
@@ -43,7 +43,7 @@ dependencies {
     implementation "org.apache.arrow:arrow-c-data:17.0.0"
     implementation "org.apache.arrow:arrow-format:17.0.0"
     // SLF4J API for Arrow logging compatibility
-    implementation "org.slf4j:slf4j-api:2.0.17"
+    implementation "org.slf4j:slf4j-api:${versions.slf4j}"
     // CheckerFramework annotations required by Arrow 17.0.0
     implementation "org.checkerframework:checker-qual:3.42.0"
     // FlatBuffers dependency required by Arrow 17.0.0
@@ -172,7 +172,9 @@ clean {
 
 test {
     // Set system property to help tests find the native library
-    systemProperty 'java.library.path', file('src/main/resources/native').absolutePath
+    jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
+
+    systemProperty 'java.library.path', file('src/main/resources/native').absolutePath
 }
 
 yamlRestTest {

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 9 additions & 2 deletions
@@ -12,9 +12,11 @@ crate-type = ["cdylib"]
 [dependencies]
 # DataFusion dependencies
 datafusion = "49.0.0"
+datafusion-expr = "49.0.0"
+datafusion-datasource = "49.0.0"
 arrow-json = "55.2"
-#arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
-arrow = "55.2.0"
+arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
+#arrow = "55.2.0"
 arrow-array = "55.2.0"
 arrow-schema = "55.2.0"
 arrow-buffer = "55.2.0"
@@ -56,6 +58,11 @@ substrait = "0.47"
 tempfile = "3.0"
 chrono = "0.4.41"
 
+async-trait = "0.1.89"
+itertools = "0.14.0"
+rstest = "0.26.1"
+regex = "1.11.2"
+
 [build-dependencies]
 cbindgen = "0.27"

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 95 additions & 11 deletions
@@ -5,32 +5,42 @@
  * this file be licensed under the Apache-2.0 license or a
  * compatible open source license.
  */
-
-use jni::objects::{JByteArray, JClass};
+use std::ptr::addr_of_mut;
+use jni::objects::{JByteArray, JClass, JObject};
 use jni::sys::{jbyteArray, jlong, jstring};
 use jni::JNIEnv;
 use std::sync::Arc;
+use arrow_array::{Array, StructArray};
+use arrow_array::ffi::FFI_ArrowArray;
+use arrow_schema::DataType;
+use arrow_schema::ffi::FFI_ArrowSchema;
 
 mod util;
+mod row_id_optimizer;
+mod listing_table;
 
 use datafusion::execution::context::SessionContext;
 
-use crate::util::{create_object_meta_from_filenames, parse_string_arr};
+use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result_error, set_object_result_ok};
 use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
+use datafusion::datasource::listing::{ListingTableUrl};
 use datafusion::execution::cache::cache_manager::CacheManagerConfig;
 use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
 use datafusion::execution::cache::CacheAccessor;
 use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
 use datafusion::prelude::SessionConfig;
 use datafusion::DATAFUSION_VERSION;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
 use datafusion_substrait::substrait::proto::Plan;
+use futures::TryStreamExt;
 use jni::objects::{JObjectArray, JString};
 use object_store::ObjectMeta;
 use prost::Message;
 use tokio::runtime::Runtime;
+use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig};
+use crate::row_id_optimizer::FilterRowIdOptimizer;
 
 /// Create a new DataFusion session context
 #[no_mangle]
@@ -135,7 +145,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_closeSe
 
 
 #[no_mangle]
-pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createReader(
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createDatafusionReader(
     mut env: JNIEnv,
     _class: JClass,
     table_path: JString,
@@ -185,7 +195,7 @@ impl ShardView {
 
 
 #[no_mangle]
-pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeExecuteSubstraitQuery(
+pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_executeSubstraitQuery(
     mut env: JNIEnv,
     _class: JClass,
     shard_view_ptr: jlong,
@@ -205,15 +215,28 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
 
     let runtime_env = RuntimeEnvBuilder::new()
         .with_cache_manager(CacheManagerConfig::default()
-            .with_list_files_cache(Some(list_file_cache))).build().unwrap();
-
-    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env));
+            //.with_list_files_cache(Some(list_file_cache)) TODO: //Fix this
+        ).build().unwrap();
+
+    // TODO: get config from CSV DataFormat
+    let mut config = SessionConfig::new();
+    // config.options_mut().execution.parquet.pushdown_filters = true;
+
+    let state = datafusion::execution::SessionStateBuilder::new()
+        .with_config(config)
+        .with_runtime_env(Arc::from(runtime_env))
+        .with_default_features()
+        // .with_optimizer_rule(Arc::new(OptimizeRowId))
+        // .with_physical_optimizer_rule(Arc::new(FilterRowIdOptimizer)) // TODO: enable only for query phase
+        .build();
 
+    let ctx = SessionContext::new_with_state(state);
 
     // Create default parquet options
     let file_format = ParquetFormat::new();
     let listing_options = ListingOptions::new(Arc::new(file_format))
-        .with_file_extension(".parquet");
+        .with_file_extension(".parquet"); // TODO: take this as parameter
+        // .with_table_partition_cols(vec![("row_base".to_string(), DataType::Int32)]); // TODO: enable only for query phase
 
     // Ideally the executor will give this
     runtime_ptr.block_on(async {
@@ -229,7 +252,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
         // Create a new TableProvider
         let provider = Arc::new(ListingTable::try_new(config).unwrap());
         let shard_id = table_path.prefix().filename().expect("error in fetching Path");
-        ctx.register_table(shard_id, provider)
+        ctx.register_table("logs", provider)
             .expect("Failed to attach the Table");
 
     });
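The session setup above wires the new row_id_optimizer module into the SessionStateBuilder, with the `.with_physical_optimizer_rule(Arc::new(FilterRowIdOptimizer))` call still commented out. As a rough orientation only, here is a minimal sketch of the DataFusion PhysicalOptimizerRule shape such a rule has to implement; the struct name PassThroughRowIdRule is made up for illustration and this is not the FilterRowIdOptimizer added by this commit:

use std::sync::Arc;

use datafusion::common::Result;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical placeholder; the real rule lives in row_id_optimizer.rs.
#[derive(Debug, Default)]
struct PassThroughRowIdRule;

impl PhysicalOptimizerRule for PassThroughRowIdRule {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A real rule would walk and rewrite the plan here (e.g. push a global
        // row-id filter down toward the scan); this placeholder returns it unchanged.
        Ok(plan)
    }

    fn name(&self) -> &str {
        "pass_through_row_id_rule"
    }

    fn schema_check(&self) -> bool {
        // The rule must not change the output schema of the plan.
        true
    }
}

A rule of this shape would be enabled at the commented-out spot in the builder chain above via .with_physical_optimizer_rule(Arc::new(...)).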
@@ -345,4 +368,65 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeC
 
 
 
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_RecordBatchStream_next(
+    mut env: JNIEnv,
+    _class: JClass,
+    runtime_ptr: jlong,
+    stream: jlong,
+    callback: JObject,
+) {
+    let runtime = unsafe { &mut *(runtime_ptr as *mut Runtime) };
+
+    let stream = unsafe { &mut *(stream as *mut SendableRecordBatchStream) };
+    runtime.block_on(async {
+        //let fetch_start = std::time::Instant::now();
+        let next = stream.try_next().await;
+        //let fetch_time = fetch_start.elapsed();
+        match next {
+            Ok(Some(batch)) => {
+                //let convert_start = std::time::Instant::now();
+                // Convert to struct array for compatibility with FFI
+                //println!("Num rows : {}", batch.num_rows());
+                let struct_array: StructArray = batch.into();
+                let array_data = struct_array.into_data();
+                let mut ffi_array = FFI_ArrowArray::new(&array_data);
+                //let convert_time = convert_start.elapsed();
+                // ffi_array must remain alive until after the callback is called
+                // let callback_start = std::time::Instant::now();
+                set_object_result_ok(&mut env, callback, addr_of_mut!(ffi_array));
+                // let callback_time = callback_start.elapsed();
+                // println!("Fetch: {:?}, Convert: {:?}, Callback: {:?}",
+                //          fetch_time, convert_time, callback_time);
+            }
+            Ok(None) => {
+                set_object_result_ok(&mut env, callback, 0 as *mut FFI_ArrowSchema);
+            }
+            Err(err) => {
+                set_object_result_error(&mut env, callback, &err);
+            }
+        }
+        //println!("Total time: {:?}", start.elapsed());
+    });
+}
 
+#[no_mangle]
+pub extern "system" fn Java_org_opensearch_datafusion_RecordBatchStream_getSchema(
+    mut env: JNIEnv,
+    _class: JClass,
+    stream: jlong,
+    callback: JObject,
+) {
+    let stream = unsafe { &mut *(stream as *mut SendableRecordBatchStream) };
+    let schema = stream.schema();
+    let ffi_schema = FFI_ArrowSchema::try_from(&*schema);
+    match ffi_schema {
+        Ok(mut ffi_schema) => {
+            // ffi_schema must remain alive until after the callback is called
+            set_object_result_ok(&mut env, callback, addr_of_mut!(ffi_schema));
+        }
+        Err(err) => {
+            set_object_result_error(&mut env, callback, &err);
+        }
+    }
+}
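For context on the new RecordBatchStream bindings above: next() flattens each RecordBatch into a StructArray and hands it to the Java callback as an FFI_ArrowArray, while getSchema() exports the stream schema as an FFI_ArrowSchema. The following pure-Rust sketch only illustrates that export/import contract; it is not code from this commit, export_batch and import_batch are hypothetical helpers, and it assumes arrow's "ffi" feature, which the Cargo.toml change above enables:

use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::{Array, RecordBatch, StructArray};

// Flatten a batch into a single StructArray and export it over the Arrow C Data
// Interface, mirroring what RecordBatchStream_next does before calling back into Java.
fn export_batch(batch: RecordBatch) -> (FFI_ArrowArray, FFI_ArrowSchema) {
    let struct_array: StructArray = batch.into();
    let ffi_schema = FFI_ArrowSchema::try_from(struct_array.data_type()).expect("schema export");
    let ffi_array = FFI_ArrowArray::new(&struct_array.into_data());
    (ffi_array, ffi_schema)
}

// Rebuild a RecordBatch from the exported pair, as a consumer on the other side of
// the FFI boundary would.
fn import_batch(ffi_array: FFI_ArrowArray, ffi_schema: &FFI_ArrowSchema) -> RecordBatch {
    // SAFETY: the pair must come from a matching export such as export_batch above.
    let data = unsafe { from_ffi(ffi_array, ffi_schema) }.expect("ffi import");
    RecordBatch::from(StructArray::from(data))
}

In the actual JNI flow the two halves live on opposite sides of the boundary: the schema is fetched once through getSchema, each batch arrives through next, and the null pointer passed in the Ok(None) branch signals end of stream.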
