Skip to content

Commit 769f367

Browse files
authored
fix: Track elapsed_compute metric for CSV scans (#18901)
## Which issue does this PR close? - Closes #18795 ## Rationale for this change Currently, scanning CSV files does not correctly report the `elapsed_compute` metric in `BaselineMetrics`. When running `EXPLAIN ANALYZE` on a CSV table, the `elapsed_compute` time is reported as negligible (e.g., `1ns` or `42ns`), even for large files, because the time spent parsing the CSV data is not being measured. This PR ensures that the time spent reading and parsing CSV batches is correctly accounted for in the execution metrics. ## What changes are included in this PR? - Modified `datafusion/datasource-csv/src/source.rs`. - Updated `CsvOpener` to store the `partition_index`. - Initialized `BaselineMetrics` within C`svOpener::open`. - Wrapped the underlying CSV reader iterator using `std::iter::from_fn`. This allows us to start a timer before calling `reader.next()` and stop it immediately after, ensuring the CPU time spent parsing the CSV is captured in `elapsed_compute`. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes. Existing unit tests passed (`cargo test -p datafusion-datasource-csv`). Additionally, I verified the fix manually using `datafusion-cli` and `EXPLAIN ANALYZE`. **Before the fix:** elapsed_compute was reported as 85ns (effectively zero). ``` DataSourceExec: ... metrics=[output_rows=3, elapsed_compute=85ns, ... time_elapsed_processing=1.54ms ...] ``` **After the fix**: elapsed_compute is reported as 1.11ms, accurately reflecting the parsing time. ``` DataSourceExec: ... metrics=[output_rows=3, elapsed_compute=1.11ms, ... time_elapsed_processing=1.26ms ...] ``` ## Are there any user-facing changes? Users viewing `EXPLAIN ANALYZE` output for queries involving CSV files will now see accurate `elapsed_compute` values instead of near-zero values.
1 parent 4eb2933 commit 769f367

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

datafusion/datasource-csv/src/source.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_common_runtime::JoinSet;
3939
use datafusion_datasource::file::FileSource;
4040
use datafusion_datasource::file_scan_config::FileScanConfig;
4141
use datafusion_execution::TaskContext;
42-
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
42+
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
4343
use datafusion_physical_plan::{
4444
DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
4545
};
@@ -213,6 +213,7 @@ pub struct CsvOpener {
213213
config: Arc<CsvSource>,
214214
file_compression_type: FileCompressionType,
215215
object_store: Arc<dyn ObjectStore>,
216+
partition_index: usize,
216217
}
217218

218219
impl CsvOpener {
@@ -226,6 +227,7 @@ impl CsvOpener {
226227
config,
227228
file_compression_type,
228229
object_store,
230+
partition_index: 0,
229231
}
230232
}
231233
}
@@ -241,12 +243,13 @@ impl FileSource for CsvSource {
241243
&self,
242244
object_store: Arc<dyn ObjectStore>,
243245
base_config: &FileScanConfig,
244-
_partition: usize,
246+
partition: usize,
245247
) -> Arc<dyn FileOpener> {
246248
Arc::new(CsvOpener {
247249
config: Arc::new(self.clone()),
248250
file_compression_type: base_config.file_compression_type,
249251
object_store,
252+
partition_index: partition,
250253
})
251254
}
252255

@@ -352,6 +355,9 @@ impl FileOpener for CsvOpener {
352355
let store = Arc::clone(&self.object_store);
353356
let terminator = self.config.terminator();
354357

358+
let baseline_metrics =
359+
BaselineMetrics::new(&self.config.metrics, self.partition_index);
360+
355361
Ok(Box::pin(async move {
356362
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
357363

@@ -391,7 +397,17 @@ impl FileOpener for CsvOpener {
391397
)?
392398
};
393399

394-
Ok(futures::stream::iter(config.open(decoder)?)
400+
let mut reader = config.open(decoder)?;
401+
402+
// Use std::iter::from_fn to wrap execution of iterator's next() method.
403+
let iterator = std::iter::from_fn(move || {
404+
let mut timer = baseline_metrics.elapsed_compute().timer();
405+
let result = reader.next();
406+
timer.stop();
407+
result
408+
});
409+
410+
Ok(futures::stream::iter(iterator)
395411
.map(|r| r.map_err(Into::into))
396412
.boxed())
397413
}

0 commit comments

Comments
 (0)