3 changes: 3 additions & 0 deletions .github/workflows/pr_build_linux.yml
@@ -127,6 +127,9 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
- name: "csv"
value: |
org.apache.comet.csv.CometCsvNativeReadSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_macos.yml
@@ -90,6 +90,9 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
- name: "csv"
value: |
org.apache.comet.csv.CometCsvNativeReadSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -143,6 +143,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.csv.v2.enabled")
.category(CATEGORY_TESTING)
.doc(
"Whether to use the native Comet reader for Spark V2 CSV scans. " +
"When disabled, the standard Spark CSV reader is used. " +
"This feature is experimental and performance benefits are workload-dependent.")
.booleanConf
.createWithDefault(false)

val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
.category(CATEGORY_PARQUET)
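To illustrate how this flag is exercised end to end, here is a minimal Scala sketch. The config key and the explicit-schema requirement come from this PR; the `SparkSession` setup, plugin config, file path, schema, and column names are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative session setup; any SparkSession with the Comet plugin enabled works.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.scan.csv.v2.enabled", "true") // opt in to the native CSV reader
  .getOrCreate()

// An explicit schema is required: inferSchema=true falls back to Spark (see CometScanRule below).
val schema = StructType(
  Seq(StructField("id", IntegerType), StructField("name", StringType)))

val df = spark.read
  .schema(schema)
  .option("header", "true") // single-character options map onto the native CsvOptions
  .option("delimiter", ",")
  .csv("/tmp/example.csv") // hypothetical path

df.show()
```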
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -144,6 +144,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false |
| `spark.comet.scan.csv.v2.enabled` | Whether to use the native Comet reader for Spark V2 CSV scans. When disabled, the standard Spark CSV reader is used. This feature is experimental and performance benefits are workload-dependent. | false |
| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
90 changes: 90 additions & 0 deletions native/core/src/execution/operators/csv_scan.rs
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::ExecutionError;
use arrow::datatypes::{Field, SchemaRef};
use datafusion::common::DataFusionError;
use datafusion::common::Result;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::CsvSource;
use datafusion_comet_proto::spark_operator::CsvOptions;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::PartitionedFile;
use itertools::Itertools;
use std::sync::Arc;

pub fn init_csv_datasource_exec(
object_store_url: ObjectStoreUrl,
file_groups: Vec<Vec<PartitionedFile>>,
data_schema: SchemaRef,
partition_schema: Option<SchemaRef>,
csv_options: &CsvOptions,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let csv_source = build_csv_source(csv_options.clone());

let file_groups = file_groups
.iter()
.map(|files| FileGroup::new(files.clone()))
.collect();

let partition_fields = partition_schema
.map(|schema| {
schema
.fields()
.iter()
.map(|field| {
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
})
.collect_vec()
})
.unwrap_or(vec![]);

let file_scan_config: FileScanConfig =
FileScanConfigBuilder::new(object_store_url, data_schema, csv_source)
.with_file_groups(file_groups)
.with_table_partition_cols(partition_fields)
.build();

Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
}

fn build_csv_source(options: CsvOptions) -> Arc<CsvSource> {
let delimiter = string_to_u8(&options.delimiter, "delimiter").unwrap();
let quote = string_to_u8(&options.quote, "quote").unwrap();
let escape = string_to_u8(&options.escape, "escape").unwrap();
let terminator = string_to_u8(&options.terminator, "terminator").unwrap();
let comment = options
.comment
.map(|c| string_to_u8(&c, "comment").unwrap());
let csv_source = CsvSource::new(options.has_header, delimiter, quote)
.with_escape(Some(escape))
.with_comment(comment)
.with_terminator(Some(terminator))
.with_truncate_rows(options.truncated_rows);
Arc::new(csv_source)
}

fn string_to_u8(option: &str, option_name: &str) -> Result<u8> {
match option.as_bytes().first() {
Some(&ch) if ch.is_ascii() => Ok(ch),
_ => Err(DataFusionError::Configuration(format!(
"invalid {option_name} character '{option}': must be an ASCII character"
))),
}
}
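Note that `string_to_u8` takes the first byte of each option string and errors if it is missing or not ASCII, so every character option handed to the native reader should be a single ASCII character. Below is a hedged Scala sketch of a read whose standard Spark CSV options satisfy that constraint, reusing `spark` and `schema` from the earlier sketch; the option-to-field mapping in the comments is an assumption based on the `CsvOptions` field names, since the Scala serde code is not part of this excerpt.

```scala
// Each character option is a single ASCII character so the native
// string_to_u8 conversion succeeds for the corresponding CsvOptions field.
val df = spark.read
  .schema(schema)
  .option("header", "true") // -> has_header (assumed mapping)
  .option("delimiter", "|") // -> delimiter
  .option("quote", "\"")    // -> quote
  .option("escape", "\\")   // -> escape
  .option("comment", "#")   // -> comment (optional)
  .option("lineSep", "\n")  // -> terminator (assumed mapping)
  .csv("/tmp/example.csv")  // hypothetical path
```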
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
@@ -31,8 +31,10 @@ pub use expand::ExpandExec;
mod iceberg_scan;
mod parquet_writer;
pub use parquet_writer::ParquetWriterExec;
mod csv_scan;
pub mod projection;
mod scan;
pub use csv_scan::init_csv_datasource_exec;

/// Error returned during executing operators.
#[derive(thiserror::Error, Debug)]
38 changes: 38 additions & 0 deletions native/core/src/execution/planner.rs
@@ -21,6 +21,7 @@ pub mod expression_registry;
pub mod macros;
pub mod operator_registry;

use crate::execution::operators::init_csv_datasource_exec;
use crate::execution::operators::IcebergScanExec;
use crate::{
errors::ExpressionError,
@@ -95,6 +96,7 @@ use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;

use crate::parquet::parquet_exec::init_datasource_exec;

use arrow::array::{
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
@@ -1120,6 +1122,42 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
))
}
OpStruct::CsvScan(scan) => {
let data_schema =
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
let partition_schema =
convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
let object_store_options: HashMap<String, String> = scan
.object_store_options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let one_file = scan
.file_partitions
.first()
.and_then(|f| f.partitioned_file.first())
.map(|f| f.file_path.clone())
.ok_or(GeneralError("Failed to locate file".to_string()))?;
let (object_store_url, _) = prepare_object_store_with_configs(
self.session_ctx.runtime_env(),
one_file,
&object_store_options,
)?;
let files =
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
let scan = init_csv_datasource_exec(
object_store_url,
file_groups,
data_schema,
Some(partition_schema),
&scan.csv_options.clone().unwrap(),
)?;
Ok((
vec![],
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
))
}
OpStruct::Scan(scan) => {
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

2 changes: 2 additions & 0 deletions native/core/src/execution/planner/operator_registry.rs
@@ -60,6 +60,7 @@ pub enum OperatorType {
SortMergeJoin,
HashJoin,
Window,
CsvScan,
}

/// Global registry of operator builders
@@ -151,5 +152,6 @@ fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
OpStruct::HashJoin(_) => Some(OperatorType::HashJoin),
OpStruct::Window(_) => Some(OperatorType::Window),
OpStruct::Explode(_) => None, // Not yet in OperatorType enum
OpStruct::CsvScan(_) => Some(OperatorType::CsvScan),
}
}
19 changes: 19 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -51,6 +51,7 @@ message Operator {
IcebergScan iceberg_scan = 112;
ParquetWriter parquet_writer = 113;
Explode explode = 114;
CsvScan csv_scan = 115;
}
}

@@ -110,6 +111,24 @@ message NativeScan {
bool encryption_enabled = 14;
}

message CsvScan {
repeated SparkStructField required_schema = 1;
repeated SparkStructField partition_schema = 2;
repeated SparkFilePartition file_partitions = 3;
map<string, string> object_store_options = 4;
CsvOptions csv_options = 5;
}

message CsvOptions {
bool has_header = 1;
string delimiter = 2;
string quote = 3;
string escape = 4;
optional string comment = 5;
string terminator = 7;
bool truncated_rows = 8;
}

message IcebergScan {
// Schema to read
repeated SparkStructField required_schema = 1;
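For reference, a sketch of how the JVM side might populate the new messages through the protobuf-generated builders. The nesting under `OperatorOuterClass` and the setter names are assumptions based on standard protobuf Java codegen for the fields above; the actual CSV serde class is not shown in this diff.

```scala
import org.apache.comet.serde.OperatorOuterClass

// Assumed generated builder API for the CsvOptions message defined above.
val csvOptions = OperatorOuterClass.CsvOptions
  .newBuilder()
  .setHasHeader(true)
  .setDelimiter(",")
  .setQuote("\"")
  .setEscape("\\")
  .setComment("#") // optional field; skip this call to leave it unset
  .setTerminator("\n")
  .setTruncatedRows(false)
  .build()

// The CsvScan operator carries the options plus schemas and file partitions,
// which the native planner (see planner.rs above) unpacks.
val csvScan = OperatorOuterClass.CsvScan
  .newBuilder()
  .setCsvOptions(csvOptions)
  .build()
```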
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -51,9 +51,8 @@ import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, Unsupported}
import org.apache.comet.serde._
import org.apache.comet.serde.operator._
import org.apache.comet.serde.operator.CometDataWritingCommand

object CometExecRule {

@@ -191,6 +190,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case scan: CometBatchScanExec if scan.nativeIcebergScanMetadata.isDefined =>
convertToComet(scan, CometIcebergNativeScan).getOrElse(scan)

case scan: CometBatchScanExec if scan.wrapped.scan.isInstanceOf[CSVScan] =>
convertToComet(scan, CometCsvNativeScanExec).getOrElse(scan)

// Comet JVM + native scan for V1 and V2
case op if isCometScan(op) =>
convertToComet(op, CometScanWrapper).getOrElse(op)
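With the rule in place, a quick way to confirm that a CSV read was converted is to inspect the physical plan. This is an illustrative sanity check, not part of the PR; the exact node names in the output are not guaranteed here, so the programmatic variant only looks for Comet plan nodes by package prefix.

```scala
// Show the physical plan; with the flag enabled the CSV BatchScan should be
// replaced by a Comet scan node, otherwise the plain Spark scan remains.
df.explain("formatted")

// Programmatic variant: look for any Comet physical operator in the executed plan.
val usesComet = df.queryExecution.executedPlan
  .collect { case p if p.getClass.getName.startsWith("org.apache.spark.sql.comet") => p }
  .nonEmpty
```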
39 changes: 39 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -226,6 +227,44 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
withInfos(scanExec, fallbackReasons.toSet)
}

case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
val schemaSupported =
CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
if (!schemaSupported) {
fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
}
val partitionSchemaSupported =
CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
if (!partitionSchemaSupported) {
fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
}
val corruptedRecordsColumnName =
SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
val noCorruptRecordColumn =
!scan.readDataSchema.fieldNames.contains(corruptedRecordsColumnName)
if (!noCorruptRecordColumn) {
fallbackReasons += "Comet doesn't support the processing of corrupted records"
}
val isInferSchemaEnabled = scan.options.getBoolean("inferSchema", false)
if (isInferSchemaEnabled) {
fallbackReasons += "Comet doesn't support the inferSchema=true option"
}
val delimiter = Option(scan.options.get("delimiter")).getOrElse(",") // option may be unset
val isSingleCharacterDelimiter = delimiter.length == 1
if (!isSingleCharacterDelimiter) {
fallbackReasons +=
s"Comet supports only single-character delimiters, but got: '$delimiter'"
}
if (schemaSupported && partitionSchemaSupported && noCorruptRecordColumn
&& !isInferSchemaEnabled && isSingleCharacterDelimiter) {
CometBatchScanExec(
scanExec.clone().asInstanceOf[BatchScanExec],
runtimeFilters = scanExec.runtimeFilters)
} else {
withInfos(scanExec, fallbackReasons.toSet)
}

// Iceberg scan - patched version implementing SupportsComet interface
case s: SupportsComet if !COMET_ICEBERG_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
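To make the fallback conditions above concrete, here is a sketch of reads that would and would not be eligible for the native CSV path, reusing `spark` and `schema` from the earlier sketch. Paths are hypothetical, and `_corrupt_record` is Spark's default corrupt-record column name.

```scala
import org.apache.spark.sql.types.StringType

// Eligible (subject to the schema checks): explicit schema, single-character delimiter.
val ok = spark.read
  .schema(schema)
  .option("delimiter", ";")
  .csv("/tmp/ok.csv")

// Falls back: schema inference is requested.
val inferFallback = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("/tmp/infer.csv")

// Falls back: multi-character delimiter.
val delimiterFallback = spark.read
  .schema(schema)
  .option("delimiter", "||")
  .csv("/tmp/multi.csv")

// Falls back: the corrupt-record column is part of the read schema.
val corruptFallback = spark.read
  .schema(schema.add("_corrupt_record", StringType))
  .csv("/tmp/corrupt.csv")
```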