diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 1fedafbe84..2543705fb0 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -77,10 +77,6 @@ pub struct ScanExec { metrics: ExecutionPlanMetricsSet, /// Baseline metrics baseline_metrics: BaselineMetrics, - /// Time waiting for JVM input plan to execute and return batches - jvm_fetch_time: Time, - /// Time spent in FFI - arrow_ffi_time: Time, /// Whether native code can assume ownership of batches that it receives arrow_ffi_safe: bool, } @@ -95,8 +91,6 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); - let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0); - let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0); // Build schema directly from data types since get_next now always unpacks dictionaries let schema = schema_from_data_types(&data_types); @@ -119,8 +113,6 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, - jvm_fetch_time, - arrow_ffi_time, schema, arrow_ffi_safe, }) @@ -155,8 +147,6 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), - &self.jvm_fetch_time, - &self.arrow_ffi_time, self.arrow_ffi_safe, )?; *current_batch = Some(next_batch); @@ -172,8 +162,6 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, - jvm_fetch_time: &Time, - arrow_ffi_time: &Time, arrow_ffi_safe: bool, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { @@ -189,15 +177,11 @@ impl ScanExec { let mut env = JVMClasses::get_env()?; - let mut timer = jvm_fetch_time.timer(); - let num_rows: i32 = unsafe { jni_call!(&mut env, comet_batch_iterator(iter).has_next() -> i32)? }; - timer.stop(); - if num_rows == -1 { return Ok(InputBatch::EOF); } @@ -206,11 +190,9 @@ impl ScanExec { // JVM via FFI // Selection vectors can be provided by, for instance, Iceberg to // remove rows that have been deleted. - let selection_indices_arrays = - Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?; + let selection_indices_arrays = Self::get_selection_indices(&mut env, iter, num_cols)?; // fetch batch data from JVM via FFI - let mut timer = arrow_ffi_time.timer(); let (num_rows, array_addrs, schema_addrs) = Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?; @@ -262,8 +244,6 @@ impl ScanExec { } } - timer.stop(); - // If selection was applied, determine the actual row count from the selected arrays let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays { if !selection_arrays.is_empty() { @@ -332,21 +312,15 @@ impl ScanExec { env: &mut jni::JNIEnv, iter: &JObject, num_cols: usize, - jvm_fetch_time: &Time, - arrow_ffi_time: &Time, ) -> Result>, CometError> { // Check if all columns have selection vectors - let mut timer = jvm_fetch_time.timer(); let has_selection_vectors_result: jni::sys::jboolean = unsafe { jni_call!(env, comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)? }; - timer.stop(); let has_selection_vectors = has_selection_vectors_result != 0; let selection_indices_arrays = if has_selection_vectors { - let mut timer = arrow_ffi_time.timer(); - // Allocate arrays for selection indices export (one per column) let mut indices_array_addrs = Vec::with_capacity(num_cols); let mut indices_schema_addrs = Vec::with_capacity(num_cols); @@ -364,10 +338,7 @@ impl ScanExec { env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?; env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?; - timer.stop(); - // Export selection indices from JVM - let mut timer = jvm_fetch_time.timer(); let _exported_count: i32 = unsafe { jni_call!(env, comet_batch_iterator(iter).export_selection_indices( @@ -375,10 +346,8 @@ impl ScanExec { JValueGen::Object(JObject::from(indices_schema_obj).as_ref()) ) -> i32)? }; - timer.stop(); // Convert to ArrayRef for easier handling - let mut timer = arrow_ffi_time.timer(); let mut selection_arrays = Vec::with_capacity(num_cols); for i in 0..num_cols { let array_data = @@ -391,7 +360,6 @@ impl ScanExec { Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema); } } - timer.stop(); Some(selection_arrays) } else {