Skip to content

Commit 4b61351

Browse files
cargo changes and logging
Signed-off-by: bharath-techie <[email protected]>
1 parent e2cf8b7 commit 4b61351

File tree

3 files changed

+115
-31
lines changed

3 files changed

+115
-31
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
[workspace]
2+
resolver = "2"
3+
members = [
4+
"jni"
5+
]
6+
7+
[workspace.dependencies]
8+
# DataFusion dependencies
9+
datafusion = "49.0.0"
10+
datafusion-expr = "49.0.0"
11+
datafusion-datasource = "49.0.0"
12+
arrow-json = "55.2"
13+
arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
14+
#arrow = "55.2.0"
15+
arrow-array = "55.2.0"
16+
arrow-schema = "55.2.0"
17+
arrow-buffer = "55.2.0"
18+
19+
# JNI dependencies
20+
jni = "0.21"
21+
22+
# Substrait support
23+
datafusion-substrait = "49.0.0"
24+
prost = "0.13"
25+
26+
27+
# Async runtime
28+
tokio = { version = "1.0", features = ["full"] }
29+
futures = "0.3"
30+
#tokio = { version = "1.0", features = ["rt", "rt-multi-thread", "macros"] }
31+
32+
# Serialization
33+
serde = { version = "1.0", features = ["derive"] }
34+
serde_json = "1.0"
35+
36+
# Error handling
37+
anyhow = "1.0"
38+
thiserror = "1.0"
39+
40+
# Logging
41+
log = "0.4"
42+
# Parquet support
43+
parquet = "53.0.0"
44+
45+
# Object store for file access
46+
object_store = "=0.12.3"
47+
url = "2.0"
48+
49+
# Substrait support
50+
substrait = "0.47"
51+
52+
# Temporary directory support
53+
tempfile = "3.0"
54+
chrono = "0.4.41"
55+
56+
async-trait = "0.1.89"
57+
itertools = "0.14.0"
58+
rstest = "0.26.1"
59+
regex = "1.11.2"
60+
#
61+
#[build-dependencies]
62+
#cbindgen = "0.27"
63+
64+
65+
[profile.release]
66+
lto = true
67+
codegen-units = 1
68+
panic = "abort"
69+
70+
[profile.dev]
71+
opt-level = 1 # Some optimization for reasonable performance
72+
lto = false # Disable LTO for faster builds
73+
codegen-units = 16 # More parallel compilation
74+
incremental = true # Enable incremental compilation

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,66 +11,68 @@ crate-type = ["cdylib"]
1111

1212
[dependencies]
1313
# DataFusion dependencies
14-
datafusion = "49.0.0"
15-
datafusion-expr = "49.0.0"
16-
datafusion-datasource = "49.0.0"
17-
arrow-json = "55.2"
18-
arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
14+
datafusion = { workspace = true }
15+
datafusion-expr = { workspace = true }
16+
datafusion-datasource = { workspace = true }
17+
arrow-json = { workspace = true }
18+
arrow = { workspace = true }
1919
#arrow = "55.2.0"
20-
arrow-array = "55.2.0"
21-
arrow-schema = "55.2.0"
22-
arrow-buffer = "55.2.0"
20+
arrow-array = { workspace = true }
21+
arrow-schema = { workspace = true }
22+
arrow-buffer = { workspace = true }
2323

2424
# JNI dependencies
25-
jni = "0.21"
25+
jni = { workspace = true }
2626

2727
# Substrait support
28-
datafusion-substrait = "49.0.0"
29-
prost = "0.13"
28+
datafusion-substrait = { workspace = true }
29+
prost = { workspace = true }
3030

3131

3232
# Async runtime
33-
tokio = { version = "1.0", features = ["full"] }
34-
futures = "0.3"
33+
tokio = { workspace = true }
34+
futures = { workspace = true }
3535
#tokio = { version = "1.0", features = ["rt", "rt-multi-thread", "macros"] }
3636

3737
# Serialization
38-
serde = { version = "1.0", features = ["derive"] }
39-
serde_json = "1.0"
38+
serde = { workspace = true }
39+
serde_json = { workspace = true }
4040

4141
# Error handling
42-
anyhow = "1.0"
43-
thiserror = "1.0"
42+
anyhow = { workspace = true }
43+
thiserror = { workspace = true }
4444

4545
# Logging
46-
log = "0.4"
46+
log ={ workspace = true }
4747
# Parquet support
48-
parquet = "53.0.0"
48+
parquet = { workspace = true }
4949

5050
# Object store for file access
51-
object_store = "=0.12.3"
52-
url = "2.0"
51+
object_store = { workspace = true }
52+
url = { workspace = true }
5353

5454
# Substrait support
55-
substrait = "0.47"
55+
substrait = { workspace = true }
5656

5757
# Temporary directory support
58-
tempfile = "3.0"
59-
chrono = "0.4.41"
58+
tempfile ={ workspace = true }
59+
chrono = { workspace = true }
6060

61-
async-trait = "0.1.89"
62-
itertools = "0.14.0"
63-
rstest = "0.26.1"
64-
regex = "1.11.2"
61+
async-trait = { workspace = true }
62+
itertools = { workspace = true }
63+
rstest = { workspace = true }
64+
regex = { workspace = true }
6565

6666
[build-dependencies]
6767
cbindgen = "0.27"
68-
68+
[build]
69+
rustflags = ["-C", "force-frame-pointers=yes","symbol-mangling-version=v0"]
6970

7071
[profile.release]
7172
lto = true
7273
codegen-units = 1
7374
panic = "abort"
75+
incremental = true
7476

7577
[profile.dev]
7678
opt-level = 1 # Some optimization for reasonable performance

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use jni::sys::{jbyteArray, jlong, jstring};
1313
use jni::JNIEnv;
1414
use std::ptr::addr_of_mut;
1515
use std::sync::Arc;
16+
use std::time::Instant;
1617

1718
mod util;
1819
mod row_id_optimizer;
@@ -208,6 +209,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
208209
tokio_runtime_env_ptr: jlong,
209210
// callback: JObject,
210211
) -> jlong {
212+
let overall = Instant::now();
211213
let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) };
212214
let runtime_ptr = unsafe { &*(tokio_runtime_env_ptr as *const Runtime)};
213215
let table_name: String = env.get_string(&table_name).expect("Couldn't get java string!").into();
@@ -228,7 +230,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
228230

229231
// TODO: get config from CSV DataFormat
230232
let mut config = SessionConfig::new();
231-
// config.options_mut().execution.parquet.pushdown_filters = true;
233+
config.options_mut().execution.parquet.pushdown_filters = false;
234+
config.options_mut().execution.target_partitions = 1;
232235

233236
let state = datafusion::execution::SessionStateBuilder::new()
234237
.with_config(config)
@@ -265,6 +268,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
265268

266269
});
267270

271+
let start = Instant::now();
268272
// TODO : how to close ctx ?
269273
// Convert Java byte array to Rust Vec<u8>
270274
let plan_bytes_obj = unsafe { JByteArray::from_raw(substrait_bytes) };
@@ -293,18 +297,22 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
293297
let logical_plan = match from_substrait_plan(&ctx.state(), &substrait_plan).await {
294298
Ok(plan) => {
295299
// println!("SUBSTRAIT Rust: LogicalPlan: {:?}", plan);
300+
let duration = start.elapsed();
301+
println!("Rust: Substrait decoding time in milliseconds: {}", duration.as_millis());
296302
plan
297303
},
298304
Err(e) => {
299305
println!("SUBSTRAIT Rust: Failed to convert Substrait plan: {}", e);
300306
return 0;
301307
}
302308
};
303-
304309
let dataframe = ctx.execute_logical_plan(logical_plan).await.unwrap();
305310
let stream = dataframe.execute_stream().await.unwrap();
306311
let stream_ptr = Box::into_raw(Box::new(stream)) as jlong;
307312
// println!("The memory used currently right now: {:?}", jemalloc_stats::refresh_allocated());
313+
let duration1 = overall.elapsed();
314+
println!("Rust: Overall query setup time in milliseconds: {}", duration1.as_millis());
315+
308316
stream_ptr
309317

310318
})

0 commit comments

Comments
 (0)