Skip to content

Commit 3dcd9ad

Browse files
authored
chore: Remove low-level ffi/jvm timers from native ScanExec (apache#2939)
1 parent b9fd050 commit 3dcd9ad

File tree

1 file changed

+1
-33
lines changed
  • native/core/src/execution/operators

1 file changed

+1
-33
lines changed

native/core/src/execution/operators/scan.rs

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ pub struct ScanExec {
7777
metrics: ExecutionPlanMetricsSet,
7878
/// Baseline metrics
7979
baseline_metrics: BaselineMetrics,
80-
/// Time waiting for JVM input plan to execute and return batches
81-
jvm_fetch_time: Time,
82-
/// Time spent in FFI
83-
arrow_ffi_time: Time,
8480
/// Whether native code can assume ownership of batches that it receives
8581
arrow_ffi_safe: bool,
8682
}
@@ -95,8 +91,6 @@ impl ScanExec {
9591
) -> Result<Self, CometError> {
9692
let metrics_set = ExecutionPlanMetricsSet::default();
9793
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
98-
let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
99-
let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
10094

10195
// Build schema directly from data types since get_next now always unpacks dictionaries
10296
let schema = schema_from_data_types(&data_types);
@@ -119,8 +113,6 @@ impl ScanExec {
119113
cache,
120114
metrics: metrics_set,
121115
baseline_metrics,
122-
jvm_fetch_time,
123-
arrow_ffi_time,
124116
schema,
125117
arrow_ffi_safe,
126118
})
@@ -155,8 +147,6 @@ impl ScanExec {
155147
self.exec_context_id,
156148
self.input_source.as_ref().unwrap().as_obj(),
157149
self.data_types.len(),
158-
&self.jvm_fetch_time,
159-
&self.arrow_ffi_time,
160150
self.arrow_ffi_safe,
161151
)?;
162152
*current_batch = Some(next_batch);
@@ -172,8 +162,6 @@ impl ScanExec {
172162
exec_context_id: i64,
173163
iter: &JObject,
174164
num_cols: usize,
175-
jvm_fetch_time: &Time,
176-
arrow_ffi_time: &Time,
177165
arrow_ffi_safe: bool,
178166
) -> Result<InputBatch, CometError> {
179167
if exec_context_id == TEST_EXEC_CONTEXT_ID {
@@ -189,15 +177,11 @@ impl ScanExec {
189177

190178
let mut env = JVMClasses::get_env()?;
191179

192-
let mut timer = jvm_fetch_time.timer();
193-
194180
let num_rows: i32 = unsafe {
195181
jni_call!(&mut env,
196182
comet_batch_iterator(iter).has_next() -> i32)?
197183
};
198184

199-
timer.stop();
200-
201185
if num_rows == -1 {
202186
return Ok(InputBatch::EOF);
203187
}
@@ -206,11 +190,9 @@ impl ScanExec {
206190
// JVM via FFI
207191
// Selection vectors can be provided by, for instance, Iceberg to
208192
// remove rows that have been deleted.
209-
let selection_indices_arrays =
210-
Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?;
193+
let selection_indices_arrays = Self::get_selection_indices(&mut env, iter, num_cols)?;
211194

212195
// fetch batch data from JVM via FFI
213-
let mut timer = arrow_ffi_time.timer();
214196
let (num_rows, array_addrs, schema_addrs) =
215197
Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
216198

@@ -262,8 +244,6 @@ impl ScanExec {
262244
}
263245
}
264246

265-
timer.stop();
266-
267247
// If selection was applied, determine the actual row count from the selected arrays
268248
let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays {
269249
if !selection_arrays.is_empty() {
@@ -332,21 +312,15 @@ impl ScanExec {
332312
env: &mut jni::JNIEnv,
333313
iter: &JObject,
334314
num_cols: usize,
335-
jvm_fetch_time: &Time,
336-
arrow_ffi_time: &Time,
337315
) -> Result<Option<Vec<ArrayRef>>, CometError> {
338316
// Check if all columns have selection vectors
339-
let mut timer = jvm_fetch_time.timer();
340317
let has_selection_vectors_result: jni::sys::jboolean = unsafe {
341318
jni_call!(env,
342319
comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)?
343320
};
344-
timer.stop();
345321
let has_selection_vectors = has_selection_vectors_result != 0;
346322

347323
let selection_indices_arrays = if has_selection_vectors {
348-
let mut timer = arrow_ffi_time.timer();
349-
350324
// Allocate arrays for selection indices export (one per column)
351325
let mut indices_array_addrs = Vec::with_capacity(num_cols);
352326
let mut indices_schema_addrs = Vec::with_capacity(num_cols);
@@ -364,21 +338,16 @@ impl ScanExec {
364338
env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?;
365339
env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?;
366340

367-
timer.stop();
368-
369341
// Export selection indices from JVM
370-
let mut timer = jvm_fetch_time.timer();
371342
let _exported_count: i32 = unsafe {
372343
jni_call!(env,
373344
comet_batch_iterator(iter).export_selection_indices(
374345
JValueGen::Object(JObject::from(indices_array_obj).as_ref()),
375346
JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
376347
) -> i32)?
377348
};
378-
timer.stop();
379349

380350
// Convert to ArrayRef for easier handling
381-
let mut timer = arrow_ffi_time.timer();
382351
let mut selection_arrays = Vec::with_capacity(num_cols);
383352
for i in 0..num_cols {
384353
let array_data =
@@ -391,7 +360,6 @@ impl ScanExec {
391360
Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema);
392361
}
393362
}
394-
timer.stop();
395363

396364
Some(selection_arrays)
397365
} else {

0 commit comments

Comments
 (0)