
Commit 84b491c

xudong963 and claude committed
Cherry pick add metrics for parquet sink from upstream (#32)
Cherry-pick of apache#20307
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9522508 commit 84b491c
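
The change wires rows_written, bytes_written, and elapsed_compute counters from ParquetSink up through DataSinkExec, so they are visible both to EXPLAIN ANALYZE and to callers holding the physical plan. A minimal sketch of reading them after a COPY, using only the APIs exercised by the tests in this commit; the table name `t` and the output path are hypothetical:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_execution::TaskContext;
use futures::TryStreamExt;

async fn copy_and_report(ctx: &SessionContext) -> Result<()> {
    // Hypothetical table name and output path.
    let df = ctx
        .sql("COPY t TO '/tmp/t.parquet' STORED AS PARQUET")
        .await?;
    let plan: Arc<dyn ExecutionPlan> = df.create_physical_plan().await?;

    // Run the DataSinkExec plan to completion.
    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
    let _ = plan.execute(0, task_ctx)?.try_collect::<Vec<_>>().await?;

    // The sink's metrics are now exposed on the top-level plan node.
    if let Some(metrics) = plan.metrics() {
        for m in metrics.aggregate_by_name().iter() {
            println!("{} = {}", m.value().name(), m.value().as_usize());
        }
    }
    Ok(())
}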

File tree

3 files changed: +183 -3 lines changed

datafusion/core/src/dataframe/parquet.rs

Lines changed: 143 additions & 0 deletions
@@ -324,4 +324,147 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that ParquetSink exposes rows_written, bytes_written, and
+    /// elapsed_compute metrics via DataSinkExec.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_test.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        // Register a table with 100 rows
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("val", DataType::Int32, false),
+        ]));
+        let ids: Vec<i32> = (0..100).collect();
+        let vals: Vec<i32> = (100..200).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(vals)),
+            ],
+        )?;
+        ctx.register_batch("source", batch)?;
+
+        // Create the physical plan for COPY TO
+        let df = ctx
+            .sql(&format!(
+                "COPY source TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+
+        // Execute the plan
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
+        // Check metrics on the DataSinkExec (top-level plan)
+        let metrics = plan
+            .metrics()
+            .expect("DataSinkExec should return metrics from ParquetSink");
+        let aggregated = metrics.aggregate_by_name();
+
+        // rows_written should be 100
+        let rows_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "rows_written")
+            .expect("should have rows_written metric");
+        assert_eq!(
+            rows_written.value().as_usize(),
+            100,
+            "expected 100 rows written"
+        );
+
+        // bytes_written should be > 0
+        let bytes_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "bytes_written")
+            .expect("should have bytes_written metric");
+        assert!(
+            bytes_written.value().as_usize() > 0,
+            "expected bytes_written > 0, got {}",
+            bytes_written.value().as_usize()
+        );
+
+        // elapsed_compute should be > 0
+        let elapsed = aggregated
+            .iter()
+            .find(|m| m.value().name() == "elapsed_compute")
+            .expect("should have elapsed_compute metric");
+        assert!(
+            elapsed.value().as_usize() > 0,
+            "expected elapsed_compute > 0"
+        );
+
+        Ok(())
+    }
+
+    /// Test that ParquetSink metrics work with single_file_parallelism enabled.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics_parallel() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
+            .await?
+            .collect()
+            .await?;
+
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_parallel.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let ids: Vec<i32> = (0..50).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(ids))],
+        )?;
+        ctx.register_batch("source2", batch)?;
+
+        let df = ctx
+            .sql(&format!(
+                "COPY source2 TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
+        let metrics = plan.metrics().expect("DataSinkExec should return metrics");
+        let aggregated = metrics.aggregate_by_name();
+
+        let rows_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "rows_written")
+            .expect("should have rows_written metric");
+        assert_eq!(rows_written.value().as_usize(), 50);
+
+        let bytes_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "bytes_written")
+            .expect("should have bytes_written metric");
+        assert!(bytes_written.value().as_usize() > 0);
+
+        Ok(())
+    }
 }

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 29 additions & 3 deletions
@@ -54,6 +54,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
@@ -1079,6 +1082,8 @@ pub struct ParquetSink {
     /// File metadata from successfully produced parquet files. The Mutex is only used
     /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
     written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
+    /// Metrics for tracking write operations
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl Debug for ParquetSink {
@@ -1110,6 +1115,7 @@ impl ParquetSink {
             config,
             parquet_options,
            written: Default::default(),
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 
@@ -1240,6 +1246,14 @@ impl FileSink for ParquetSink {
         mut file_stream_rx: DemuxedStreamReceiver,
         object_store: Arc<dyn ObjectStore>,
     ) -> Result<u64> {
+        let rows_written_counter =
+            MetricBuilder::new(&self.metrics).global_counter("rows_written");
+        let bytes_written_counter =
+            MetricBuilder::new(&self.metrics).global_counter("bytes_written");
+        let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
+
+        let write_start = datafusion_common::instant::Instant::now();
+
         let parquet_opts = &self.parquet_options;
 
         let mut file_write_tasks: JoinSet<
@@ -1317,12 +1331,18 @@ impl FileSink for ParquetSink {
             }
         }
 
-        let mut row_count = 0;
         while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, parquet_meta_data) = r?;
-                    row_count += parquet_meta_data.file_metadata().num_rows();
+                    let file_rows = parquet_meta_data.file_metadata().num_rows() as usize;
+                    let file_bytes: usize = parquet_meta_data
+                        .row_groups()
+                        .iter()
+                        .map(|rg| rg.compressed_size() as usize)
+                        .sum();
+                    rows_written_counter.add(file_rows);
+                    bytes_written_counter.add(file_bytes);
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), parquet_meta_data)
@@ -1344,7 +1364,9 @@ impl FileSink for ParquetSink {
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
 
-        Ok(row_count as u64)
+        elapsed_compute.add_elapsed(write_start);
+
+        Ok(rows_written_counter.value() as u64)
     }
 }
 
@@ -1354,6 +1376,10 @@ impl DataSink for ParquetSink {
        self
    }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }
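
The hunks above follow the standard ExecutionPlanMetricsSet / MetricBuilder pattern: the sink owns the metrics set, registers global counters, adds per-file values as write tasks complete, and hands back a snapshot from DataSink::metrics(). A condensed, hypothetical sketch of just that plumbing (the SinkMetrics struct and its methods are illustrative, not part of this commit), using the same metrics API the diff imports:

use datafusion_physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};

/// Hypothetical stand-in showing how a sink can own and expose write metrics.
struct SinkMetrics {
    inner: ExecutionPlanMetricsSet,
    rows_written: Count,
    bytes_written: Count,
}

impl SinkMetrics {
    fn new() -> Self {
        let inner = ExecutionPlanMetricsSet::new();
        // Global counters are not tied to a particular partition.
        let rows_written = MetricBuilder::new(&inner).global_counter("rows_written");
        let bytes_written = MetricBuilder::new(&inner).global_counter("bytes_written");
        Self {
            inner,
            rows_written,
            bytes_written,
        }
    }

    /// Record one finished parquet file.
    fn record_file(&self, rows: usize, bytes: usize) {
        self.rows_written.add(rows);
        self.bytes_written.add(bytes);
    }

    /// Snapshot for DataSink::metrics(), mirroring clone_inner() in the hunk above.
    fn snapshot(&self) -> MetricsSet {
        self.inner.clone_inner()
    }
}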

datafusion/sqllogictest/test_files/copy.slt

Lines changed: 11 additions & 0 deletions
@@ -200,6 +200,17 @@ physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
 
+# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics
+# Use a query with Sort and Projection to verify metrics across all operators
+query TT
+EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET;
+----
+Plan with Metrics
+01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=<slt:ignore>, bytes_written=<slt:ignore>, rows_written=2]
+02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=<slt:ignore>, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
+03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=1, expr_0_eval_time=<slt:ignore>, expr_1_eval_time=<slt:ignore>]
+04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
+
 # Copy to directory as partitioned files with keep_partition_by_columns enabled
 query I
 COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
