diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index beb5f9dcf7..aecc819354 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -133,6 +133,9 @@ jobs: org.apache.spark.sql.comet.ParquetEncryptionITCase org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite + - name: "csv" + value: | + org.apache.comet.csv.CometCsvNativeReadSuite - name: "exec" value: | org.apache.comet.exec.CometAggregateSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 9a45fe022d..d9dd7ea668 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -96,6 +96,9 @@ jobs: org.apache.spark.sql.comet.ParquetEncryptionITCase org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite + - name: "csv" + value: | + org.apache.comet.csv.CometCsvNativeReadSuite - name: "exec" value: | org.apache.comet.exec.CometAggregateSuite diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index cccad53c53..ac599ae0d5 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -143,6 +143,16 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.scan.csv.v2.enabled") + .category(CATEGORY_TESTING) + .doc( + "Whether to use the native Comet V2 CSV reader for improved performance. " + + "Default: false (uses standard Spark CSV reader) " + + "Experimental: Performance benefits are workload-dependent.") + .booleanConf + .createWithDefault(false) + val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] = conf("spark.comet.parquet.respectFilterPushdown") .category(CATEGORY_PARQUET) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 1a273ad033..38030d9677 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -144,6 +144,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared | | `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB | | `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false | +| `spark.comet.scan.csv.v2.enabled` | Whether to use the native Comet V2 CSV reader for improved performance. Default: false (uses standard Spark CSV reader) Experimental: Performance benefits are workload-dependent. | false | | `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. 
This is an experimental feature and has known issues with non-UTC timezones. | false | | `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan | | `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false | diff --git a/native/core/src/execution/operators/csv_scan.rs b/native/core/src/execution/operators/csv_scan.rs new file mode 100644 index 0000000000..7cc6d41bc2 --- /dev/null +++ b/native/core/src/execution/operators/csv_scan.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution::operators::ExecutionError; +use arrow::datatypes::{Field, SchemaRef}; +use datafusion::common::DataFusionError; +use datafusion::common::Result; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::CsvSource; +use datafusion_comet_proto::spark_operator::CsvOptions; +use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::PartitionedFile; +use itertools::Itertools; +use std::sync::Arc; + +pub fn init_csv_datasource_exec( + object_store_url: ObjectStoreUrl, + file_groups: Vec>, + data_schema: SchemaRef, + partition_schema: Option, + csv_options: &CsvOptions, +) -> Result, ExecutionError> { + let csv_source = build_csv_source(csv_options.clone()); + + let file_groups = file_groups + .iter() + .map(|files| FileGroup::new(files.clone())) + .collect(); + + let partition_fields = partition_schema + .map(|schema| { + schema + .fields() + .iter() + .map(|field| { + Field::new(field.name(), field.data_type().clone(), field.is_nullable()) + }) + .collect_vec() + }) + .unwrap_or(vec![]); + + let file_scan_config: FileScanConfig = + FileScanConfigBuilder::new(object_store_url, data_schema, csv_source) + .with_file_groups(file_groups) + .with_table_partition_cols(partition_fields) + .build(); + + Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config)))) +} + +fn build_csv_source(options: CsvOptions) -> Arc { + let delimiter = string_to_u8(&options.delimiter, "delimiter").unwrap(); + let quote = string_to_u8(&options.quote, "quote").unwrap(); + let escape = string_to_u8(&options.escape, "escape").unwrap(); + let terminator = string_to_u8(&options.terminator, "terminator").unwrap(); + let comment = options + .comment + .map(|c| string_to_u8(&c, 
"comment").unwrap()); + let csv_source = CsvSource::new(options.has_header, delimiter, quote) + .with_escape(Some(escape)) + .with_comment(comment) + .with_terminator(Some(terminator)) + .with_truncate_rows(options.truncated_rows); + Arc::new(csv_source) +} + +fn string_to_u8(option: &str, option_name: &str) -> Result { + match option.as_bytes().first() { + Some(&ch) if ch.is_ascii() => Ok(ch), + _ => Err(DataFusionError::Configuration(format!( + "invalid {option_name} character '{option}': must be an ASCII character" + ))), + } +} diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 33b9be9434..07ee995367 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -31,8 +31,10 @@ pub use expand::ExpandExec; mod iceberg_scan; mod parquet_writer; pub use parquet_writer::ParquetWriterExec; +mod csv_scan; pub mod projection; mod scan; +pub use csv_scan::init_csv_datasource_exec; /// Error returned during executing operators. #[derive(thiserror::Error, Debug)] diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 93fbb59c11..b0f0c8dfcf 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -21,6 +21,7 @@ pub mod expression_registry; pub mod macros; pub mod operator_registry; +use crate::execution::operators::init_csv_datasource_exec; use crate::execution::operators::IcebergScanExec; use crate::{ errors::ExpressionError, @@ -95,6 +96,7 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; + use arrow::array::{ new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, @@ -1120,6 +1122,42 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])), )) } + OpStruct::CsvScan(scan) => { + let data_schema = + convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); + let partition_schema = + convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice()); + let object_store_options: HashMap = scan + .object_store_options + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let one_file = scan + .file_partitions + .first() + .and_then(|f| f.partitioned_file.first()) + .map(|f| f.file_path.clone()) + .ok_or(GeneralError("Failed to locate file".to_string()))?; + let (object_store_url, _) = prepare_object_store_with_configs( + self.session_ctx.runtime_env(), + one_file, + &object_store_options, + )?; + let files = + self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?; + let file_groups: Vec> = vec![files]; + let scan = init_csv_datasource_exec( + object_store_url, + file_groups, + data_schema, + Some(partition_schema), + &scan.csv_options.clone().unwrap(), + )?; + Ok(( + vec![], + Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])), + )) + } OpStruct::Scan(scan) => { let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec(); diff --git a/native/core/src/execution/planner/operator_registry.rs b/native/core/src/execution/planner/operator_registry.rs index e4899280b7..b34a80df95 100644 --- a/native/core/src/execution/planner/operator_registry.rs +++ b/native/core/src/execution/planner/operator_registry.rs @@ -60,6 +60,7 @@ pub enum OperatorType { SortMergeJoin, HashJoin, Window, + CsvScan, } /// Global 
registry of operator builders @@ -151,5 +152,6 @@ fn get_operator_type(spark_operator: &Operator) -> Option { OpStruct::HashJoin(_) => Some(OperatorType::HashJoin), OpStruct::Window(_) => Some(OperatorType::Window), OpStruct::Explode(_) => None, // Not yet in OperatorType enum + OpStruct::CsvScan(_) => Some(OperatorType::CsvScan), } } diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 015b5d96b6..3372f7a4d3 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -51,6 +51,7 @@ message Operator { IcebergScan iceberg_scan = 112; ParquetWriter parquet_writer = 113; Explode explode = 114; + CsvScan csv_scan = 115; } } @@ -110,6 +111,24 @@ message NativeScan { bool encryption_enabled = 14; } +message CsvScan { + repeated SparkStructField required_schema = 1; + repeated SparkStructField partition_schema = 2; + repeated SparkFilePartition file_partitions = 3; + map object_store_options = 4; + CsvOptions csv_options = 5; +} + +message CsvOptions { + bool has_header = 1; + string delimiter = 2; + string quote = 3; + string escape = 4; + optional string comment = 5; + string terminator = 7; + bool truncated_rows = 8; +} + message IcebergScan { // Schema to read repeated SparkStructField required_schema = 1; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index bb4ce879d7..9c23b1be68 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -51,9 +51,8 @@ import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.rules.CometExecRule.allExecs -import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, Unsupported} +import org.apache.comet.serde._ import org.apache.comet.serde.operator._ -import org.apache.comet.serde.operator.CometDataWritingCommand object CometExecRule { @@ -191,6 +190,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { case scan: CometBatchScanExec if scan.nativeIcebergScanMetadata.isDefined => convertToComet(scan, CometIcebergNativeScan).getOrElse(scan) + case scan: CometBatchScanExec if scan.wrapped.scan.isInstanceOf[CSVScan] => + convertToComet(scan, CometCsvNativeScanExec).getOrElse(scan) + // Comet JVM + native scan for V1 and V2 case op if isCometScan(op) => convertToComet(op, CometScanWrapper).getOrElse(op) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 8ba2c5845e..53ef20c4ae 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -255,6 +256,44 
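The `CsvOptions` message introduced above is populated on the Spark side by `csvOptions2Proto` (defined later in `CometCsvNativeScanExec`). As a concrete sketch, a read with `header=true` and Spark's default delimiter, quote, and escape characters would serialize roughly as follows; the literal values are assumptions based on Spark's CSV defaults, not output captured from this patch.

```scala
import org.apache.comet.serde.OperatorOuterClass

// Rough shape of the message for header=true with Spark's default
// delimiter (","), quote ("\"") and escape ("\\") characters.
val csvOptions = OperatorOuterClass.CsvOptions
  .newBuilder()
  .setHasHeader(true)
  .setDelimiter(",")
  .setQuote("\"")
  .setEscape("\\")
  .setTerminator("\n")
  .setTruncatedRows(false)
  .build()
```

Note that `comment` is declared `optional` in the proto and is only set when the Spark `comment` option is configured.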
@@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com withInfos(scanExec, fallbackReasons.toSet) } + case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() => + val fallbackReasons = new ListBuffer[String]() + val schemaSupported = + CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons) + if (!schemaSupported) { + fallbackReasons += s"Schema ${scan.readDataSchema} is not supported" + } + val partitionSchemaSupported = + CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons) + if (!partitionSchemaSupported) { + fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported" + } + val corruptedRecordsColumnName = + SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD) + val containsCorruptedRecordsColumn = + !scan.readDataSchema.fieldNames.contains(corruptedRecordsColumnName) + if (!containsCorruptedRecordsColumn) { + fallbackReasons += "Comet doesn't support the processing of corrupted records" + } + val isInferSchemaEnabled = scan.options.getBoolean("inferSchema", false) + if (isInferSchemaEnabled) { + fallbackReasons += "Comet doesn't support inferSchema=true option" + } + val delimiter = scan.options.get("delimiter") + val isSingleCharacterDelimiter = delimiter.length == 1 + if (!isSingleCharacterDelimiter) { + fallbackReasons += + s"Comet supports only single-character delimiters, but got: '$delimiter'" + } + if (schemaSupported && partitionSchemaSupported && containsCorruptedRecordsColumn + && !isInferSchemaEnabled && isSingleCharacterDelimiter) { + CometBatchScanExec( + scanExec.clone().asInstanceOf[BatchScanExec], + runtimeFilters = scanExec.runtimeFilters) + } else { + withInfos(scanExec, fallbackReasons.toSet) + } + // Iceberg scan - patched version implementing SupportsComet interface case s: SupportsComet if !COMET_ICEBERG_NATIVE_ENABLED.get() => val fallbackReasons = new ListBuffer[String]() diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 12be14450b..b7909b67cb 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -27,9 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PlanExpre import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, StructType} import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometConf.COMET_EXEC_ENABLED @@ -144,12 +143,13 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { var firstPartition: Option[PartitionedFile] = None val filePartitions = scan.getFilePartitions() - filePartitions.foreach { partition => + val filePartitionsProto = filePartitions.map { partition => if (firstPartition.isEmpty) { firstPartition = partition.files.headOption } - partition2Proto(partition, nativeScanBuilder, scan.relation.partitionSchema) + partition2Proto(partition, scan.relation.partitionSchema) } + 
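Taken together, the `CSVScan` case above converts a V2 CSV scan only when the data and partition schemas are supported, no corrupt-record column is requested, `inferSchema` is disabled, and the delimiter is a single character. A hedged sketch of reads that do and do not qualify follows; the paths and the `spark` session are placeholders, and `spark.comet.scan.csv.v2.enabled=true` with the V2 CSV source is assumed.

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("c", IntegerType)

// Converted to the native scan: explicit schema, single-character delimiter.
spark.read.schema(schema).option("delimiter", ",").csv("/tmp/data.csv")

// Falls back to Spark: schema inference requested.
spark.read.option("inferSchema", "true").csv("/tmp/data.csv")

// Falls back to Spark: multi-character delimiter.
spark.read.schema(schema).option("delimiter", ",,").csv("/tmp/data.csv")
```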
nativeScanBuilder.addAllFilePartitions(filePartitionsProto.asJava) val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields) val requiredSchema = schema2Proto(scan.requiredSchema.fields) @@ -203,50 +203,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { } - private def schema2Proto( - fields: Array[StructField]): Array[OperatorOuterClass.SparkStructField] = { - val fieldBuilder = OperatorOuterClass.SparkStructField.newBuilder() - fields.map(field => { - fieldBuilder.setName(field.name) - fieldBuilder.setDataType(serializeDataType(field.dataType).get) - fieldBuilder.setNullable(field.nullable) - fieldBuilder.build() - }) - } - - private def partition2Proto( - partition: FilePartition, - nativeScanBuilder: OperatorOuterClass.NativeScan.Builder, - partitionSchema: StructType): Unit = { - val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder() - partition.files.foreach(file => { - // Process the partition values - val partitionValues = file.partitionValues - assert(partitionValues.numFields == partitionSchema.length) - val partitionVals = - partitionValues.toSeq(partitionSchema).zipWithIndex.map { case (value, i) => - val attr = partitionSchema(i) - val valueProto = exprToProto(Literal(value, attr.dataType), Seq.empty) - // In `CometScanRule`, we have already checked that all partition values are - // supported. So, we can safely use `get` here. - assert( - valueProto.isDefined, - s"Unsupported partition value: $value, type: ${attr.dataType}") - valueProto.get - } - - val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder() - partitionVals.foreach(fileBuilder.addPartitionValues) - fileBuilder - .setFilePath(file.filePath.toString) - .setStart(file.start) - .setLength(file.length) - .setFileSize(file.fileSize) - partitionBuilder.addPartitionedFile(fileBuilder.build()) - }) - nativeScanBuilder.addFilePartitions(partitionBuilder.build()) - } - override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = { CometNativeScanExec(nativeOp, op.wrapped, op.session) } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala new file mode 100644 index 0000000000..7b811d09e7 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.types.{StructField, StructType} + +import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} + +package object operator { + + def schema2Proto(fields: Array[StructField]): Array[OperatorOuterClass.SparkStructField] = { + val fieldBuilder = OperatorOuterClass.SparkStructField.newBuilder() + fields.map { field => + fieldBuilder.setName(field.name) + fieldBuilder.setDataType(serializeDataType(field.dataType).get) + fieldBuilder.setNullable(field.nullable) + fieldBuilder.build() + } + } + + def partition2Proto( + partition: FilePartition, + partitionSchema: StructType): OperatorOuterClass.SparkFilePartition = { + val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder() + partition.files.foreach(file => { + // Process the partition values + val partitionValues = file.partitionValues + assert(partitionValues.numFields == partitionSchema.length) + val partitionVals = + partitionValues.toSeq(partitionSchema).zipWithIndex.map { case (value, i) => + val attr = partitionSchema(i) + val valueProto = exprToProto(Literal(value, attr.dataType), Seq.empty) + // In `CometScanRule`, we have already checked that all partition values are + // supported. So, we can safely use `get` here. + assert( + valueProto.isDefined, + s"Unsupported partition value: $value, type: ${attr.dataType}") + valueProto.get + } + val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder() + partitionVals.foreach(fileBuilder.addPartitionValues) + fileBuilder + .setFilePath(file.filePath.toString) + .setStart(file.start) + .setLength(file.length) + .setFileSize(file.fileSize) + partitionBuilder.addPartitionedFile(fileBuilder.build()) + }) + partitionBuilder.build() + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala new file mode 100644 index 0000000000..1477401306 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan + +import com.google.common.base.Objects + +import org.apache.comet.{CometConf, ConfigEntry} +import org.apache.comet.objectstore.NativeConfig +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} +import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.operator.{partition2Proto, schema2Proto} + +/* + * Native CSV scan operator that delegates file reading to datafusion. + */ +case class CometCsvNativeScanExec( + override val nativeOp: Operator, + override val output: Seq[Attribute], + @transient override val originalPlan: BatchScanExec, + override val serializedPlanOpt: SerializedPlan) + extends CometLeafExec { + override val supportsColumnar: Boolean = true + + override val nodeName: String = "CometCsvNativeScan" + + override def outputPartitioning: Partitioning = UnknownPartitioning( + originalPlan.inputPartitions.length) + + override def outputOrdering: Seq[SortOrder] = Nil + + override protected def doCanonicalize(): SparkPlan = { + CometCsvNativeScanExec(nativeOp, output, originalPlan, serializedPlanOpt) + } + + override def equals(obj: Any): Boolean = { + obj match { + case other: CometCsvNativeScanExec => + this.output == other.output && + this.serializedPlanOpt == other.serializedPlanOpt && + this.originalPlan == other.originalPlan + case _ => + false + } + } + + override def hashCode(): Int = { + Objects.hashCode(output, serializedPlanOpt, originalPlan) + } +} + +object CometCsvNativeScanExec extends CometOperatorSerde[CometBatchScanExec] { + + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( + CometConf.COMET_CSV_V2_NATIVE_ENABLED) + + override def convert( + op: CometBatchScanExec, + builder: Operator.Builder, + childOp: Operator*): Option[Operator] = { + val csvScanBuilder = OperatorOuterClass.CsvScan.newBuilder() + val csvScan = op.wrapped.scan.asInstanceOf[CSVScan] + val sessionState = op.session.sessionState + val options = { + val columnPruning = sessionState.conf.csvColumnPruning + val timeZone = sessionState.conf.sessionLocalTimeZone + new CSVOptions(csvScan.options.asScala.toMap, columnPruning, timeZone) + } + val filePartitions = op.inputPartitions.map(_.asInstanceOf[FilePartition]) + val csvOptionsProto = csvOptions2Proto(options) + val schemaProto = schema2Proto(csvScan.readDataSchema.fields) + val partitionSchemaProto = schema2Proto(csvScan.readPartitionSchema.fields) + val partitionsProto = filePartitions.map(partition2Proto(_, csvScan.readPartitionSchema)) + + val objectStoreOptions = filePartitions.headOption + .flatMap { partitionFile => + val hadoopConf = sessionState + .newHadoopConfWithOptions(op.session.sparkContext.conf.getAll.toMap) + partitionFile.files.headOption + .map(file => NativeConfig.extractObjectStoreOptions(hadoopConf, file.pathUri)) + } + .getOrElse(Map.empty) + + csvScanBuilder.putAllObjectStoreOptions(objectStoreOptions.asJava) + csvScanBuilder.setCsvOptions(csvOptionsProto) + csvScanBuilder.addAllFilePartitions(partitionsProto.asJava) + 
csvScanBuilder.addAllRequiredSchema(schemaProto.toIterable.asJava) + csvScanBuilder.addAllPartitionSchema(partitionSchemaProto.toIterable.asJava) + Some(builder.setCsvScan(csvScanBuilder).build()) + } + + override def createExec(nativeOp: Operator, op: CometBatchScanExec): CometNativeExec = { + CometCsvNativeScanExec(nativeOp, op.output, op.wrapped, SerializedPlan(None)) + } + + private def csvOptions2Proto(options: CSVOptions): OperatorOuterClass.CsvOptions = { + val csvOptionsBuilder = OperatorOuterClass.CsvOptions.newBuilder() + csvOptionsBuilder.setDelimiter(options.delimiter) + csvOptionsBuilder.setHasHeader(options.headerFlag) + csvOptionsBuilder.setQuote(options.quote.toString) + csvOptionsBuilder.setEscape(options.escape.toString) + csvOptionsBuilder.setTerminator(options.lineSeparator.getOrElse("\n")) + csvOptionsBuilder.setTruncatedRows(options.multiLine) + if (options.isCommentSet) { + csvOptionsBuilder.setComment(options.comment.toString) + } + csvOptionsBuilder.build() + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 0a435e5b7a..f34d5a9448 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -394,10 +394,11 @@ abstract class CometNativeExec extends CometExec { def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = { plan match { case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec | - _: CometIcebergNativeScanExec | _: ShuffleQueryStageExec | _: AQEShuffleReadExec | - _: CometShuffleExchangeExec | _: CometUnionExec | _: CometTakeOrderedAndProjectExec | - _: CometCoalesceExec | _: ReusedExchangeExec | _: CometBroadcastExchangeExec | - _: BroadcastQueryStageExec | _: CometSparkToColumnarExec | _: CometLocalTableScanExec => + _: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec | + _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | + _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec | + _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | + _: CometSparkToColumnarExec | _: CometLocalTableScanExec => func(plan) case _: CometPlan => // Other Comet operators, continue to traverse the tree. diff --git a/spark/src/test/resources/test-data/csv-test-2.csv b/spark/src/test/resources/test-data/csv-test-2.csv new file mode 100644 index 0000000000..1c7c834f14 --- /dev/null +++ b/spark/src/test/resources/test-data/csv-test-2.csv @@ -0,0 +1,4 @@ +a,b,c +1,2,3 +4,5,6 +7,0,8 \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala b/spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala new file mode 100644 index 0000000000..e9a18a18dc --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.csv + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +import org.apache.comet.CometConf + +class CometCsvNativeReadSuite extends CometTestBase { + private val TEST_CSV_PATH_NO_HEADER = "src/test/resources/test-data/csv-test-1.csv" + private val TEST_CSV_PATH_HAS_HEADER = "src/test/resources/test-data/csv-test-2.csv" + + test("Native csv read - with schema") { + withSQLConf( + CometConf.COMET_CSV_V2_NATIVE_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "") { + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("c", IntegerType) + val df = spark.read + .options(Map("header" -> "false", "delimiter" -> ",")) + .schema(schema) + .csv(TEST_CSV_PATH_NO_HEADER) + checkSparkAnswerAndOperator(df) + } + } + + test("Native csv read - without schema") { + withSQLConf( + CometConf.COMET_CSV_V2_NATIVE_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "") { + val df = spark.read + .options(Map("header" -> "true", "delimiter" -> ",")) + .csv(TEST_CSV_PATH_HAS_HEADER) + checkSparkAnswerAndOperator(df) + } + } + + test("Native csv read - test fallback reasons") { + withSQLConf( + CometConf.COMET_CSV_V2_NATIVE_ENABLED.key -> "true", + SQLConf.USE_V1_SOURCE_LIST.key -> "") { + val columnNameOfCorruptedRecords = + SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD) + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("c", IntegerType) + .add(columnNameOfCorruptedRecords, StringType) + var df = spark.read + .options(Map("header" -> "false", "delimiter" -> ",")) + .schema(schema) + .csv(TEST_CSV_PATH_NO_HEADER) + checkSparkAnswerAndFallbackReason( + df, + "Comet doesn't support the processing of corrupted records") + df = spark.read + .options(Map("header" -> "false", "delimiter" -> ",", "inferSchema" -> "true")) + .csv(TEST_CSV_PATH_NO_HEADER) + checkSparkAnswerAndFallbackReason(df, "Comet doesn't support inferSchema=true option") + df = spark.read + .options(Map("header" -> "false", "delimiter" -> ",,")) + .csv(TEST_CSV_PATH_NO_HEADER) + checkSparkAnswerAndFallbackReason( + df, + "Comet supports only single-character delimiters, but got: ',,'") + } + } +}
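For reference, a minimal end-to-end sketch of enabling the experimental native CSV reader outside the test harness; it assumes Comet is on the classpath and its session extensions are registered, and the data path is a placeholder.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}

// Sketch only: assumes the Comet jars are on the Spark classpath.
val spark = SparkSession
  .builder()
  .appName("comet-native-csv")
  .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
  .config("spark.comet.scan.csv.v2.enabled", "true")
  // The rule only matches the V2 CSVScan, so keep "csv" out of the V1 source list.
  .config("spark.sql.sources.useV1SourceList", "")
  .getOrCreate()

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("c", IntegerType)

spark.read
  .schema(schema)
  .option("header", "true")
  .option("delimiter", ",")
  .csv("/path/to/data.csv")
  .show()
```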