Merged

Commits (40)
64c3dd7
save
andygrove Nov 21, 2025
c6fe639
save
andygrove Nov 21, 2025
2a8dc52
save
andygrove Nov 21, 2025
e4acf93
save
andygrove Nov 21, 2025
82a552d
save
andygrove Nov 21, 2025
ba93de7
save
andygrove Nov 21, 2025
96087ef
save [skip ci]
andygrove Nov 21, 2025
e6bb5cf
delete some tests
andygrove Nov 21, 2025
8c5a6be
test passes
andygrove Nov 21, 2025
42e0c79
prep for review
andygrove Nov 21, 2025
fac5c56
improve test
andygrove Nov 21, 2025
30a503f
prep for review
andygrove Nov 21, 2025
5174ba3
remove partition id from proto
andygrove Nov 21, 2025
8748d85
prep for review
andygrove Nov 21, 2025
f1b7ba8
move test
andygrove Nov 21, 2025
3ad4d6e
clippy
andygrove Nov 21, 2025
f35196b
code cleanup
andygrove Nov 21, 2025
1ea7f31
preserve column names
andygrove Nov 21, 2025
61b6690
fix assertion
andygrove Nov 21, 2025
1c00aae
test
andygrove Nov 21, 2025
e1e357e
improve test
andygrove Nov 21, 2025
9fb2b53
refactor to use operator serde framework
andygrove Nov 21, 2025
457a9b9
skip testing for native_datafusion for now
andygrove Nov 21, 2025
757ba81
remove hard-coded compression codec
andygrove Nov 21, 2025
7b9e925
lz4 level
andygrove Nov 21, 2025
913f2ba
remove partition id from proto
andygrove Nov 21, 2025
de07d45
implement getSupportLevel
andygrove Nov 22, 2025
eb4fee4
move config to testing category
andygrove Nov 22, 2025
9a6e48c
fuzz test
andygrove Nov 22, 2025
8b1cb0b
remove snappy from filename
andygrove Nov 22, 2025
58c056d
remove snappy from filename
andygrove Nov 22, 2025
bd04274
docs
andygrove Nov 22, 2025
55d5d48
fix ci
andygrove Nov 22, 2025
cda8322
Revert "fix ci"
andygrove Nov 22, 2025
6a4726b
Merge remote-tracking branch 'apache/main' into parquet-write-poc
andygrove Nov 24, 2025
1d1d430
partially address feedback
andygrove Nov 24, 2025
ece289a
Update spark/src/main/scala/org/apache/comet/serde/operator/CometData…
andygrove Nov 25, 2025
b7daa61
format
andygrove Nov 26, 2025
44991b2
address more feedback
andygrove Nov 26, 2025
8d9b41a
cargo fmt
andygrove Nov 26, 2025
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -118,6 +118,7 @@ jobs:
org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
- name: "parquet"
value: |
org.apache.comet.parquet.CometParquetWriterSuite
org.apache.comet.parquet.ParquetReadV1Suite
org.apache.comet.parquet.ParquetReadV2Suite
org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -83,6 +83,7 @@ jobs:
org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
- name: "parquet"
value: |
org.apache.comet.parquet.CometParquetWriterSuite
org.apache.comet.parquet.ParquetReadV1Suite
org.apache.comet.parquet.ParquetReadV2Suite
org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -100,6 +100,17 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_NATIVE_PARQUET_WRITE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.write.enabled")
.category(CATEGORY_TESTING)
.doc(
"Whether to enable native Parquet write through Comet. When enabled, " +
"Comet will intercept Parquet write operations and execute them natively. This " +
"feature is highly experimental and only partially implemented. It should not " +
"be used in production.")
.booleanConf
.createWithDefault(false)

val SCAN_NATIVE_COMET = "native_comet"
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
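For reference, here is a minimal sketch of how the new flag might be exercised from a Spark shell or test. Only `spark.comet.parquet.write.enabled` comes from this PR; the plugin class, the `spark.comet.exec.enabled` setting, and the local output path are assumed boilerplate for a typical Comet setup.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("comet-native-parquet-write-demo")
  // Standard Comet setup (assumed); the only new knob is the write flag below.
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.enabled", "true")
  .config("spark.comet.parquet.write.enabled", "true")
  .getOrCreate()

// A plain Parquet write; with the flag on, Comet may plan this natively,
// otherwise it falls back to Spark's own writer.
spark.range(0, 1000)
  .selectExpr("id", "id * 2 AS doubled")
  .write
  .mode("overwrite")
  .parquet("file:///tmp/comet-native-write-demo")
```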
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -142,6 +142,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false |
| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/operators.md
@@ -27,6 +27,7 @@ not supported by Comet will fall back to regular Spark execution.
| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
| BroadcastExchangeExec | Yes | |
| BroadcastHashJoinExec | Yes | |
| DataWritingCommandExec | No | Experimental support for native Parquet writes. Disabled by default. |
Contributor:
Does this also cover Iceberg writes?

wForget (Member), Nov 25, 2025:
Should we change this to InsertIntoHadoopFsRelationCommand here?

Member Author:
Makes sense. I updated this.

| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
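To see which physical node a plain Parquet write is planned through (the DataWritingCommandExec vs. InsertIntoHadoopFsRelationCommand distinction discussed above), one option is to register a QueryExecutionListener and print the executed plan of the write. This is a sketch using only standard Spark APIs; the `spark` session is assumed to exist (e.g. from spark-shell) and the output path is arbitrary.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Print the executed plan of each successful action, including writes, so the
// write command node (e.g. Execute InsertIntoHadoopFsRelationCommand wrapped in
// a DataWritingCommandExec) is visible.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(qe.executedPlan.treeString)
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

spark.range(10).write.mode("overwrite").parquet("file:///tmp/plan-inspect-demo")
```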
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
@@ -29,6 +29,8 @@ mod copy;
mod expand;
pub use expand::ExpandExec;
mod iceberg_scan;
mod parquet_writer;
pub use parquet_writer::ParquetWriterExec;
mod scan;

/// Error returned during executing operators.
282 changes: 282 additions & 0 deletions native/core/src/execution/operators/parquet_writer.rs
@@ -0,0 +1,282 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet writer operator for writing RecordBatches to Parquet files
use std::{
any::Any,
fmt,
fmt::{Debug, Formatter},
fs::File,
sync::Arc,
};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan::{
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};
use futures::TryStreamExt;
use parquet::{
arrow::ArrowWriter,
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use crate::execution::shuffle::CompressionCodec;

/// Parquet writer operator that writes input batches to a Parquet file
#[derive(Debug)]
pub struct ParquetWriterExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// Output directory path (part files are created inside this directory)
Contributor:
Is this a file or a folder?

Member Author:
It is a folder. Files named part-*.parquet will be created within the folder.

output_path: String,
/// Compression codec
compression: CompressionCodec,
/// Partition ID (from Spark TaskContext)
partition_id: i32,
/// Column names to use in the output Parquet file
column_names: Vec<String>,
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
cache: PlanProperties,
}

impl ParquetWriterExec {
/// Create a new ParquetWriterExec
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
output_path: String,
compression: CompressionCodec,
partition_id: i32,
column_names: Vec<String>,
) -> Result<Self> {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();

let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
input_partitioning,
EmissionType::Final,
Boundedness::Bounded,
);

Ok(ParquetWriterExec {
input,
output_path,
compression,
partition_id,
column_names,
metrics: ExecutionPlanMetricsSet::new(),
cache,
})
}

fn compression_to_parquet(&self) -> Result<Compression> {
match self.compression {
CompressionCodec::None => Ok(Compression::UNCOMPRESSED),
CompressionCodec::Zstd(level) => Ok(Compression::ZSTD(ZstdLevel::try_new(level)?)),
CompressionCodec::Lz4Frame => Ok(Compression::LZ4),
CompressionCodec::Snappy => Ok(Compression::SNAPPY),
}
}
}

impl DisplayAs for ParquetWriterExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"ParquetWriterExec: path={}, compression={:?}",
self.output_path, self.compression
)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}

#[async_trait]
impl ExecutionPlan for ParquetWriterExec {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ParquetWriterExec"
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}

fn properties(&self) -> &PlanProperties {
&self.cache
}

fn schema(&self) -> SchemaRef {
self.input.schema()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(ParquetWriterExec::try_new(
Arc::clone(&children[0]),
self.output_path.clone(),
self.compression.clone(),
self.partition_id,
self.column_names.clone(),
)?)),
_ => Err(DataFusionError::Internal(
"ParquetWriterExec requires exactly one child".to_string(),
)),
}
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
let input_schema = self.schema();
let output_path = self.output_path.clone();
let compression = self.compression_to_parquet()?;
let column_names = self.column_names.clone();

// Create output schema with correct column names
let output_schema = if !column_names.is_empty() {
// Replace the generic column names (col_0, col_1, etc.) with the actual names
let fields: Vec<_> = input_schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
if i < column_names.len() {
Arc::new(field.as_ref().clone().with_name(&column_names[i]))
} else {
Arc::clone(field)
}
})
.collect();
Arc::new(arrow::datatypes::Schema::new(fields))
} else {
// No column names provided, use input schema as-is
Arc::clone(&input_schema)
};

// Strip file:// or file: prefix if present
let local_path = output_path
.strip_prefix("file://")
Contributor:
What about hdfs:// paths?

Member Author:
I added a fallback for now so that Comet falls back to Spark if the path does not start with file:.

.or_else(|| output_path.strip_prefix("file:"))
.unwrap_or(&output_path)
.to_string();

// Create output directory
std::fs::create_dir_all(&local_path).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create output directory '{}': {}",
local_path, e
))
})?;

// Generate part file name for this partition
let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);
Contributor:
This doesn't seem right; the extension will differ depending on the codec:

.snappy.parquet
.gz.parquet
etc.

wForget (Member):
This file name is best generated by FileCommitProtocol later, so hardcoding it on the native side for now makes sense to me.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L156-L163

Contributor:
Thanks @wForget, are you proposing to keep the hardcoded names for this PR and replicate Spark's getFilename later?

wForget (Member), Nov 24, 2025:
Yes, this PR seems to be missing some work related to file commit. My proposed write process might look like this: create a staging dir -> native write files to the staging dir -> file commit (move and merge staging files) -> add or update partitions.

Member Author:
I filed #2827 for implementing the file commit protocol.

This PR adds a starting point for development. Once it is merged, other contributors can help add the missing features.

(A hedged sketch of this proposed staging/commit flow appears after this file's diff.)


// Create the Parquet file
let file = File::create(&part_file).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create output file '{}': {}",
part_file, e
))
})?;

// Configure writer properties
let props = WriterProperties::builder()
.set_compression(compression)
.build();

let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props))
.map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?;

// Clone schema for use in async closure
let schema_for_write = Arc::clone(&output_schema);

// Write batches
let write_task = async move {
let mut stream = input;

while let Some(batch_result) = stream.try_next().await.transpose() {
let batch = batch_result?;

// Rename columns in the batch to match output schema
let renamed_batch = if !column_names.is_empty() {
use arrow::record_batch::RecordBatch;
RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec())
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to rename batch columns: {}",
e
))
})?
} else {
batch
};

writer.write(&renamed_batch).map_err(|e| {
DataFusionError::Execution(format!("Failed to write batch: {}", e))
})?;
}

writer.close().map_err(|e| {
DataFusionError::Execution(format!("Failed to close writer: {}", e))
})?;

// Return empty stream to indicate completion
Ok::<_, DataFusionError>(futures::stream::empty())
};

// Execute the write task and convert to a stream
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Ok(Box::pin(RecordBatchStreamAdapter::new(
Contributor:
What happens if the partition fails? What would happen to the folder?

Member Author:
🤷 This is all highly experimental so far.

output_schema,
futures::stream::once(write_task).try_flatten(),
)))
}
}
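The thread above proposes a staging-directory flow for the missing file-commit step (tracked in #2827). As a rough, hypothetical illustration of that flow on the JVM side (not part of this PR), a commit step over a staging directory might look like the sketch below; the method name and directory layout are assumptions, and a real implementation would delegate to Spark's FileCommitProtocol.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

/** Illustrative only: promote natively written part files from a staging
  * directory to the final output directory, then drop the staging directory.
  * A production implementation would go through Spark's FileCommitProtocol,
  * which also handles task attempts, aborts, and codec-specific file names.
  */
def commitStagedFiles(stagingDir: String, finalDir: String, hadoopConf: Configuration): Unit = {
  val fs = FileSystem.get(hadoopConf)
  val target = new Path(finalDir)
  fs.mkdirs(target)
  fs.listStatus(new Path(stagingDir)).foreach { status =>
    // Move (rename) each staged part file into the final directory.
    fs.rename(status.getPath, new Path(target, status.getPath.getName))
  }
  fs.delete(new Path(stagingDir), true)
}
```

This mirrors the proposed process: write part files to a staging directory natively, then commit by moving them into place and, for partitioned tables, registering or updating partitions.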
34 changes: 33 additions & 1 deletion native/core/src/execution/planner.rs
@@ -22,7 +22,7 @@ use crate::{
errors::ExpressionError,
execution::{
expressions::subquery::Subquery,
operators::{ExecutionError, ExpandExec, ScanExec},
operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec},
serde::to_arrow_datatype,
shuffle::ShuffleWriterExec,
},
@@ -1448,6 +1448,38 @@ impl PhysicalPlanner {
)),
))
}
OpStruct::ParquetWriter(writer) => {
assert_eq!(children.len(), 1);
let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;

let codec = match writer.compression.try_into() {
Ok(SparkCompressionCodec::None) => Ok(CompressionCodec::None),
Ok(SparkCompressionCodec::Snappy) => Ok(CompressionCodec::Snappy),
Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(3)),
Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame),
_ => Err(GeneralError(format!(
"Unsupported parquet compression codec: {:?}",
writer.compression
))),
}?;

let parquet_writer = Arc::new(ParquetWriterExec::try_new(
Arc::clone(&child.native_plan),
writer.output_path.clone(),
codec,
self.partition,
writer.column_names.clone(),
)?);

Ok((
scans,
Arc::new(SparkPlan::new(
spark_plan.plan_id,
parquet_writer,
vec![Arc::clone(&child)],
)),
))
}
OpStruct::Expand(expand) => {
assert_eq!(children.len(), 1);
let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;