Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 1 addition & 33 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ pub struct ScanExec {
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
/// Time waiting for JVM input plan to execute and return batches
jvm_fetch_time: Time,
/// Time spent in FFI
arrow_ffi_time: Time,
/// Whether native code can assume ownership of batches that it receives
arrow_ffi_safe: bool,
}
Expand All @@ -95,8 +91,6 @@ impl ScanExec {
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);

// Build schema directly from data types since get_next now always unpacks dictionaries
let schema = schema_from_data_types(&data_types);
Expand All @@ -119,8 +113,6 @@ impl ScanExec {
cache,
metrics: metrics_set,
baseline_metrics,
jvm_fetch_time,
arrow_ffi_time,
schema,
arrow_ffi_safe,
})
Expand Down Expand Up @@ -155,8 +147,6 @@ impl ScanExec {
self.exec_context_id,
self.input_source.as_ref().unwrap().as_obj(),
self.data_types.len(),
&self.jvm_fetch_time,
&self.arrow_ffi_time,
self.arrow_ffi_safe,
)?;
*current_batch = Some(next_batch);
Expand All @@ -172,8 +162,6 @@ impl ScanExec {
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
jvm_fetch_time: &Time,
arrow_ffi_time: &Time,
arrow_ffi_safe: bool,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
Expand All @@ -189,15 +177,11 @@ impl ScanExec {

let mut env = JVMClasses::get_env()?;

let mut timer = jvm_fetch_time.timer();

let num_rows: i32 = unsafe {
jni_call!(&mut env,
comet_batch_iterator(iter).has_next() -> i32)?
};

timer.stop();

if num_rows == -1 {
return Ok(InputBatch::EOF);
}
Expand All @@ -206,11 +190,9 @@ impl ScanExec {
// JVM via FFI
// Selection vectors can be provided by, for instance, Iceberg to
// remove rows that have been deleted.
let selection_indices_arrays =
Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?;
let selection_indices_arrays = Self::get_selection_indices(&mut env, iter, num_cols)?;

// fetch batch data from JVM via FFI
let mut timer = arrow_ffi_time.timer();
let (num_rows, array_addrs, schema_addrs) =
Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;

Expand Down Expand Up @@ -262,8 +244,6 @@ impl ScanExec {
}
}

timer.stop();

// If selection was applied, determine the actual row count from the selected arrays
let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays {
if !selection_arrays.is_empty() {
Expand Down Expand Up @@ -332,21 +312,15 @@ impl ScanExec {
env: &mut jni::JNIEnv,
iter: &JObject,
num_cols: usize,
jvm_fetch_time: &Time,
arrow_ffi_time: &Time,
) -> Result<Option<Vec<ArrayRef>>, CometError> {
// Check if all columns have selection vectors
let mut timer = jvm_fetch_time.timer();
let has_selection_vectors_result: jni::sys::jboolean = unsafe {
jni_call!(env,
comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)?
};
timer.stop();
let has_selection_vectors = has_selection_vectors_result != 0;

let selection_indices_arrays = if has_selection_vectors {
let mut timer = arrow_ffi_time.timer();

// Allocate arrays for selection indices export (one per column)
let mut indices_array_addrs = Vec::with_capacity(num_cols);
let mut indices_schema_addrs = Vec::with_capacity(num_cols);
Expand All @@ -364,21 +338,16 @@ impl ScanExec {
env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?;
env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?;

timer.stop();

// Export selection indices from JVM
let mut timer = jvm_fetch_time.timer();
let _exported_count: i32 = unsafe {
jni_call!(env,
comet_batch_iterator(iter).export_selection_indices(
JValueGen::Object(JObject::from(indices_array_obj).as_ref()),
JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
) -> i32)?
};
timer.stop();

// Convert to ArrayRef for easier handling
let mut timer = arrow_ffi_time.timer();
let mut selection_arrays = Vec::with_capacity(num_cols);
for i in 0..num_cols {
let array_data =
Expand All @@ -391,7 +360,6 @@ impl ScanExec {
Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema);
}
}
timer.stop();

Some(selection_arrays)
} else {
Expand Down
Loading