Commit 6d8b8c5

chore: ScanExec::new no longer fetches data (#2881)
1 parent 24e4c12 commit 6d8b8c5

2 files changed: +15 −54 lines

2 files changed

+15
-54
lines changed

native/core/src/execution/jni_api.rs

Lines changed: 1 addition & 1 deletion
@@ -534,7 +534,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
             .indent(true);
         info!(
             "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
-            \n plan creation (including CometScans fetching first batches) took {:?}:\
+            \n plan creation took {:?}:\
            \n{formatted_plan_str:}",
            plan.plan_id, stage_id, partition, exec_context.plan_creation_time
        );
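With the first-batch fetch moved out of plan construction, the logged duration now covers plan creation alone. As background, a minimal sketch of capturing and logging such a duration with std::time::Instant and the log crate; the ExecContext struct here is a hypothetical stand-in, and only the plan_creation_time field and the {:?} Duration formatting mirror the diff above:

    use std::time::{Duration, Instant};

    use log::info;

    // Hypothetical stand-in for Comet's execution context; only the
    // timing field is relevant to the log line changed above.
    struct ExecContext {
        plan_creation_time: Duration,
    }

    fn create_plan() -> ExecContext {
        let start = Instant::now();
        // ... build the native plan; after this commit no CometScan batches
        // are fetched here, so the elapsed time is plan creation only.
        ExecContext {
            plan_creation_time: start.elapsed(),
        }
    }

    fn log_plan_timing(ctx: &ExecContext) {
        // Duration implements Debug, so `{:?}` renders e.g. "1.234ms".
        info!("plan creation took {:?}", ctx.plan_creation_time);
    }

    fn main() {
        // Without an initialized logger backend (e.g. env_logger) the
        // info! call is a no-op, but the program runs either way.
        log_plan_timing(&create_plan());
    }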

native/core/src/execution/operators/scan.rs

Lines changed: 14 additions & 53 deletions
@@ -98,29 +98,8 @@ impl ScanExec {
         let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
         let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
 
-        // Scan's schema is determined by the input batch, so we need to set it before execution.
-        // Note that we determine if arrays are dictionary-encoded based on the
-        // first batch. The array may be dictionary-encoded in some batches and not others, and
-        // ScanExec will cast arrays from all future batches to the type determined here, so we
-        // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
-        // Dictionary-encoded primitive arrays are always unpacked.
-        let first_batch = if let Some(input_source) = input_source.as_ref() {
-            let mut timer = baseline_metrics.elapsed_compute().timer();
-            let batch = ScanExec::get_next(
-                exec_context_id,
-                input_source.as_obj(),
-                data_types.len(),
-                &jvm_fetch_time,
-                &arrow_ffi_time,
-                arrow_ffi_safe,
-            )?;
-            timer.stop();
-            batch
-        } else {
-            InputBatch::EOF
-        };
-
-        let schema = scan_schema(&first_batch, &data_types);
+        // Build schema directly from data types since get_next now always unpacks dictionaries
+        let schema = schema_from_data_types(&data_types);
 
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
@@ -136,7 +115,7 @@ impl ScanExec {
             input_source,
             input_source_description: input_source_description.to_string(),
             data_types,
-            batch: Arc::new(Mutex::new(Some(first_batch))),
+            batch: Arc::new(Mutex::new(None)),
             cache,
             metrics: metrics_set,
             baseline_metrics,
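This hunk is the core of the change: ScanExec::new now stores None instead of eagerly pulling a first batch over JNI, so the first fetch happens on the execution path. A minimal sketch of that lazy pattern, with illustrative stand-ins rather than Comet's real types:

    use std::sync::{Arc, Mutex};

    // Illustrative stand-in; Comet's real InputBatch wraps Arrow FFI arrays.
    #[allow(dead_code)]
    enum InputBatch {
        Batch(Vec<i64>),
        Eof,
    }

    struct Scan {
        // Starts out empty: constructing the operator performs no JVM calls.
        batch: Arc<Mutex<Option<InputBatch>>>,
    }

    impl Scan {
        fn new() -> Self {
            Scan {
                batch: Arc::new(Mutex::new(None)),
            }
        }

        // Called from the execution path; the first fetch happens here,
        // not in `new`, so plan construction stays cheap.
        fn poll_batch(&self, fetch: impl FnOnce() -> InputBatch) {
            let mut slot = self.batch.lock().unwrap();
            if slot.is_none() {
                *slot = Some(fetch());
            }
        }
    }

    fn main() {
        let scan = Scan::new(); // no data fetched yet
        scan.poll_batch(|| InputBatch::Batch(vec![1, 2, 3]));
    }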
@@ -423,26 +402,16 @@ impl ScanExec {
     }
 }
 
-fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef {
-    let fields = match input_batch {
-        // Note that if `columns` is empty, we'll get an empty schema
-        InputBatch::Batch(columns, _) => {
-            columns
-                .iter()
-                .enumerate()
-                .map(|(idx, c)| {
-                    let datatype = ScanExec::unpack_dictionary_type(c.data_type());
-                    // We don't use the field name. Put a placeholder.
-                    Field::new(format!("col_{idx}"), datatype, true)
-                })
-                .collect::<Vec<Field>>()
-        }
-        _ => data_types
-            .iter()
-            .enumerate()
-            .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true))
-            .collect(),
-    };
+fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
+    let fields = data_types
+        .iter()
+        .enumerate()
+        .map(|(idx, dt)| {
+            let datatype = ScanExec::unpack_dictionary_type(dt);
+            // We don't use the field name. Put a placeholder.
+            Field::new(format!("col_{idx}"), datatype, true)
+        })
+        .collect::<Vec<Field>>();
 
     Arc::new(Schema::new(fields))
 }
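A self-contained sketch of what the new schema_from_data_types computes, written against the arrow crate; the free function unpack_dictionary_type below is a hypothetical stand-in for ScanExec::unpack_dictionary_type, unpacking unconditionally per the comment above that get_next now always unpacks dictionaries:

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

    // Hypothetical stand-in for ScanExec::unpack_dictionary_type:
    // dictionary-encoded types are reported as their value type, since
    // the batches handed to downstream operators are already unpacked.
    fn unpack_dictionary_type(dt: &DataType) -> DataType {
        match dt {
            DataType::Dictionary(_, value_type) => value_type.as_ref().clone(),
            other => other.clone(),
        }
    }

    fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
        let fields = data_types
            .iter()
            .enumerate()
            // Field names are unused downstream, so placeholders suffice.
            .map(|(idx, dt)| Field::new(format!("col_{idx}"), unpack_dictionary_type(dt), true))
            .collect::<Vec<Field>>();
        Arc::new(Schema::new(fields))
    }

    fn main() {
        let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let schema = schema_from_data_types(&[dict, DataType::Int64]);
        // Prints Utf8 and Int64: the dictionary wrapper is gone from the schema.
        println!("{schema:?}");
    }

Because the schema now depends only on the declared data types, it can be computed once in the constructor and cached, which is what lets the schema() method in the next hunk drop its TEST_EXEC_CONTEXT_ID special case.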
@@ -453,15 +422,7 @@ impl ExecutionPlan for ScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
-            // `unwrap` is safe because `schema` is only called during converting
-            // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
-            let binding = self.batch.try_lock().unwrap();
-            let input_batch = binding.as_ref().unwrap();
-            scan_schema(input_batch, &self.data_types)
-        } else {
-            Arc::clone(&self.schema)
-        }
+        Arc::clone(&self.schema)
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
