Commit 5f89131

Add support for remote Parquet writer with openDAL
1 parent e097924 commit 5f89131

2 files changed: +42 −55 lines changed


native/core/src/execution/operators/parquet_writer.rs

Lines changed: 28 additions & 47 deletions
@@ -29,6 +29,10 @@ use std::{
 
 use opendal::Operator;
 
+use crate::execution::shuffle::CompressionCodec;
+use crate::parquet::parquet_support::{
+    create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
+};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -50,9 +54,7 @@ use parquet::{
     basic::{Compression, ZstdLevel},
     file::properties::WriterProperties,
 };
-
-use crate::execution::shuffle::CompressionCodec;
-use crate::parquet::parquet_support::prepare_object_store_with_configs;
+use url::Url;
 
 /// Enum representing different types of Arrow writers based on storage backend
 enum ParquetWriter {
@@ -276,17 +278,14 @@ impl ParquetWriterExec {
         runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
         object_store_options: &HashMap<String, String>,
     ) -> Result<ParquetWriter> {
-        // Determine storage scheme from output_file_path
-        let storage_scheme = if output_file_path.starts_with("hdfs://") {
-            "hdfs"
-        } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
-            "s3"
-        } else {
-            "local"
-        };
+        // Parse URL and match on storage scheme directly
+        let url = Url::parse(output_file_path).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e))
+        })?;
 
-        match storage_scheme {
-            "hdfs" => {
+        if is_hdfs_scheme(&url, object_store_options) {
+            // HDFS storage
+            {
                 // Use prepare_object_store_with_configs to create and register the object store
                 let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
                     runtime_env,
@@ -305,42 +304,17 @@ impl ParquetWriterExec {
                 let cursor = Cursor::new(buffer);
                 let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
                     .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create {} writer: {}",
-                            storage_scheme, e
-                        ))
+                        DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
                     })?;
 
-                // Get the registered object store URL to retrieve the operator
-                // prepare_object_store_with_configs registers an object_store, but we need OpenDAL Operator
-                // For now, we'll create the operator directly but using the path from prepare_object_store_with_configs
-                let url = url::Url::parse(output_file_path).map_err(|e| {
+                // Create HDFS operator with configuration options using the helper function
+                let op = create_hdfs_operator(&url).map_err(|e| {
                     DataFusionError::Execution(format!(
-                        "Failed to parse URL '{}': {}",
+                        "Failed to create HDFS operator for '{}': {}",
                         output_file_path, e
                     ))
                 })?;
 
-                // Extract namenode for OpenDAL
-                let namenode = format!(
-                    "{}://{}{}",
-                    url.scheme(),
-                    url.host_str().unwrap_or("localhost"),
-                    url.port()
-                        .map(|p| format!(":{}", p))
-                        .unwrap_or_else(|| ":9000".to_string())
-                );
-
-                let builder = opendal::services::Hdfs::default().name_node(&namenode);
-                let op = Operator::new(builder)
-                    .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create HDFS operator for '{}' (namenode: {}): {}",
-                            output_file_path, namenode, e
-                        ))
-                    })?
-                    .finish();
-
                 // HDFS writer will be created lazily on first write
                 // Use the path from prepare_object_store_with_configs
                 Ok(ParquetWriter::Remote(
@@ -350,7 +324,12 @@ impl ParquetWriterExec {
                     object_store_path.to_string(),
                 ))
             }
-            "local" => {
+        } else if output_file_path.starts_with("file://")
+            || output_file_path.starts_with("file:")
+            || !output_file_path.contains("://")
+        {
+            // Local file system
+            {
                 // For a local file system, write directly to file
                 // Strip file:// or file: prefix if present
                 let local_path = output_file_path
@@ -387,10 +366,12 @@ impl ParquetWriterExec {
                 })?;
                 Ok(ParquetWriter::LocalFile(writer))
             }
-            _ => Err(DataFusionError::Execution(format!(
-                "Unsupported storage scheme: {}",
-                storage_scheme
-            ))),
+        } else {
+            // Unsupported storage scheme
+            Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme in path: {}",
+                output_file_path
+            )))
         }
     }
 }
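The writer construction in ParquetWriterExec now parses the output path as a URL, routes HDFS-style schemes through is_hdfs_scheme, treats file:-prefixed paths as local, and rejects anything else. Below is a minimal, standalone sketch of that dispatch order, assuming only the url crate; it is not the Comet implementation, and looks_like_hdfs and classify are illustrative names (in Comet, is_hdfs_scheme also consults the fs.comet.libhdfs.schemes entry in the object store options).

// Standalone sketch of the scheme dispatch introduced above (illustrative only).
use url::Url;

// Hypothetical stand-in for Comet's is_hdfs_scheme.
fn looks_like_hdfs(url: &Url) -> bool {
    url.scheme() == "hdfs"
}

fn classify(output_file_path: &str) -> Result<&'static str, String> {
    // Parse first, as the new code does, then branch on the scheme.
    let url = Url::parse(output_file_path)
        .map_err(|e| format!("Failed to parse URL '{}': {}", output_file_path, e))?;
    if looks_like_hdfs(&url) {
        Ok("hdfs")
    } else if output_file_path.starts_with("file://")
        || output_file_path.starts_with("file:")
        || !output_file_path.contains("://")
    {
        Ok("local")
    } else {
        Err(format!("Unsupported storage scheme in path: {}", output_file_path))
    }
}

fn main() {
    assert_eq!(classify("hdfs://namenode:9000/warehouse/out.parquet"), Ok("hdfs"));
    assert_eq!(classify("file:///tmp/out.parquet"), Ok("local"));
    assert!(classify("s3://bucket/out.parquet").is_err());
}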

native/core/src/parquet/parquet_support.rs

Lines changed: 14 additions & 8 deletions
@@ -358,7 +358,7 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     }
 }
 
-fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
     const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
     let scheme = url.scheme();
     if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
@@ -387,20 +387,26 @@ fn create_hdfs_object_store(
     }
 }
 
-// Creates an HDFS object store from a URL using OpenDAL
+// Creates an OpenDAL HDFS Operator from a URL with optional configuration
 #[cfg(feature = "hdfs-opendal")]
-fn create_hdfs_object_store(
-    url: &Url,
-) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+pub(crate) fn create_hdfs_operator(url: &Url) -> Result<opendal::Operator, object_store::Error> {
     let name_node = get_name_node_uri(url)?;
     let builder = opendal::services::Hdfs::default().name_node(&name_node);
 
-    let op = opendal::Operator::new(builder)
+    opendal::Operator::new(builder)
         .map_err(|error| object_store::Error::Generic {
             store: "hdfs-opendal",
             source: error.into(),
-        })?
-        .finish();
+        })
+        .map(|op| op.finish())
+}
+
+// Creates an HDFS object store from a URL using OpenDAL
+#[cfg(feature = "hdfs-opendal")]
+pub(crate) fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let op = create_hdfs_operator(url)?;
     let store = object_store_opendal::OpendalStore::new(op);
     let path = Path::parse(url.path())?;
     Ok((Box::new(store), path))
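After this refactor, the OpenDAL Operator construction lives in create_hdfs_operator, and create_hdfs_object_store only wraps that operator in an object_store adapter. The sketch below shows the same two-step composition in isolation, assuming the opendal crate with its services-hdfs feature plus object_store_opendal; hdfs_operator, hdfs_object_store, and the namenode URI are illustrative, and errors are collapsed into a boxed error instead of Comet's object_store::Error.

use object_store::{path::Path, ObjectStore};
use opendal::{services::Hdfs, Operator};

// Step 1: build the OpenDAL HDFS operator from a namenode URI.
fn hdfs_operator(name_node: &str) -> opendal::Result<Operator> {
    let builder = Hdfs::default().name_node(name_node);
    Ok(Operator::new(builder)?.finish())
}

// Step 2: wrap the operator so object_store-based consumers can use it.
fn hdfs_object_store(
    name_node: &str,
    file_path: &str,
) -> Result<(Box<dyn ObjectStore>, Path), Box<dyn std::error::Error>> {
    let op = hdfs_operator(name_node)?;
    let store = object_store_opendal::OpendalStore::new(op);
    let path = Path::parse(file_path)?;
    Ok((Box::new(store), path))
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder namenode and path; succeeds only with a reachable HDFS.
    let (_store, _path) = hdfs_object_store("hdfs://localhost:9000", "/warehouse/out.parquet")?;
    Ok(())
}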
