Skip to content

Commit b9a3b9f

Browse files
adriangb, Claude and Copilot
authored
Record sort order when writing Parquet with WITH ORDER (apache#19595)
## Which issue does this PR close? Part of apache#19433 ## Rationale for this change When writing data to a table created with `CREATE EXTERNAL TABLE ... WITH ORDER`, the sorting columns should be recorded in the Parquet file's row group metadata. This allows downstream readers to know the data is sorted and potentially skip sorting operations. ## What changes are included in this PR? - Add `sort_expr_to_sorting_column()` and `lex_ordering_to_sorting_columns()` functions in `metadata.rs` to convert DataFusion ordering to Parquet `SortingColumn` - Add `sorting_columns` field to `ParquetSink` with `with_sorting_columns()` builder method - Update `create_writer_physical_plan()` to pass order requirements to `ParquetSink` - Update `create_writer_props()` to set sorting columns on `WriterProperties` - Add test verifying `sorting_columns` metadata is written correctly ## Are these changes tested? Yes, added `test_create_table_with_order_writes_sorting_columns` that: 1. Creates an external table with `WITH ORDER (a ASC NULLS FIRST, b DESC NULLS LAST)` 2. Inserts data 3. Reads the Parquet file and verifies the `sorting_columns` metadata matches the expected order ## Are there any user-facing changes? No user-facing API changes. Parquet files written via `INSERT INTO` or `COPY` for tables with `WITH ORDER` will now contain `sorting_columns` metadata in the row group. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 0cf45ca commit b9a3b9f

File tree

4 files changed

+185
-4
lines changed

4 files changed

+185
-4
lines changed

datafusion/core/tests/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ mod expr_adapter;
5050
mod external_access_plan;
5151
mod file_statistics;
5252
mod filter_pushdown;
53+
mod ordering;
5354
mod page_pruning;
5455
mod row_group_pruning;
5556
mod schema;
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Tests for ordering in Parquet sorting_columns metadata
19+
20+
use datafusion::prelude::SessionContext;
21+
use datafusion_common::Result;
22+
use tempfile::tempdir;
23+
24+
/// Test that CREATE TABLE ... WITH ORDER writes sorting_columns to Parquet metadata
25+
#[tokio::test]
26+
async fn test_create_table_with_order_writes_sorting_columns() -> Result<()> {
27+
use parquet::file::reader::FileReader;
28+
use parquet::file::serialized_reader::SerializedFileReader;
29+
use std::fs::File;
30+
31+
let ctx = SessionContext::new();
32+
let tmp_dir = tempdir()?;
33+
let table_path = tmp_dir.path().join("sorted_table");
34+
std::fs::create_dir_all(&table_path)?;
35+
36+
// Create external table with ordering
37+
let create_table_sql = format!(
38+
"CREATE EXTERNAL TABLE sorted_data (a INT, b VARCHAR) \
39+
STORED AS PARQUET \
40+
LOCATION '{}' \
41+
WITH ORDER (a ASC NULLS FIRST, b DESC NULLS LAST)",
42+
table_path.display()
43+
);
44+
ctx.sql(&create_table_sql).await?;
45+
46+
// Insert sorted data
47+
ctx.sql("INSERT INTO sorted_data VALUES (1, 'x'), (2, 'y'), (3, 'z')")
48+
.await?
49+
.collect()
50+
.await?;
51+
52+
// Find the parquet file that was written
53+
let parquet_files: Vec<_> = std::fs::read_dir(&table_path)?
54+
.filter_map(|e| e.ok())
55+
.filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
56+
.collect();
57+
58+
assert!(
59+
!parquet_files.is_empty(),
60+
"Expected at least one parquet file in {}",
61+
table_path.display()
62+
);
63+
64+
// Read the parquet file and verify sorting_columns metadata
65+
let file = File::open(parquet_files[0].path())?;
66+
let reader = SerializedFileReader::new(file)?;
67+
let metadata = reader.metadata();
68+
69+
// Check that row group has sorting_columns
70+
let row_group = metadata.row_group(0);
71+
let sorting_columns = row_group.sorting_columns();
72+
73+
assert!(
74+
sorting_columns.is_some(),
75+
"Expected sorting_columns in row group metadata"
76+
);
77+
let sorting = sorting_columns.unwrap();
78+
assert_eq!(sorting.len(), 2, "Expected 2 sorting columns");
79+
80+
// First column: a ASC NULLS FIRST (column_idx = 0)
81+
assert_eq!(sorting[0].column_idx, 0, "First sort column should be 'a'");
82+
assert!(
83+
!sorting[0].descending,
84+
"First column should be ASC (descending=false)"
85+
);
86+
assert!(
87+
sorting[0].nulls_first,
88+
"First column should have NULLS FIRST"
89+
);
90+
91+
// Second column: b DESC NULLS LAST (column_idx = 1)
92+
assert_eq!(sorting[1].column_idx, 1, "Second sort column should be 'b'");
93+
assert!(
94+
sorting[1].descending,
95+
"Second column should be DESC (descending=true)"
96+
);
97+
assert!(
98+
!sorting[1].nulls_first,
99+
"Second column should have NULLS LAST"
100+
);
101+
102+
Ok(())
103+
}

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
5454
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
5555
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5656
use datafusion_expr::dml::InsertOp;
57-
use datafusion_physical_expr_common::sort_expr::LexRequirement;
57+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
5858
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
5959
use datafusion_session::Session;
6060

61-
use crate::metadata::DFParquetMetadata;
61+
use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
6262
use crate::reader::CachedParquetFileReaderFactory;
6363
use crate::source::{ParquetSource, parse_coerce_int96_string};
6464
use async_trait::async_trait;
@@ -81,7 +81,7 @@ use parquet::basic::Type;
8181
#[cfg(feature = "parquet_encryption")]
8282
use parquet::encryption::encrypt::FileEncryptionProperties;
8383
use parquet::errors::ParquetError;
84-
use parquet::file::metadata::ParquetMetaData;
84+
use parquet::file::metadata::{ParquetMetaData, SortingColumn};
8585
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
8686
use parquet::file::writer::SerializedFileWriter;
8787
use parquet::schema::types::SchemaDescriptor;
@@ -500,7 +500,22 @@ impl FileFormat for ParquetFormat {
500500
return not_impl_err!("Overwrites are not implemented yet for Parquet");
501501
}
502502

503-
let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
503+
// Convert ordering requirements to Parquet SortingColumns for file metadata
504+
let sorting_columns = if let Some(ref requirements) = order_requirements {
505+
let ordering: LexOrdering = requirements.clone().into();
506+
// In cases like `COPY (... ORDER BY ...) TO ...` the ORDER BY clause
507+
// may not be compatible with Parquet sorting columns (e.g. ordering on `random()`).
508+
// So if we cannot create a Parquet sorting column from the ordering requirement,
509+
// we skip setting sorting columns on the Parquet sink.
510+
lex_ordering_to_sorting_columns(&ordering).ok()
511+
} else {
512+
None
513+
};
514+
515+
let sink = Arc::new(
516+
ParquetSink::new(conf, self.options.clone())
517+
.with_sorting_columns(sorting_columns),
518+
);
504519

505520
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
506521
}
@@ -1088,6 +1103,8 @@ pub struct ParquetSink {
10881103
/// File metadata from successfully produced parquet files. The Mutex is only used
10891104
/// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
10901105
written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
1106+
/// Optional sorting columns to write to Parquet metadata
1107+
sorting_columns: Option<Vec<SortingColumn>>,
10911108
}
10921109

10931110
impl Debug for ParquetSink {
@@ -1119,9 +1136,19 @@ impl ParquetSink {
11191136
config,
11201137
parquet_options,
11211138
written: Default::default(),
1139+
sorting_columns: None,
11221140
}
11231141
}
11241142

1143+
/// Set sorting columns for the Parquet file metadata.
1144+
pub fn with_sorting_columns(
1145+
mut self,
1146+
sorting_columns: Option<Vec<SortingColumn>>,
1147+
) -> Self {
1148+
self.sorting_columns = sorting_columns;
1149+
self
1150+
}
1151+
11251152
/// Retrieve the file metadata for the written files, keyed to the path
11261153
/// which may be partitioned (in the case of hive style partitioning).
11271154
pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
@@ -1145,6 +1172,12 @@ impl ParquetSink {
11451172
}
11461173

11471174
let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
1175+
1176+
// Set sorting columns if configured
1177+
if let Some(ref sorting_columns) = self.sorting_columns {
1178+
builder = builder.set_sorting_columns(Some(sorting_columns.clone()));
1179+
}
1180+
11481181
builder = set_writer_encryption_properties(
11491182
builder,
11501183
runtime,

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use datafusion_execution::cache::cache_manager::{
3535
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
3636
};
3737
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
38+
use datafusion_physical_expr::expressions::Column;
39+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
3840
use datafusion_physical_plan::Accumulator;
3941
use log::debug;
4042
use object_store::path::Path;
@@ -43,6 +45,7 @@ use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
4345
use parquet::arrow::{parquet_column, parquet_to_arrow_schema};
4446
use parquet::file::metadata::{
4547
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
48+
SortingColumn,
4649
};
4750
use parquet::schema::types::SchemaDescriptor;
4851
use std::any::Any;
@@ -614,6 +617,47 @@ impl FileMetadata for CachedParquetMetaData {
614617
}
615618
}
616619

620+
/// Convert a [`PhysicalSortExpr`] to a Parquet [`SortingColumn`].
621+
///
622+
/// Returns `Err` if the expression is not a simple column reference.
623+
pub(crate) fn sort_expr_to_sorting_column(
624+
sort_expr: &PhysicalSortExpr,
625+
) -> Result<SortingColumn> {
626+
let column = sort_expr
627+
.expr
628+
.as_any()
629+
.downcast_ref::<Column>()
630+
.ok_or_else(|| {
631+
DataFusionError::Plan(format!(
632+
"Parquet sorting_columns only supports simple column references, \
633+
but got expression: {}",
634+
sort_expr.expr
635+
))
636+
})?;
637+
638+
let column_idx: i32 = column.index().try_into().map_err(|_| {
639+
DataFusionError::Plan(format!(
640+
"Column index {} is too large to be represented as i32",
641+
column.index()
642+
))
643+
})?;
644+
645+
Ok(SortingColumn {
646+
column_idx,
647+
descending: sort_expr.options.descending,
648+
nulls_first: sort_expr.options.nulls_first,
649+
})
650+
}
651+
652+
/// Convert a [`LexOrdering`] to `Vec<SortingColumn>` for Parquet.
653+
///
654+
/// Returns `Err` if any expression is not a simple column reference.
655+
pub(crate) fn lex_ordering_to_sorting_columns(
656+
ordering: &LexOrdering,
657+
) -> Result<Vec<SortingColumn>> {
658+
ordering.iter().map(sort_expr_to_sorting_column).collect()
659+
}
660+
617661
#[cfg(test)]
618662
mod tests {
619663
use super::*;

0 commit comments

Comments
 (0)