Commit aff07d0
feat: Comet Writer should respect object store settings (#3042)
1 parent 79b83d8

5 files changed: +111 / -65 lines

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 67 additions & 53 deletions
@@ -19,16 +19,20 @@
 
 use std::{
     any::Any,
+    collections::HashMap,
     fmt,
     fmt::{Debug, Formatter},
     fs::File,
     io::Cursor,
     sync::Arc,
 };
 
-use opendal::{services::Hdfs, Operator};
-use url::Url;
+use opendal::Operator;
 
+use crate::execution::shuffle::CompressionCodec;
+use crate::parquet::parquet_support::{
+    create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
+};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;

@@ -50,8 +54,7 @@ use parquet::{
     basic::{Compression, ZstdLevel},
     file::properties::WriterProperties,
 };
-
-use crate::execution::shuffle::CompressionCodec;
+use url::Url;
 
 /// Enum representing different types of Arrow writers based on storage backend
 enum ParquetWriter {

@@ -200,6 +203,8 @@ pub struct ParquetWriterExec {
     partition_id: i32,
     /// Column names to use in the output Parquet file
     column_names: Vec<String>,
+    /// Object store configuration options
+    object_store_options: HashMap<String, String>,
     /// Metrics
     metrics: ExecutionPlanMetricsSet,
     /// Cache for plan properties

@@ -218,6 +223,7 @@ impl ParquetWriterExec {
         compression: CompressionCodec,
         partition_id: i32,
         column_names: Vec<String>,
+        object_store_options: HashMap<String, String>,
     ) -> Result<Self> {
         // Preserve the input's partitioning so each partition writes its own file
         let input_partitioning = input.output_partitioning().clone();

@@ -238,6 +244,7 @@
             compression,
             partition_id,
             column_names,
+            object_store_options,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         })

@@ -255,10 +262,11 @@
     /// Create an Arrow writer based on the storage scheme
     ///
     /// # Arguments
-    /// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
     /// * `output_file_path` - The full path to the output file
     /// * `schema` - The Arrow schema for the Parquet file
     /// * `props` - Writer properties including compression
+    /// * `runtime_env` - Runtime environment for object store registration
+    /// * `object_store_options` - Configuration options for object store
     ///
     /// # Returns
     /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme

@@ -267,71 +275,61 @@ impl ParquetWriterExec {
         output_file_path: &str,
         schema: SchemaRef,
         props: WriterProperties,
+        runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+        object_store_options: &HashMap<String, String>,
     ) -> Result<ParquetWriter> {
-        // Determine storage scheme from output_file_path
-        let storage_scheme = if output_file_path.starts_with("hdfs://") {
-            "hdfs"
-        } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
-            "s3"
-        } else {
-            "local"
-        };
+        // Parse URL and match on storage scheme directly
+        let url = Url::parse(output_file_path).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e))
+        })?;
 
-        match storage_scheme {
-            "hdfs" => {
-                // Parse the output_file_path to extract namenode and path
-                // Expected format: hdfs://namenode:port/path/to/file
-                let url = Url::parse(output_file_path).map_err(|e| {
+        if is_hdfs_scheme(&url, object_store_options) {
+            // HDFS storage
+            {
+                // Use prepare_object_store_with_configs to create and register the object store
+                let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
+                    runtime_env,
+                    output_file_path.to_string(),
+                    object_store_options,
+                )
+                .map_err(|e| {
                     DataFusionError::Execution(format!(
-                        "Failed to parse HDFS URL '{}': {}",
+                        "Failed to prepare object store for '{}': {}",
                         output_file_path, e
                     ))
                 })?;
 
-                // Extract namenode (scheme + host + port)
-                let namenode = format!(
-                    "{}://{}{}",
-                    url.scheme(),
-                    url.host_str().unwrap_or("localhost"),
-                    url.port()
-                        .map(|p| format!(":{}", p))
-                        .unwrap_or_else(|| ":9000".to_string())
-                );
-
-                // Extract the path (without the scheme and host)
-                let hdfs_path = url.path().to_string();
-
                 // For remote storage (HDFS, S3), write to an in-memory buffer
                 let buffer = Vec::new();
                 let cursor = Cursor::new(buffer);
                 let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
                     .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create {} writer: {}",
-                            storage_scheme, e
-                        ))
+                        DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
                     })?;
 
-                let builder = Hdfs::default().name_node(&namenode);
-                let op = Operator::new(builder)
-                    .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create HDFS operator for '{}' (namenode: {}): {}",
-                            output_file_path, namenode, e
-                        ))
-                    })?
-                    .finish();
+                // Create HDFS operator with configuration options using the helper function
+                let op = create_hdfs_operator(&url).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create HDFS operator for '{}': {}",
+                        output_file_path, e
+                    ))
+                })?;
 
                 // HDFS writer will be created lazily on first write
-                // Use only the path part for the HDFS writer
+                // Use the path from prepare_object_store_with_configs
                 Ok(ParquetWriter::Remote(
                     arrow_parquet_buffer_writer,
                     None,
                     op,
-                    hdfs_path,
+                    object_store_path.to_string(),
                 ))
             }
-            "local" => {
+        } else if output_file_path.starts_with("file://")
+            || output_file_path.starts_with("file:")
+            || !output_file_path.contains("://")
+        {
+            // Local file system
+            {
                 // For a local file system, write directly to file
                 // Strip file:// or file: prefix if present
                 let local_path = output_file_path

@@ -368,10 +366,12 @@ impl ParquetWriterExec {
                 })?;
                 Ok(ParquetWriter::LocalFile(writer))
             }
-            _ => Err(DataFusionError::Execution(format!(
-                "Unsupported storage scheme: {}",
-                storage_scheme
-            ))),
+        } else {
+            // Unsupported storage scheme
+            Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme in path: {}",
+                output_file_path
+            )))
         }
     }
 }

@@ -435,6 +435,7 @@ impl ExecutionPlan for ParquetWriterExec {
                 self.compression.clone(),
                 self.partition_id,
                 self.column_names.clone(),
+                self.object_store_options.clone(),
             )?)),
             _ => Err(DataFusionError::Internal(
                 "ParquetWriterExec requires exactly one child".to_string(),

@@ -454,6 +455,7 @@
         let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition);
         let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition);
 
+        let runtime_env = context.runtime_env();
         let input = self.input.execute(partition, context)?;
         let input_schema = self.input.schema();
         let work_dir = self.work_dir.clone();

@@ -488,7 +490,14 @@
             .set_compression(compression)
             .build();
 
-        let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
+        let object_store_options = self.object_store_options.clone();
+        let mut writer = Self::create_arrow_writer(
+            &part_file,
+            Arc::clone(&output_schema),
+            props,
+            runtime_env,
+            &object_store_options,
+        )?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);

@@ -732,10 +741,14 @@ mod tests {
         // Create ParquetWriter using the create_arrow_writer method
        // Use full HDFS URL format
         let full_output_path = format!("hdfs://namenode:9000{}", output_path);
+        let session_ctx = datafusion::prelude::SessionContext::new();
+        let runtime_env = session_ctx.runtime_env();
         let mut writer = ParquetWriterExec::create_arrow_writer(
             &full_output_path,
             create_test_record_batch(1)?.schema(),
             props,
+            runtime_env,
+            &HashMap::new(),
         )?;
 
         // Write 5 batches in a loop

@@ -802,6 +815,7 @@
             CompressionCodec::None,
             0, // partition_id
             column_names,
+            HashMap::new(), // object_store_options
         )?;
 
         // Create a session context and execute the plan
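For orientation, here is a minimal standalone sketch of the parse-then-dispatch flow this commit introduces in create_arrow_writer. classify_output_path and its hdfs-only check are simplified stand-ins for is_hdfs_scheme and the object store registration, not code from the commit.

use url::Url;

// Sketch only: mirrors the new URL-based scheme dispatch of create_arrow_writer.
fn classify_output_path(output_file_path: &str) -> Result<&'static str, String> {
    let url = Url::parse(output_file_path)
        .map_err(|e| format!("Failed to parse URL '{}': {}", output_file_path, e))?;

    // Stand-in for parquet_support::is_hdfs_scheme(&url, object_store_options),
    // which also honors the "fs.comet.libhdfs.schemes" entry of the option map.
    let is_hdfs_like = url.scheme() == "hdfs";

    if is_hdfs_like {
        // In the real writer, prepare_object_store_with_configs registers the store
        // on the RuntimeEnv; batches are buffered in memory and flushed via OpenDAL.
        Ok("remote (buffered, flushed through the registered object store)")
    } else if output_file_path.starts_with("file://")
        || output_file_path.starts_with("file:")
        || !output_file_path.contains("://")
    {
        Ok("local (ArrowWriter writing directly to a local File)")
    } else {
        Err(format!("Unsupported storage scheme in path: {}", output_file_path))
    }
}

fn main() {
    for path in [
        "hdfs://namenode:9000/warehouse/t/part-0.parquet",
        "file:///tmp/part-0.parquet",
        "ftp://host/part-0.parquet",
    ] {
        println!("{} -> {:?}", path, classify_output_path(path));
    }
}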

native/core/src/execution/planner.rs

Lines changed: 7 additions & 0 deletions
@@ -1248,6 +1248,12 @@ impl PhysicalPlanner {
             ))),
         }?;
 
+        let object_store_options: HashMap<String, String> = writer
+            .object_store_options
+            .iter()
+            .map(|(k, v)| (k.clone(), v.clone()))
+            .collect();
+
         let parquet_writer = Arc::new(ParquetWriterExec::try_new(
             Arc::clone(&child.native_plan),
             writer.output_path.clone(),

@@ -1261,6 +1267,7 @@
             codec,
             self.partition,
             writer.column_names.clone(),
+            object_store_options,
         )?);
 
         Ok((

native/core/src/parquet/parquet_support.rs

Lines changed: 14 additions & 8 deletions
@@ -358,7 +358,7 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     }
 }
 
-fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
     const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
     let scheme = url.scheme();
     if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {

@@ -387,20 +387,26 @@ fn create_hdfs_object_store(
     }
 }
 
-// Creates an HDFS object store from a URL using OpenDAL
+// Creates an OpenDAL HDFS Operator from a URL with optional configuration
 #[cfg(feature = "hdfs-opendal")]
-fn create_hdfs_object_store(
-    url: &Url,
-) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+pub(crate) fn create_hdfs_operator(url: &Url) -> Result<opendal::Operator, object_store::Error> {
     let name_node = get_name_node_uri(url)?;
     let builder = opendal::services::Hdfs::default().name_node(&name_node);
 
-    let op = opendal::Operator::new(builder)
+    opendal::Operator::new(builder)
         .map_err(|error| object_store::Error::Generic {
             store: "hdfs-opendal",
             source: error.into(),
-        })?
-        .finish();
+        })
+        .map(|op| op.finish())
+}
+
+// Creates an HDFS object store from a URL using OpenDAL
+#[cfg(feature = "hdfs-opendal")]
+pub(crate) fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let op = create_hdfs_operator(url)?;
     let store = object_store_opendal::OpendalStore::new(op);
     let path = Path::parse(url.path())?;
     Ok((Box::new(store), path))
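A sketch of how the now-public is_hdfs_scheme is meant to be used follows. Only the first lines of its body appear in this diff, so the comma-separated handling of fs.comet.libhdfs.schemes below is an assumption about the rest, shown purely for illustration.

use std::collections::HashMap;
use url::Url;

// Assumed behavior: treat a URL as HDFS-like if its scheme is listed in
// "fs.comet.libhdfs.schemes" (comma separated), falling back to plain "hdfs".
fn is_hdfs_scheme_sketch(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
    const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
    let scheme = url.scheme();
    if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
        libhdfs_schemes
            .split(',')
            .any(|s| s.trim().eq_ignore_ascii_case(scheme))
    } else {
        scheme == "hdfs"
    }
}

fn main() {
    // Default: only the hdfs scheme is treated as HDFS.
    let url = Url::parse("hdfs://namenode:9000/warehouse/t").unwrap();
    assert!(is_hdfs_scheme_sketch(&url, &HashMap::new()));

    // With the config set, additional schemes are routed through libhdfs.
    let mut configs = HashMap::new();
    configs.insert("fs.comet.libhdfs.schemes".to_string(), "hdfs,viewfs".to_string());
    let url = Url::parse("viewfs://cluster/warehouse/t").unwrap();
    assert!(is_hdfs_scheme_sketch(&url, &configs));
}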

native/proto/src/proto/operator.proto

Lines changed: 7 additions & 0 deletions
@@ -245,6 +245,13 @@ message ParquetWriter {
   optional string job_id = 6;
   // Task attempt ID for this specific task
   optional int32 task_attempt_id = 7;
+  // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
+  // from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
+  // stores.
+  // The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
+  // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
+  // the map.
+  map<string, string> object_store_options = 8;
 }
 
 enum AggregateMode {
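The prefix trimming described in the comment happens on the JVM side (NativeConfig) before the map is serialized; the helper below only illustrates the documented key mapping and is not code from the commit.

// Illustrative only: strips the documented prefixes from Hadoop-style keys.
fn trim_hadoop_prefix(key: &str) -> &str {
    key.strip_prefix("spark.hadoop.")
        .or_else(|| key.strip_prefix("hadoop."))
        .unwrap_or(key)
}

fn main() {
    assert_eq!(trim_hadoop_prefix("spark.hadoop.fs.s3a.access.key"), "fs.s3a.access.key");
    assert_eq!(trim_hadoop_prefix("hadoop.fs.s3a.endpoint"), "fs.s3a.endpoint");
    assert_eq!(trim_hadoop_prefix("fs.s3a.path.style.access"), "fs.s3a.path.style.access");
}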

spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala

Lines changed: 16 additions & 4 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.comet.serde.operator
 
+import java.net.URI
 import java.util.Locale
 
 import scala.jdk.CollectionConverters._

@@ -32,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.serializeDataType

@@ -126,14 +128,24 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       return None
     }
 
-    val writerOp = OperatorOuterClass.ParquetWriter
+    val writerOpBuilder = OperatorOuterClass.ParquetWriter
       .newBuilder()
       .setOutputPath(outputPath)
       .setCompression(codec)
       .addAllColumnNames(cmd.query.output.map(_.name).asJava)
-      // Note: work_dir, job_id, and task_attempt_id will be set at execution time
-      // in CometNativeWriteExec, as they depend on the Spark task context
-      .build()
+    // Note: work_dir, job_id, and task_attempt_id will be set at execution time
+    // in CometNativeWriteExec, as they depend on the Spark task context
+
+    // Collect S3/cloud storage configurations
+    val session = op.session
+    val hadoopConf = session.sessionState.newHadoopConfWithOptions(cmd.options)
+    val objectStoreOptions =
+      NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(outputPath))
+    objectStoreOptions.foreach { case (key, value) =>
+      writerOpBuilder.putObjectStoreOptions(key, value)
+    }
+
+    val writerOp = writerOpBuilder.build()
 
     val writerOperator = Operator
       .newBuilder()
