diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs
index 2ca1e9cfd5..6de1da5b4c 100644
--- a/native/core/src/execution/operators/parquet_writer.rs
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -19,6 +19,7 @@ use std::{
     any::Any,
+    collections::HashMap,
     fmt,
     fmt::{Debug, Formatter},
     fs::File,
@@ -26,9 +27,12 @@ use std::{
     sync::Arc,
 };
 
-use opendal::{services::Hdfs, Operator};
-use url::Url;
+use opendal::Operator;
+use crate::execution::shuffle::CompressionCodec;
+use crate::parquet::parquet_support::{
+    create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
+};
 
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -50,8 +54,7 @@ use parquet::{
     basic::{Compression, ZstdLevel},
     file::properties::WriterProperties,
 };
-
-use crate::execution::shuffle::CompressionCodec;
+use url::Url;
 
 /// Enum representing different types of Arrow writers based on storage backend
 enum ParquetWriter {
@@ -200,6 +203,8 @@ pub struct ParquetWriterExec {
     partition_id: i32,
     /// Column names to use in the output Parquet file
     column_names: Vec<String>,
+    /// Object store configuration options
+    object_store_options: HashMap<String, String>,
     /// Metrics
     metrics: ExecutionPlanMetricsSet,
     /// Cache for plan properties
@@ -218,6 +223,7 @@ impl ParquetWriterExec {
         compression: CompressionCodec,
         partition_id: i32,
         column_names: Vec<String>,
+        object_store_options: HashMap<String, String>,
     ) -> Result<Self> {
         // Preserve the input's partitioning so each partition writes its own file
         let input_partitioning = input.output_partitioning().clone();
@@ -238,6 +244,7 @@ impl ParquetWriterExec {
             compression,
             partition_id,
             column_names,
+            object_store_options,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         })
     }
@@ -255,10 +262,11 @@ impl ParquetWriterExec {
     /// Create an Arrow writer based on the storage scheme
     ///
     /// # Arguments
-    /// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
     /// * `output_file_path` - The full path to the output file
     /// * `schema` - The Arrow schema for the Parquet file
     /// * `props` - Writer properties including compression
+    /// * `runtime_env` - Runtime environment for object store registration
+    /// * `object_store_options` - Configuration options for the object store
     ///
     /// # Returns
     /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
@@ -267,71 +275,61 @@ impl ParquetWriterExec {
         output_file_path: &str,
         schema: SchemaRef,
         props: WriterProperties,
+        runtime_env: Arc<RuntimeEnv>,
+        object_store_options: &HashMap<String, String>,
     ) -> Result<ParquetWriter> {
-        // Determine storage scheme from output_file_path
-        let storage_scheme = if output_file_path.starts_with("hdfs://") {
-            "hdfs"
-        } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
-            "s3"
-        } else {
-            "local"
-        };
+        // Parse the URL and branch on the storage scheme
+        let url = Url::parse(output_file_path).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e))
+        })?;
 
-        match storage_scheme {
-            "hdfs" => {
-                // Parse the output_file_path to extract namenode and path
-                // Expected format: hdfs://namenode:port/path/to/file
-                let url = Url::parse(output_file_path).map_err(|e| {
+        if is_hdfs_scheme(&url, object_store_options) {
+            // HDFS storage
+            {
+                // Use prepare_object_store_with_configs to create and register the object store
+                let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
+                    runtime_env,
+                    output_file_path.to_string(),
+                    object_store_options,
+                )
+                .map_err(|e| {
                     DataFusionError::Execution(format!(
-                        "Failed to parse HDFS URL '{}': {}",
+                        "Failed to prepare object store for '{}': {}",
                         output_file_path, e
                     ))
                 })?;
 
-                // Extract namenode (scheme + host + port)
-                let namenode = format!(
-                    "{}://{}{}",
-                    url.scheme(),
-                    url.host_str().unwrap_or("localhost"),
-                    url.port()
-                        .map(|p| format!(":{}", p))
-                        .unwrap_or_else(|| ":9000".to_string())
-                );
-
-                // Extract the path (without the scheme and host)
-                let hdfs_path = url.path().to_string();
-
                 // For remote storage (HDFS, S3), write to an in-memory buffer
                 let buffer = Vec::new();
                 let cursor = Cursor::new(buffer);
                 let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
                     .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create {} writer: {}",
-                            storage_scheme, e
-                        ))
+                        DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
                     })?;
 
-                let builder = Hdfs::default().name_node(&namenode);
-                let op = Operator::new(builder)
-                    .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create HDFS operator for '{}' (namenode: {}): {}",
-                            output_file_path, namenode, e
-                        ))
-                    })?
-                    .finish();
+                // Create HDFS operator with configuration options using the helper function
+                let op = create_hdfs_operator(&url).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create HDFS operator for '{}': {}",
+                        output_file_path, e
+                    ))
+                })?;
 
                 // HDFS writer will be created lazily on first write
-                // Use only the path part for the HDFS writer
+                // Use the path from prepare_object_store_with_configs
                 Ok(ParquetWriter::Remote(
                     arrow_parquet_buffer_writer,
                     None,
                     op,
-                    hdfs_path,
+                    object_store_path.to_string(),
                 ))
             }
-            "local" => {
+        } else if output_file_path.starts_with("file://")
+            || output_file_path.starts_with("file:")
+            || !output_file_path.contains("://")
+        {
+            // Local file system
+            {
                 // For a local file system, write directly to file
                 // Strip file:// or file: prefix if present
                 let local_path = output_file_path
@@ -368,10 +366,12 @@ impl ParquetWriterExec {
                     })?;
                 Ok(ParquetWriter::LocalFile(writer))
             }
-            _ => Err(DataFusionError::Execution(format!(
-                "Unsupported storage scheme: {}",
-                storage_scheme
-            ))),
+        } else {
+            // Unsupported storage scheme
+            Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme in path: {}",
+                output_file_path
+            )))
         }
     }
 }
@@ -435,6 +435,7 @@ impl ExecutionPlan for ParquetWriterExec {
                 self.compression.clone(),
                 self.partition_id,
                 self.column_names.clone(),
+                self.object_store_options.clone(),
             )?)),
             _ => Err(DataFusionError::Internal(
                 "ParquetWriterExec requires exactly one child".to_string(),
@@ -454,6 +455,7 @@ impl ExecutionPlan for ParquetWriterExec {
         let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition);
         let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition);
+        let runtime_env = context.runtime_env();
         let input = self.input.execute(partition, context)?;
         let input_schema = self.input.schema();
         let work_dir = self.work_dir.clone();
@@ -488,7 +490,14 @@ impl ExecutionPlan for ParquetWriterExec {
             .set_compression(compression)
             .build();
 
-        let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
+        let object_store_options = self.object_store_options.clone();
+        let mut writer = Self::create_arrow_writer(
+            &part_file,
+            Arc::clone(&output_schema),
+            props,
+            runtime_env,
+            &object_store_options,
+        )?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);
@@ -732,10 +741,14 @@ mod tests {
         // Create ParquetWriter using the create_arrow_writer method
         // Use full HDFS URL format
        let full_output_path = format!("hdfs://namenode:9000{}", output_path);
+        let session_ctx = datafusion::prelude::SessionContext::new();
+        let runtime_env = session_ctx.runtime_env();
         let mut writer = ParquetWriterExec::create_arrow_writer(
             &full_output_path,
             create_test_record_batch(1)?.schema(),
             props,
+            runtime_env,
+            &HashMap::new(),
         )?;
 
         // Write 5 batches in a loop
@@ -802,6 +815,7 @@ mod tests {
             CompressionCodec::None,
             0, // partition_id
             column_names,
+            HashMap::new(), // object_store_options
         )?;
 
         // Create a session context and execute the plan
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 93fbb59c11..7d806213d8 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1248,6 +1248,12 @@ impl PhysicalPlanner {
                     ))),
                 }?;
 
+                let object_store_options: HashMap<String, String> = writer
+                    .object_store_options
+                    .iter()
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect();
+
                 let parquet_writer = Arc::new(ParquetWriterExec::try_new(
                     Arc::clone(&child.native_plan),
                     writer.output_path.clone(),
@@ -1261,6 +1267,7 @@ impl PhysicalPlanner {
                     codec,
                     self.partition,
                     writer.column_names.clone(),
+                    object_store_options,
                 )?);
 
                 Ok((
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index c9a27d7dcb..e7ff5630f1 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -358,7 +358,7 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     }
 }
 
-fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
     const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
     let scheme = url.scheme();
     if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
@@ -387,20 +387,26 @@ fn create_hdfs_object_store(
     }
 }
 
-// Creates an HDFS object store from a URL using OpenDAL
+// Creates an OpenDAL HDFS Operator from a URL with optional configuration
 #[cfg(feature = "hdfs-opendal")]
-fn create_hdfs_object_store(
-    url: &Url,
-) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+pub(crate) fn create_hdfs_operator(url: &Url) -> Result<opendal::Operator, object_store::Error> {
     let name_node = get_name_node_uri(url)?;
     let builder = opendal::services::Hdfs::default().name_node(&name_node);
-    let op = opendal::Operator::new(builder)
+    opendal::Operator::new(builder)
         .map_err(|error| object_store::Error::Generic {
             store: "hdfs-opendal",
             source: error.into(),
-        })?
-        .finish();
+        })
+        .map(|op| op.finish())
+}
+
+// Creates an HDFS object store from a URL using OpenDAL
+#[cfg(feature = "hdfs-opendal")]
+pub(crate) fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let op = create_hdfs_operator(url)?;
     let store = object_store_opendal::OpendalStore::new(op);
     let path = Path::parse(url.path())?;
     Ok((Box::new(store), path))
 }
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 015b5d96b6..a1a3c4bed9 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -245,6 +245,13 @@ message ParquetWriter {
   optional string job_id = 6;
   // Task attempt ID for this specific task
   optional int32 task_attempt_id = 7;
+  // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
+  // from the Hadoop configuration for compatibility with Hadoop FileSystem implementations of
+  // object stores.
+  // The configuration keys have the hadoop. or spark.hadoop. prefix trimmed. For instance, the
+  // configuration key "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
+  // the map.
+  map<string, string> object_store_options = 8;
 }
 
 enum AggregateMode {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 8349329841..1f3c3f40c0 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -19,6 +19,7 @@ package org.apache.comet.serde.operator
 
+import java.net.URI
 import java.util.Locale
 
 import scala.jdk.CollectionConverters._
@@ -32,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.serializeDataType
@@ -126,14 +128,24 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       return None
     }
 
-    val writerOp = OperatorOuterClass.ParquetWriter
+    val writerOpBuilder = OperatorOuterClass.ParquetWriter
       .newBuilder()
       .setOutputPath(outputPath)
       .setCompression(codec)
       .addAllColumnNames(cmd.query.output.map(_.name).asJava)
-      // Note: work_dir, job_id, and task_attempt_id will be set at execution time
-      // in CometNativeWriteExec, as they depend on the Spark task context
-      .build()
+    // Note: work_dir, job_id, and task_attempt_id will be set at execution time
+    // in CometNativeWriteExec, as they depend on the Spark task context
+
+    // Collect S3/cloud storage configurations
+    val session = op.session
+    val hadoopConf = session.sessionState.newHadoopConfWithOptions(cmd.options)
+    val objectStoreOptions =
+      NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(outputPath))
+    objectStoreOptions.foreach { case (key, value) =>
+      writerOpBuilder.putObjectStoreOptions(key, value)
+    }
+
+    val writerOp = writerOpBuilder.build()
 
     val writerOperator = Operator
       .newBuilder()