native/core/src/execution/jni_api.rs (23 changes: 18 additions & 5 deletions)
@@ -146,6 +146,8 @@ struct ExecutionContext {
     pub memory_pool_config: MemoryPoolConfig,
     /// Whether to log memory usage on each call to execute_plan
     pub tracing_enabled: bool,
+    /// Spark session timeZoneId
+    pub session_time_zone_id: String,
 }
 
 /// Accept serialized query plan and return the address of the native query plan.
@@ -171,12 +173,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     memory_limit_per_task: jlong,
     task_attempt_id: jlong,
     key_unwrapper_obj: JObject,
+    session_timezone: JString,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
         // Deserialize Spark configs
         let bytes = env.convert_byte_array(serialized_spark_configs)?;
         let spark_configs = serde::deserialize_config(bytes.as_slice())?;
         let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();
+        let session_time_zone_id: String = env.get_string(&session_timezone).unwrap().into();
 
         // Access Comet configs
         let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
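
For readers unfamiliar with the jni crate, the new `session_time_zone_id` line follows the usual JNI string-conversion pattern, sketched minimally below; `jstring_to_rust` is an illustrative helper name, not part of this PR.

```rust
use jni::errors::Result as JniResult;
use jni::objects::JString;
use jni::JNIEnv;

// Copies the Java string out of the JVM as a JavaStr and converts it into an
// owned Rust String. `?` propagates the JNI error; the diff above uses
// `.unwrap()` at this step instead.
fn jstring_to_rust(env: &mut JNIEnv, s: &JString) -> JniResult<String> {
    Ok(env.get_string(s)?.into())
}
```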
@@ -276,6 +280,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         explain_native,
         memory_pool_config,
         tracing_enabled,
+        session_time_zone_id,
     });
 
     Ok(Box::into_raw(exec_context) as i64)
@@ -464,9 +469,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         // query plan, we need to defer stream initialization to first time execution.
         if exec_context.root_op.is_none() {
             let start = Instant::now();
-            let planner =
-                PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
-                    .with_exec_id(exec_context_id);
+            let planner = PhysicalPlanner::new(
+                Arc::clone(&exec_context.session_ctx),
+                &exec_context.session_time_zone_id,
+                partition,
+            )
+            .with_exec_id(exec_context_id);
             let (scans, root_op) = planner.create_plan(
                 &exec_context.spark_plan,
                 &mut exec_context.input_sources.clone(),
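
The widened call site above implies a constructor roughly shaped like the sketch below. This is a hedged reconstruction, not the real definition: the actual `PhysicalPlanner` lives in planner.rs and carries more state, and the field names here are assumptions for illustration.

```rust
use std::sync::Arc;

// Stand-in for DataFusion's SessionContext; only here to make the sketch compile.
struct SessionContext;

// Hypothetical shape of the planner state implied by the call site above.
struct PhysicalPlanner {
    session_ctx: Arc<SessionContext>,
    session_time_zone_id: String,
    partition: i32,
    exec_context_id: i64,
}

impl PhysicalPlanner {
    // The session time zone is borrowed at the call site and owned by the
    // planner, so plan construction can consult it without reaching back
    // into the ExecutionContext.
    fn new(session_ctx: Arc<SessionContext>, session_time_zone_id: &str, partition: i32) -> Self {
        Self {
            session_ctx,
            session_time_zone_id: session_time_zone_id.to_string(),
            partition,
            exec_context_id: -1,
        }
    }

    // Builder-style setter mirroring the `.with_exec_id(exec_context_id)` call above.
    fn with_exec_id(mut self, exec_context_id: i64) -> Self {
        self.exec_context_id = exec_context_id;
        self
    }
}
```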
@@ -594,6 +602,7 @@ fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> Come
 fn convert_datatype_arrays(
     env: &'_ mut JNIEnv<'_>,
     serialized_datatypes: JObjectArray,
+    session_time_zone_id: &str,
 ) -> JNIResult<Vec<ArrowDataType>> {
     let array_len = env.get_array_length(&serialized_datatypes)?;
     let mut res: Vec<ArrowDataType> = Vec::new();
@@ -603,7 +612,7 @@ fn convert_datatype_arrays(
         let inner_array: JByteArray = inner_array.into();
         let bytes = env.convert_byte_array(inner_array)?;
         let data_type = serde::deserialize_data_type(bytes.as_slice()).unwrap();
-        let arrow_dt = to_arrow_datatype(&data_type);
+        let arrow_dt = to_arrow_datatype(&data_type, session_time_zone_id);
         res.push(arrow_dt);
     }
 
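
Threading the zone into `to_arrow_datatype` matters because Spark's TIMESTAMP type is session-local while TIMESTAMP_NTZ carries no zone. A minimal, self-contained sketch of that kind of mapping, assuming microsecond precision and with `SparkType` standing in for the actual deserialized protobuf type in this repo:

```rust
use arrow_schema::{DataType, TimeUnit};

// Stand-in for the deserialized Spark data type; illustrative only.
enum SparkType {
    Timestamp,    // Spark TIMESTAMP: an instant, rendered in the session zone
    TimestampNtz, // Spark TIMESTAMP_NTZ: wall-clock time, no zone attached
}

// Spark stores timestamps as microseconds, so the Arrow type mirrors that;
// only the zone-aware variant carries the session time zone.
fn to_arrow_timestamp(dt: &SparkType, session_time_zone_id: &str) -> DataType {
    match dt {
        SparkType::Timestamp => {
            DataType::Timestamp(TimeUnit::Microsecond, Some(session_time_zone_id.into()))
        }
        SparkType::TimestampNtz => DataType::Timestamp(TimeUnit::Microsecond, None),
    }
}
```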
@@ -637,13 +646,17 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
     compression_codec: JString,
     compression_level: jint,
     tracing_enabled: jboolean,
+    session_timezone: JString,
 ) -> jlongArray {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         with_trace(
             "writeSortedFileNative",
             tracing_enabled != JNI_FALSE,
             || {
-                let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
+                let session_time_zone_id: String =
+                    env.get_string(&session_timezone).unwrap().into();
+                let data_types =
+                    convert_datatype_arrays(&mut env, serialized_datatypes, &session_time_zone_id)?;
 
                 let row_num = env.get_array_length(&row_addresses)? as usize;
                 let row_addresses =