Skip to content

Commit 5beaa32

Browse files
authored
Merge branch 'apache:main' into non-utf8-csv2
2 parents f1f9c5f + 1f37a33 commit 5beaa32

File tree

12 files changed

+914
-247
lines changed

12 files changed

+914
-247
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ hex = { version = "0.4.3" }
162162
indexmap = "2.13.0"
163163
insta = { version = "1.46.3", features = ["glob", "filters"] }
164164
itertools = "0.14"
165+
itoa = "1.0"
165166
liblzma = { version = "0.4.6", features = ["static"] }
166167
log = "^0.4"
167168
memchr = "2.8.0"

datafusion/core/src/dataframe/parquet.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,19 @@ mod tests {
127127
use tempfile::TempDir;
128128
use url::Url;
129129

130+
/// Helper to extract a metric value by name from aggregated metrics.
131+
fn metric_usize(
132+
aggregated: &datafusion_physical_expr_common::metrics::MetricsSet,
133+
name: &str,
134+
) -> usize {
135+
aggregated
136+
.iter()
137+
.find(|m| m.value().name() == name)
138+
.unwrap_or_else(|| panic!("should have {name} metric"))
139+
.value()
140+
.as_usize()
141+
}
142+
130143
#[tokio::test]
131144
async fn filter_pushdown_dataframe() -> Result<()> {
132145
let ctx = SessionContext::new();
@@ -430,6 +443,126 @@ mod tests {
430443
Ok(())
431444
}
432445

446+
/// Test that ParquetSink exposes rows_written, bytes_written, and
447+
/// elapsed_compute metrics via DataSinkExec.
448+
#[tokio::test]
449+
async fn test_parquet_sink_metrics() -> Result<()> {
450+
use arrow::array::Int32Array;
451+
use arrow::datatypes::{DataType, Field, Schema};
452+
use arrow::record_batch::RecordBatch;
453+
use datafusion_execution::TaskContext;
454+
455+
use futures::TryStreamExt;
456+
457+
let ctx = SessionContext::new();
458+
let tmp_dir = TempDir::new()?;
459+
let output_path = tmp_dir.path().join("metrics_test.parquet");
460+
let output_path_str = output_path.to_str().unwrap();
461+
462+
// Register a table with 100 rows
463+
let schema = Arc::new(Schema::new(vec![
464+
Field::new("id", DataType::Int32, false),
465+
Field::new("val", DataType::Int32, false),
466+
]));
467+
let ids: Vec<i32> = (0..100).collect();
468+
let vals: Vec<i32> = (100..200).collect();
469+
let batch = RecordBatch::try_new(
470+
Arc::clone(&schema),
471+
vec![
472+
Arc::new(Int32Array::from(ids)),
473+
Arc::new(Int32Array::from(vals)),
474+
],
475+
)?;
476+
ctx.register_batch("source", batch)?;
477+
478+
// Create the physical plan for COPY TO
479+
let df = ctx
480+
.sql(&format!(
481+
"COPY source TO '{output_path_str}' STORED AS PARQUET"
482+
))
483+
.await?;
484+
let plan = df.create_physical_plan().await?;
485+
486+
// Execute the plan
487+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
488+
let stream = plan.execute(0, task_ctx)?;
489+
let _batches: Vec<_> = stream.try_collect().await?;
490+
491+
// Check metrics on the DataSinkExec (top-level plan)
492+
let metrics = plan
493+
.metrics()
494+
.expect("DataSinkExec should return metrics from ParquetSink");
495+
let aggregated = metrics.aggregate_by_name();
496+
497+
// rows_written should be 100
498+
assert_eq!(
499+
metric_usize(&aggregated, "rows_written"),
500+
100,
501+
"expected 100 rows written"
502+
);
503+
504+
// bytes_written should be > 0
505+
let bytes_written = metric_usize(&aggregated, "bytes_written");
506+
assert!(
507+
bytes_written > 0,
508+
"expected bytes_written > 0, got {bytes_written}"
509+
);
510+
511+
// elapsed_compute should be > 0
512+
let elapsed = metric_usize(&aggregated, "elapsed_compute");
513+
assert!(elapsed > 0, "expected elapsed_compute > 0");
514+
515+
Ok(())
516+
}
517+
518+
/// Test that ParquetSink metrics work with single_file_parallelism enabled.
519+
#[tokio::test]
520+
async fn test_parquet_sink_metrics_parallel() -> Result<()> {
521+
use arrow::array::Int32Array;
522+
use arrow::datatypes::{DataType, Field, Schema};
523+
use arrow::record_batch::RecordBatch;
524+
use datafusion_execution::TaskContext;
525+
526+
use futures::TryStreamExt;
527+
528+
let ctx = SessionContext::new();
529+
ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
530+
.await?
531+
.collect()
532+
.await?;
533+
534+
let tmp_dir = TempDir::new()?;
535+
let output_path = tmp_dir.path().join("metrics_parallel.parquet");
536+
let output_path_str = output_path.to_str().unwrap();
537+
538+
let schema =
539+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
540+
let ids: Vec<i32> = (0..50).collect();
541+
let batch = RecordBatch::try_new(
542+
Arc::clone(&schema),
543+
vec![Arc::new(Int32Array::from(ids))],
544+
)?;
545+
ctx.register_batch("source2", batch)?;
546+
547+
let df = ctx
548+
.sql(&format!(
549+
"COPY source2 TO '{output_path_str}' STORED AS PARQUET"
550+
))
551+
.await?;
552+
let plan = df.create_physical_plan().await?;
553+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
554+
let stream = plan.execute(0, task_ctx)?;
555+
let _batches: Vec<_> = stream.try_collect().await?;
556+
557+
let metrics = plan.metrics().expect("DataSinkExec should return metrics");
558+
let aggregated = metrics.aggregate_by_name();
559+
560+
assert_eq!(metric_usize(&aggregated, "rows_written"), 50);
561+
assert!(metric_usize(&aggregated, "bytes_written") > 0);
562+
563+
Ok(())
564+
}
565+
433566
/// Test FileOutputMode::Directory - explicitly request directory output
434567
/// even for paths WITH file extensions.
435568
#[tokio::test]

0 commit comments

Comments
 (0)