diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index d06ccd4410..c4a8e2dee5 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -118,6 +118,7 @@ jobs: org.apache.comet.exec.DisableAQECometAsyncShuffleSuite - name: "parquet" value: | + org.apache.comet.parquet.CometParquetWriterSuite org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d57800f4f0..4cb98eb3bc 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -83,6 +83,7 @@ jobs: org.apache.comet.exec.DisableAQECometAsyncShuffleSuite - name: "parquet" value: | + org.apache.comet.parquet.CometParquetWriterSuite org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 0484b64f16..1e5d19ee23 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -100,6 +100,17 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_NATIVE_PARQUET_WRITE_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.parquet.write.enabled") + .category(CATEGORY_TESTING) + .doc( + "Whether to enable native Parquet write through Comet. When enabled, " + + "Comet will intercept Parquet write operations and execute them natively. This " + + "feature is highly experimental and only partially implemented. It should not " + + "be used in production.") + .booleanConf + .createWithDefault(false) + val SCAN_NATIVE_COMET = "native_comet" val SCAN_NATIVE_DATAFUSION = "native_datafusion" val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat" diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 1e77032f7d..a1c3212c20 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -142,6 +142,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false | | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared | | `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB | +| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false | | `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. 
When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false | | `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan | | `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false | diff --git a/docs/source/user-guide/latest/operators.md b/docs/source/user-guide/latest/operators.md index fdfbcef687..f5f2d9724d 100644 --- a/docs/source/user-guide/latest/operators.md +++ b/docs/source/user-guide/latest/operators.md @@ -22,25 +22,26 @@ The following Spark operators are currently replaced with native versions. Query stages that contain any operators not supported by Comet will fall back to regular Spark execution. -| Operator | Spark-Compatible? | Compatibility Notes | -| ----------------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------ | -| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. | -| BroadcastExchangeExec | Yes | | -| BroadcastHashJoinExec | Yes | | -| ExpandExec | Yes | | -| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. | -| FilterExec | Yes | | -| GlobalLimitExec | Yes | | -| HashAggregateExec | Yes | | -| LocalLimitExec | Yes | | -| LocalTableScanExec | No | Experimental and disabled by default. | -| ObjectHashAggregateExec | Yes | Supports a limited number of aggregates, such as `bloom_filter_agg`. | -| ProjectExec | Yes | | -| ShuffleExchangeExec | Yes | | -| ShuffledHashJoinExec | Yes | | -| SortExec | Yes | | -| SortMergeJoinExec | Yes | | -| UnionExec | Yes | | -| WindowExec | No | Disabled by default due to known correctness issues. | +| Operator | Spark-Compatible? | Compatibility Notes | +| --------------------------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------ | +| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. | +| BroadcastExchangeExec | Yes | | +| BroadcastHashJoinExec | Yes | | +| ExpandExec | Yes | | +| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. | +| FilterExec | Yes | | +| GlobalLimitExec | Yes | | +| HashAggregateExec | Yes | | +| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. | +| LocalLimitExec | Yes | | +| LocalTableScanExec | No | Experimental and disabled by default. | +| ObjectHashAggregateExec | Yes | Supports a limited number of aggregates, such as `bloom_filter_agg`. | +| ProjectExec | Yes | | +| ShuffleExchangeExec | Yes | | +| ShuffledHashJoinExec | Yes | | +| SortExec | Yes | | +| SortMergeJoinExec | Yes | | +| UnionExec | Yes | | +| WindowExec | No | Disabled by default due to known correctness issues. 
|

 [Comet Compatibility Guide]: compatibility.md
diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index b3998e2f60..b01f7857be 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -29,6 +29,8 @@ mod copy;
 mod expand;
 pub use expand::ExpandExec;
 mod iceberg_scan;
+mod parquet_writer;
+pub use parquet_writer::ParquetWriterExec;
 mod scan;

 /// Error returned during executing operators.
diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs
new file mode 100644
index 0000000000..5536e30dc1
--- /dev/null
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet writer operator for writing RecordBatches to Parquet files
+
+use std::{
+    any::Any,
+    fmt,
+    fmt::{Debug, Formatter},
+    fs::File,
+    sync::Arc,
+};
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion::{
+    error::{DataFusionError, Result},
+    execution::context::TaskContext,
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+        SendableRecordBatchStream, Statistics,
+    },
+};
+use futures::TryStreamExt;
+use parquet::{
+    arrow::ArrowWriter,
+    basic::{Compression, ZstdLevel},
+    file::properties::WriterProperties,
+};
+
+use crate::execution::shuffle::CompressionCodec;
+
+/// Parquet writer operator that writes input batches to a Parquet file
+#[derive(Debug)]
+pub struct ParquetWriterExec {
+    /// Input execution plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Output file path
+    output_path: String,
+    /// Compression codec
+    compression: CompressionCodec,
+    /// Partition ID (from Spark TaskContext)
+    partition_id: i32,
+    /// Column names to use in the output Parquet file
+    column_names: Vec<String>,
+    /// Metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Cache for plan properties
+    cache: PlanProperties,
+}
+
+impl ParquetWriterExec {
+    /// Create a new ParquetWriterExec
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        output_path: String,
+        compression: CompressionCodec,
+        partition_id: i32,
+        column_names: Vec<String>,
+    ) -> Result<Self> {
+        // Preserve the input's partitioning so each partition writes its own file
+        let input_partitioning = input.output_partitioning().clone();
+
+        let cache = PlanProperties::new(
+            EquivalenceProperties::new(Arc::clone(&input.schema())),
+            input_partitioning,
+            EmissionType::Final,
+            Boundedness::Bounded,
+        );
+
+        Ok(ParquetWriterExec {
+            input,
+            output_path,
+            compression,
+            partition_id,
+            column_names,
+            metrics: ExecutionPlanMetricsSet::new(),
+            cache,
+        })
+    }
+
+    fn compression_to_parquet(&self) -> Result<Compression> {
+        match self.compression {
+            CompressionCodec::None => Ok(Compression::UNCOMPRESSED),
+            CompressionCodec::Zstd(level) => Ok(Compression::ZSTD(ZstdLevel::try_new(level)?)),
+            CompressionCodec::Lz4Frame => Ok(Compression::LZ4),
+            CompressionCodec::Snappy => Ok(Compression::SNAPPY),
+        }
+    }
+}
+
+impl DisplayAs for ParquetWriterExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "ParquetWriterExec: path={}, compression={:?}",
+                    self.output_path, self.compression
+                )
+            }
+            DisplayFormatType::TreeRender => unimplemented!(),
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for ParquetWriterExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "ParquetWriterExec"
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.input.partition_statistics(None)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(ParquetWriterExec::try_new(
+                Arc::clone(&children[0]),
+                self.output_path.clone(),
+                self.compression.clone(),
+                self.partition_id,
+                self.column_names.clone(),
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "ParquetWriterExec requires exactly one child".to_string(),
+            )),
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition, context)?;
+        let input_schema = self.schema();
+        let output_path = self.output_path.clone();
+        let compression = self.compression_to_parquet()?;
+        let column_names = self.column_names.clone();
+
+        assert_eq!(input_schema.fields().len(), column_names.len());
+
+        // Create output schema with correct column names
+        let output_schema = if !column_names.is_empty() {
+            // Replace the generic column names (col_0, col_1, etc.)
with the actual names + let fields: Vec<_> = input_schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| Arc::new(field.as_ref().clone().with_name(&column_names[i]))) + .collect(); + Arc::new(arrow::datatypes::Schema::new(fields)) + } else { + // No column names provided, use input schema as-is + Arc::clone(&input_schema) + }; + + // Strip file:// or file: prefix if present + let local_path = output_path + .strip_prefix("file://") + .or_else(|| output_path.strip_prefix("file:")) + .unwrap_or(&output_path) + .to_string(); + + // Create output directory + std::fs::create_dir_all(&local_path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output directory '{}': {}", + local_path, e + )) + })?; + + // Generate part file name for this partition + let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id); + + // Create the Parquet file + let file = File::create(&part_file).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output file '{}': {}", + part_file, e + )) + })?; + + // Configure writer properties + let props = WriterProperties::builder() + .set_compression(compression) + .build(); + + let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props)) + .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?; + + // Clone schema for use in async closure + let schema_for_write = Arc::clone(&output_schema); + + // Write batches + let write_task = async move { + let mut stream = input; + + while let Some(batch_result) = stream.try_next().await.transpose() { + let batch = batch_result?; + + // Rename columns in the batch to match output schema + let renamed_batch = if !column_names.is_empty() { + use arrow::record_batch::RecordBatch; + RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to rename batch columns: {}", + e + )) + })? 
+ } else { + batch + }; + + writer.write(&renamed_batch).map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch: {}", e)) + })?; + } + + writer.close().map_err(|e| { + DataFusionError::Execution(format!("Failed to close writer: {}", e)) + })?; + + // Return empty stream to indicate completion + Ok::<_, DataFusionError>(futures::stream::empty()) + }; + + // Execute the write task and convert to a stream + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + Ok(Box::pin(RecordBatchStreamAdapter::new( + output_schema, + futures::stream::once(write_task).try_flatten(), + ))) + } +} diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0fe04a5a41..b0746a6f84 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -22,7 +22,7 @@ use crate::{ errors::ExpressionError, execution::{ expressions::subquery::Subquery, - operators::{ExecutionError, ExpandExec, ScanExec}, + operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec}, serde::to_arrow_datatype, shuffle::ShuffleWriterExec, }, @@ -1448,6 +1448,38 @@ impl PhysicalPlanner { )), )) } + OpStruct::ParquetWriter(writer) => { + assert_eq!(children.len(), 1); + let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; + + let codec = match writer.compression.try_into() { + Ok(SparkCompressionCodec::None) => Ok(CompressionCodec::None), + Ok(SparkCompressionCodec::Snappy) => Ok(CompressionCodec::Snappy), + Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(3)), + Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame), + _ => Err(GeneralError(format!( + "Unsupported parquet compression codec: {:?}", + writer.compression + ))), + }?; + + let parquet_writer = Arc::new(ParquetWriterExec::try_new( + Arc::clone(&child.native_plan), + writer.output_path.clone(), + codec, + self.partition, + writer.column_names.clone(), + )?); + + Ok(( + scans, + Arc::new(SparkPlan::new( + spark_plan.plan_id, + parquet_writer, + vec![Arc::clone(&child)], + )), + )) + } OpStruct::Expand(expand) => { assert_eq!(children.len(), 1); let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 94661a20e6..a958327099 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -49,6 +49,7 @@ message Operator { Window window = 110; NativeScan native_scan = 111; IcebergScan iceberg_scan = 112; + ParquetWriter parquet_writer = 113; } } @@ -236,6 +237,12 @@ message ShuffleWriter { bool tracing_enabled = 7; } +message ParquetWriter { + string output_path = 1; + CompressionCodec compression = 2; + repeated string column_names = 4; +} + enum AggregateMode { Partial = 0; Final = 1; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 4baedc9196..124188b64d 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, Comet import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec} -import 
org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} @@ -48,6 +48,7 @@ import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, Ope import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType} import org.apache.comet.serde.operator._ +import org.apache.comet.serde.operator.CometDataWritingCommand object CometExecRule { @@ -70,6 +71,13 @@ object CometExecRule { classOf[LocalTableScanExec] -> CometLocalTableScanExec, classOf[WindowExec] -> CometWindowExec) + /** + * DataWritingCommandExec is handled separately in convertNode since it doesn't follow the + * standard pattern of having CometNativeExec children. + */ + val writeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = + Map(classOf[DataWritingCommandExec] -> CometDataWritingCommand) + /** * Sinks that have a native plan of ScanExec. */ @@ -218,6 +226,23 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { val nativeOp = operator2Proto(cometOp) CometScanWrapper(nativeOp.get, cometOp) + // Handle DataWritingCommandExec specially since it doesn't follow the standard pattern + case exec: DataWritingCommandExec => + CometExecRule.writeExecs.get(classOf[DataWritingCommandExec]) match { + case Some(handler) if isOperatorEnabled(handler, exec) => + val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(exec.id) + handler + .asInstanceOf[CometOperatorSerde[DataWritingCommandExec]] + .convert(exec, builder) + .map(nativeOp => + handler + .asInstanceOf[CometOperatorSerde[DataWritingCommandExec]] + .createExec(nativeOp, exec)) + .getOrElse(exec) + case _ => + exec + } + // For AQE broadcast stage on a Comet broadcast exchange case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => newPlanWithProto(s, CometSinkPlaceHolder(_, s, s)) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala new file mode 100644 index 0000000000..46d01c8879 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.serde.operator + +import java.util.Locale + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.comet.{CometNativeExec, CometNativeWriteExec} +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport} +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported} +import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.QueryPlanSerde.serializeDataType + +/** + * CometOperatorSerde implementation for DataWritingCommandExec that converts Parquet write + * operations to use Comet's native Parquet writer. + */ +object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec] { + + private val supportedCompressionCodes = Set("none", "snappy", "lz4", "zstd") + + override def enabledConfig: Option[ConfigEntry[Boolean]] = + Some(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED) + + override def getSupportLevel(op: DataWritingCommandExec): SupportLevel = { + op.cmd match { + case cmd: InsertIntoHadoopFsRelationCommand => + cmd.fileFormat match { + case _: ParquetFileFormat => + if (!cmd.outputPath.toString.startsWith("file:")) { + return Unsupported(Some("Only local filesystem output paths are supported")) + } + + if (cmd.bucketSpec.isDefined) { + return Unsupported(Some("Bucketed writes are not supported")) + } + + if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { + return Unsupported(Some("Partitioned writes are not supported")) + } + + if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) { + return Unsupported(Some("Complex types are not supported")) + } + + val codec = parseCompressionCodec(cmd) + if (!supportedCompressionCodes.contains(codec)) { + return Unsupported(Some(s"Unsupported compression codec: $codec")) + } + + Incompatible(Some("Parquet write support is highly experimental")) + case _ => + Unsupported(Some("Only Parquet writes are supported")) + } + case other => + Unsupported(Some(s"Unsupported write command: ${other.getClass}")) + } + } + + override def convert( + op: DataWritingCommandExec, + builder: Operator.Builder, + childOp: Operator*): Option[OperatorOuterClass.Operator] = { + + try { + val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand] + + val scanOp = OperatorOuterClass.Scan + .newBuilder() + .setSource(cmd.query.nodeName) + .setArrowFfiSafe(false) + + // Add fields from the query output schema + val scanTypes = cmd.query.output.flatMap { attr => + serializeDataType(attr.dataType) + } + + if (scanTypes.length != cmd.query.output.length) { + withInfo(op, "Cannot serialize data types for native write") + return None + } + + scanTypes.foreach(scanOp.addFields) + + val scanOperator = Operator + .newBuilder() + .setPlanId(op.id) + .setScan(scanOp.build()) + .build() + + val outputPath = cmd.outputPath.toString + + val codec = parseCompressionCodec(cmd) match { + case "snappy" => OperatorOuterClass.CompressionCodec.Snappy + case "lz4" => OperatorOuterClass.CompressionCodec.Lz4 + case "zstd" => OperatorOuterClass.CompressionCodec.Zstd + case "none" => OperatorOuterClass.CompressionCodec.None + case other => + 
withInfo(op, s"Unsupported compression codec: $other") + return None + } + + val writerOp = OperatorOuterClass.ParquetWriter + .newBuilder() + .setOutputPath(outputPath) + .setCompression(codec) + .addAllColumnNames(cmd.query.output.map(_.name).asJava) + .build() + + val writerOperator = Operator + .newBuilder() + .setPlanId(op.id) + .addChildren(scanOperator) + .setParquetWriter(writerOp) + .build() + + Some(writerOperator) + } catch { + case e: Exception => + withInfo( + op, + "Failed to convert DataWritingCommandExec to native execution: " + + s"${e.getMessage}") + None + } + } + + override def createExec(nativeOp: Operator, op: DataWritingCommandExec): CometNativeExec = { + val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand] + val outputPath = cmd.outputPath.toString + + // Get the child plan from the WriteFilesExec or use the child directly + val childPlan = op.child match { + case writeFiles: WriteFilesExec => + // The WriteFilesExec child should already be a Comet operator + writeFiles.child + case other => + // Fallback: use the child directly + other + } + + CometNativeWriteExec(nativeOp, childPlan, outputPath) + } + + private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = { + cmd.options + .getOrElse( + "compression", + SQLConf.get.getConfString( + SQLConf.PARQUET_COMPRESSION.key, + SQLConf.PARQUET_COMPRESSION.defaultValueString)) + .toLowerCase(Locale.ROOT) + } + +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala new file mode 100644 index 0000000000..2617e8c60a --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.comet.CometExecIterator +import org.apache.comet.serde.OperatorOuterClass.Operator + +/** + * Comet physical operator for native Parquet write operations. + * + * This operator writes data to Parquet files using the native Comet engine. It wraps the child + * operator and adds a ParquetWriter operator on top. 
+ * + * @param nativeOp + * The native operator representing the write operation + * @param child + * The child operator providing the data to write + * @param outputPath + * The path where the Parquet file will be written + */ +case class CometNativeWriteExec(nativeOp: Operator, child: SparkPlan, outputPath: String) + extends CometNativeExec + with UnaryExecNode { + + override def originalPlan: SparkPlan = child + + override def serializedPlanOpt: SerializedPlan = { + val outputStream = new java.io.ByteArrayOutputStream() + nativeOp.writeTo(outputStream) + outputStream.close() + SerializedPlan(Some(outputStream.toByteArray)) + } + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + copy(child = newChild) + + override def nodeName: String = "CometNativeWrite" + + override def doExecute(): RDD[InternalRow] = { + // Execute the native write + val resultRDD = doExecuteColumnar() + // Convert to empty InternalRow RDD (write operations typically return empty results) + resultRDD.mapPartitions { iter => + // Consume all batches (they should be empty) + iter.foreach(_.close()) + Iterator.empty + } + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + // Get the input data from the child operator + val childRDD = if (child.supportsColumnar) { + child.executeColumnar() + } else { + // If child doesn't support columnar, convert to columnar + child.execute().mapPartitionsInternal { _ => + // TODO this could delegate to CometRowToColumnar, but maybe Comet + // does not need to support this case? + throw new UnsupportedOperationException( + "Row-based child operators not yet supported for native write") + } + } + + // Capture metadata before the transformation + val numPartitions = childRDD.getNumPartitions + val numOutputCols = child.output.length + + // Execute native write operation + childRDD.mapPartitionsInternal { iter => + val nativeMetrics = CometMetricNode.fromCometPlan(this) + + val outputStream = new java.io.ByteArrayOutputStream() + nativeOp.writeTo(outputStream) + outputStream.close() + val planBytes = outputStream.toByteArray + + new CometExecIterator( + CometExec.newIterId, + Seq(iter), + numOutputCols, + planBytes, + nativeMetrics, + numPartitions, + org.apache.spark.TaskContext.getPartitionId(), + None, + Seq.empty) + + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala new file mode 100644 index 0000000000..e4b8b53856 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.parquet + +import java.io.File + +import scala.util.Random + +import org.apache.spark.sql.{CometTestBase, DataFrame} +import org.apache.spark.sql.comet.CometNativeWriteExec +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf +import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions} + +class CometParquetWriterSuite extends CometTestBase { + + test("basic parquet write") { + // no support for fully native scan as input yet + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + + withTempPath { dir => + val outputPath = new File(dir, "output.parquet").getAbsolutePath + + // Create test data and write it to a temp parquet file first + withTempPath { inputDir => + val inputPath = new File(inputDir, "input.parquet").getAbsolutePath + val schema = FuzzDataGenerator.generateSchema( + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false)) + val df = FuzzDataGenerator.generateDataFrame( + new Random(42), + spark, + schema, + 1000, + DataGenOptions(generateNegativeZero = false)) + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "false", + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Denver") { + df.write.parquet(inputPath) + } + + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + val df = spark.read.parquet(inputPath) + + // Use a listener to capture the execution plan during write + var capturedPlan: Option[QueryExecution] = None + + val listener = new org.apache.spark.sql.util.QueryExecutionListener { + override def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long): Unit = { + // Capture plans from write operations + if (funcName == "save" || funcName.contains("command")) { + capturedPlan = Some(qe) + } + } + + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + + spark.listenerManager.register(listener) + + try { + // Perform native write + df.write.parquet(outputPath) + + // Wait for listener to be called with timeout + val maxWaitTimeMs = 15000 + val checkIntervalMs = 100 + val maxIterations = maxWaitTimeMs / checkIntervalMs + var iterations = 0 + + while (capturedPlan.isEmpty && iterations < maxIterations) { + Thread.sleep(checkIntervalMs) + iterations += 1 + } + + // Verify that CometNativeWriteExec was used + assert( + capturedPlan.isDefined, + s"Listener was not called within ${maxWaitTimeMs}ms - no execution plan captured") + + capturedPlan.foreach { qe => + val executedPlan = qe.executedPlan + val hasNativeWrite = executedPlan.exists { + case _: CometNativeWriteExec => true + case d: DataWritingCommandExec => + d.child.exists { + case _: CometNativeWriteExec => true + case _ => false + } + case _ => false + } + + assert( + hasNativeWrite, + s"Expected CometNativeWriteExec in the plan, but got:\n${executedPlan.treeString}") + } + } finally { + spark.listenerManager.unregister(listener) + } + + // Verify the data was written correctly + val resultDf = spark.read.parquet(outputPath) + assert(resultDf.count() == 1000, "Expected 1000 rows to be written") + + // Verify multiple part files were created + 
val outputDir = new File(outputPath) + val partFiles = outputDir.listFiles().filter(_.getName.startsWith("part-")) + // With 1000 rows and default parallelism, we should get multiple partitions + assert(partFiles.length > 1, "Expected multiple part files to be created") + + // read with and without Comet and compare + var sparkDf: DataFrame = null + var cometDf: DataFrame = null + withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false") { + sparkDf = spark.read.parquet(outputPath) + } + withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true") { + cometDf = spark.read.parquet(outputPath) + } + checkAnswer(sparkDf, cometDf) + } + } + } + } + +}
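
For context, a minimal, illustrative sketch of how the experimental write path added in this patch could be exercised from user code. It is not part of the patch: the config keys come from CometConf and CometParquetWriterSuite above, while the app name, master, output path, and dataset are placeholders, and the Comet plugin class is assumed to be on the classpath.

// Illustrative sketch only (not part of the patch).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.DataWritingCommandExec

import org.apache.comet.CometConf

object NativeParquetWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-native-parquet-write-example")
      .master("local[*]")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config(CometConf.COMET_EXEC_ENABLED.key, "true")
      // Native Parquet write is disabled by default and highly experimental
      .config(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key, "true")
      // The write operator is registered as Incompatible, so it must be explicitly allowed
      .config(
        CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]),
        "true")
      .getOrCreate()

    // Only non-partitioned, non-bucketed writes of simple types to a local
    // `file:` path are currently eligible; anything else falls back to Spark.
    spark
      .range(0, 1000)
      .toDF("id")
      .write
      .parquet("file:///tmp/comet-native-write-example")

    spark.stop()
  }
}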