diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index aaed1fc74f..e1efcf5836 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -503,60 +503,69 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( pull_input_batches(exec_context)?; } - loop { - // Polling the stream. - let next_item = exec_context.stream.as_mut().unwrap().next(); - let poll_output = get_runtime().block_on(async { poll!(next_item) }); - - // update metrics at interval - if let Some(interval) = exec_context.metrics_update_interval { - let now = Instant::now(); - if now - exec_context.metrics_last_update_time >= interval { - update_metrics(&mut env, exec_context)?; - exec_context.metrics_last_update_time = now; + // Enter the runtime once for the entire polling loop to avoid repeated + // Runtime::enter() overhead + get_runtime().block_on(async { + loop { + // Polling the stream. + let next_item = exec_context.stream.as_mut().unwrap().next(); + let poll_output = poll!(next_item); + + // update metrics at interval + if let Some(interval) = exec_context.metrics_update_interval { + let now = Instant::now(); + if now - exec_context.metrics_last_update_time >= interval { + update_metrics(&mut env, exec_context)?; + exec_context.metrics_last_update_time = now; + } } - } - match poll_output { - Poll::Ready(Some(output)) => { - // prepare output for FFI transfer - return prepare_output( - &mut env, - array_addrs, - schema_addrs, - output?, - exec_context.debug_native, - ); - } - Poll::Ready(None) => { - // Reaches EOF of output. - if exec_context.explain_native { - if let Some(plan) = &exec_context.root_op { - let formatted_plan_str = DisplayableExecutionPlan::with_metrics( - plan.native_plan.as_ref(), - ) - .indent(true); - info!( - "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\ - \n plan creation took {:?}:\ - \n{formatted_plan_str:}", - plan.plan_id, stage_id, partition, exec_context.plan_creation_time - ); + match poll_output { + Poll::Ready(Some(output)) => { + // prepare output for FFI transfer + return prepare_output( + &mut env, + array_addrs, + schema_addrs, + output?, + exec_context.debug_native, + ); + } + Poll::Ready(None) => { + // Reaches EOF of output. + if exec_context.explain_native { + if let Some(plan) = &exec_context.root_op { + let formatted_plan_str = DisplayableExecutionPlan::with_metrics( + plan.native_plan.as_ref(), + ) + .indent(true); + info!( + "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\ + \n plan creation took {:?}:\ + \n{formatted_plan_str:}", + plan.plan_id, stage_id, partition, exec_context.plan_creation_time + ); + } } + return Ok(-1); + } + // A poll pending means there are more than one blocking operators, + // we don't need go back-forth between JVM/Native. Just keeping polling. + Poll::Pending => { + // TODO: Investigate if JNI calls are safe without block_in_place. + // block_in_place prevents Tokio from migrating this task to another thread, + // which is necessary because JNI env is thread-local. If we can guarantee + // thread safety another way, we could remove this wrapper for better perf. + tokio::task::block_in_place(|| { + pull_input_batches(exec_context) + })?; + + // Output not ready yet + continue; } - return Ok(-1); - } - // A poll pending means there are more than one blocking operators, - // we don't need go back-forth between JVM/Native. Just keeping polling. - Poll::Pending => { - // Pull input batches - pull_input_batches(exec_context)?; - - // Output not ready yet - continue; } } - } + }) }) }) }