Commit 50e652e

WIP: Add support for remote Parquet writer with openDAL
1 parent 619f368 commit 50e652e

2 files changed: +191 -16 lines changed

2 files changed

+191
-16
lines changed

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 133 additions & 11 deletions
@@ -22,9 +22,13 @@ use std::{
     fmt,
     fmt::{Debug, Formatter},
     fs::File,
+    io::Cursor,
     sync::Arc,
 };
 
+use bytes::Bytes;
+use url::Url;
+
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -48,6 +52,71 @@ use parquet::{
 };
 
 use crate::execution::shuffle::CompressionCodec;
+use crate::parquet::parquet_support::write_to_hdfs_with_opendal_async;
+
+/// Enum representing different types of Arrow writers based on storage backend
+enum ParquetWriter {
+    /// Writer for local file system
+    LocalFile(ArrowWriter<File>),
+    /// Writer for HDFS or other remote storage (writes to in-memory buffer)
+    /// Contains the writer and the destination HDFS path
+    Remote(ArrowWriter<Cursor<Vec<u8>>>, String),
+}
+
+impl ParquetWriter {
+    /// Write a RecordBatch to the underlying writer
+    async fn write(&mut self, batch: &RecordBatch) -> std::result::Result<(), parquet::errors::ParquetError> {
+        match self {
+            ParquetWriter::LocalFile(writer) => writer.write(batch),
+            ParquetWriter::Remote(writer, output_path) => {
+                // Write batch to in-memory buffer
+                writer.write(batch)?;
+
+                // Flush and get the current buffer content
+                writer.flush()?;
+                let cursor = writer.inner_mut();
+                let buffer = cursor.get_ref().clone();
+
+                // Upload
+                let url = Url::parse(output_path).map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to parse URL '{}': {}",
+                        output_path, e
+                    ))
+                })?;
+
+                write_to_hdfs_with_opendal_async(&url, Bytes::from(buffer))
+                    .await
+                    .map_err(|e| {
+                        parquet::errors::ParquetError::General(format!(
+                            "Failed to upload to '{}': {}",
+                            output_path, e
+                        ))
+                    })?;
+
+                // Clear the buffer after upload
+                cursor.get_mut().clear();
+                cursor.set_position(0);
+
+                Ok(())
+            },
+        }
+    }
+
+    /// Close the writer and return the buffer for remote writers
+    fn close(self) -> std::result::Result<Option<(Vec<u8>, String)>, parquet::errors::ParquetError> {
+        match self {
+            ParquetWriter::LocalFile(writer) => {
+                writer.close()?;
+                Ok(None)
+            }
+            ParquetWriter::Remote(writer, path) => {
+                let cursor = writer.into_inner()?;
+                Ok(Some((cursor.into_inner(), path)))
+            }
+        }
+    }
+}
 
 /// Parquet writer operator that writes input batches to a Parquet file
 #[derive(Debug)]
@@ -119,6 +188,59 @@ impl ParquetWriterExec {
             CompressionCodec::Snappy => Ok(Compression::SNAPPY),
         }
     }
+
+    /// Create an Arrow writer based on the storage scheme
+    ///
+    /// # Arguments
+    /// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
+    /// * `output_file_path` - The full path to the output file
+    /// * `schema` - The Arrow schema for the Parquet file
+    /// * `props` - Writer properties including compression
+    ///
+    /// # Returns
+    /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
+    /// * `Err(DataFusionError)` - If writer creation fails
+    fn create_arrow_writer(
+        storage_scheme: &str,
+        output_file_path: &str,
+        schema: SchemaRef,
+        props: WriterProperties,
+    ) -> Result<ParquetWriter> {
+        match storage_scheme {
+            "hdfs" | "s3" => {
+                // For remote storage (HDFS, S3), write to an in-memory buffer
+                let buffer = Vec::new();
+                let cursor = Cursor::new(buffer);
+                let writer = ArrowWriter::try_new(cursor, schema, Some(props))
+                    .map_err(|e| DataFusionError::Execution(format!(
+                        "Failed to create {} writer: {}", storage_scheme, e
+                    )))?;
+                Ok(ParquetWriter::Remote(writer, output_file_path.to_string()))
+            }
+            "local" => {
+                // For local file system, write directly to file
+                // Strip file:// or file: prefix if present
+                let local_path = output_file_path
+                    .strip_prefix("file://")
+                    .or_else(|| output_file_path.strip_prefix("file:"))
+                    .unwrap_or(output_file_path);
+
+                let file = File::create(local_path).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create output file '{}': {}",
+                        local_path, e
+                    ))
+                })?;
+
+                let writer = ArrowWriter::try_new(file, schema, Some(props))
+                    .map_err(|e| DataFusionError::Execution(format!("Failed to create local file writer: {}", e)))?;
+                Ok(ParquetWriter::LocalFile(writer))
+            }
+            _ => Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme: {}", storage_scheme
+            ))),
+        }
+    }
 }
 
 impl DisplayAs for ParquetWriterExec {
@@ -217,6 +339,15 @@ impl ExecutionPlan for ParquetWriterExec {
             .collect();
         let output_schema = Arc::new(arrow::datatypes::Schema::new(fields));
 
+        // Determine storage scheme from work_dir
+        let storage_scheme = if work_dir.starts_with("hdfs://") {
+            "hdfs"
+        } else if work_dir.starts_with("s3://") || work_dir.starts_with("s3a://") {
+            "s3"
+        } else {
+            "local"
+        };
+
         // Strip file:// or file: prefix if present
         let local_path = work_dir
             .strip_prefix("file://")
@@ -243,21 +374,12 @@
            format!("{}/part-{:05}.parquet", local_path, self.partition_id)
        };
 
-        // Create the Parquet file
-        let file = File::create(&part_file).map_err(|e| {
-            DataFusionError::Execution(format!(
-                "Failed to create output file '{}': {}",
-                part_file, e
-            ))
-        })?;
-
         // Configure writer properties
         let props = WriterProperties::builder()
             .set_compression(compression)
             .build();
 
-        let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props))
-            .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?;
+        let mut writer = Self::create_arrow_writer(storage_scheme, &part_file, Arc::clone(&output_schema), props)?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);
@@ -286,7 +408,7 @@ impl ExecutionPlan for ParquetWriterExec {
                 batch
             };
 
-            writer.write(&renamed_batch).map_err(|e| {
+            writer.write(&renamed_batch).await.map_err(|e| {
                DataFusionError::Execution(format!("Failed to write batch: {}", e))
            })?;
        }
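
For orientation, here is a minimal sketch (not part of the commit) of how the pieces added above are meant to fit together inside `ParquetWriterExec`: the detected scheme selects the writer variant, each batch write flushes the in-memory buffer to the remote path, and `close()` hands back whatever bytes remain so the caller could upload them. The output path, `output_schema`, and `batch` variables are hypothetical placeholders, and the fragment assumes it runs inside the operator's async execution context.

```rust
// Sketch only, assuming the surrounding ParquetWriterExec module (for Self::) and an async context.
let props = WriterProperties::builder()
    .set_compression(Compression::SNAPPY)
    .build();

// "hdfs" selects the buffering Remote writer; "local" would write straight to a File.
let mut writer = Self::create_arrow_writer(
    "hdfs",
    "hdfs://namenode:9000/warehouse/part-00000.parquet", // hypothetical output path
    Arc::clone(&output_schema),
    props,
)?;

// Each write() appends to the in-memory buffer and, for Remote, uploads it via OpenDAL.
writer
    .write(&batch)
    .await
    .map_err(|e| DataFusionError::Execution(format!("Failed to write batch: {}", e)))?;

// For Remote writers, close() returns the remaining buffer and destination path;
// a caller could push the final bytes like this (the call site is not shown in the diff).
if let Some((buffer, path)) = writer
    .close()
    .map_err(|e| DataFusionError::Execution(e.to_string()))?
{
    let url = Url::parse(&path).map_err(|e| DataFusionError::Execution(e.to_string()))?;
    write_to_hdfs_with_opendal_async(&url, Bytes::from(buffer))
        .await
        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
}
```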

native/core/src/parquet/parquet_support.rs

Lines changed: 58 additions & 5 deletions
@@ -369,9 +369,11 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) ->
     }
 }
 
-// Mirrors object_store::parse::parse_url for the hdfs object store
+// Creates an HDFS object store from a URL using the native HDFS implementation
 #[cfg(feature = "hdfs")]
-fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
     {
         Some(object_store) => {
@@ -385,8 +387,11 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_stor
     }
 }
 
+// Creates an HDFS object store from a URL using OpenDAL
 #[cfg(feature = "hdfs-opendal")]
-fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     let name_node = get_name_node_uri(url)?;
     let builder = opendal::services::Hdfs::default().name_node(&name_node);
 
@@ -401,6 +406,51 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_stor
     Ok((Box::new(store), path))
 }
 
+/// Writes data to HDFS using OpenDAL via ObjectStore trait (asynchronous version)
+///
+/// # Arguments
+/// * `url` - The HDFS URL (e.g., hdfs://namenode:port/path/to/file)
+/// * `data` - The bytes to write to the file
+///
+/// # Returns
+/// * `Ok(())` on success
+/// * `Err(object_store::Error)` on failure
+///
+/// # Example
+/// ```ignore
+/// use url::Url;
+/// use bytes::Bytes;
+///
+/// let url = Url::parse("hdfs://namenode:9000/path/to/file.parquet")?;
+/// let data = Bytes::from("file contents");
+/// write_to_hdfs_with_opendal_async(&url, data).await?;
+/// ```
+#[cfg(feature = "hdfs-opendal")]
+pub async fn write_to_hdfs_with_opendal_async(
+    url: &Url,
+    data: bytes::Bytes,
+) -> Result<(), object_store::Error> {
+    // Create the HDFS object store using OpenDAL
+    let (object_store, path) = create_hdfs_object_store(url)?;
+
+    // Use the ObjectStore trait's put method to write the data
+    object_store.put(&path, data.into()).await?;
+
+    Ok(())
+}
+
+/// Stub implementation when hdfs-opendal feature is not enabled
+#[cfg(not(feature = "hdfs-opendal"))]
+pub async fn write_to_hdfs_with_opendal_async(
+    _url: &Url,
+    _data: bytes::Bytes,
+) -> Result<(), object_store::Error> {
+    Err(object_store::Error::Generic {
+        store: "hdfs-opendal",
+        source: "HDFS OpenDAL support is not enabled in this build".into(),
+    })
+}
+
 #[cfg(feature = "hdfs-opendal")]
 fn get_name_node_uri(url: &Url) -> Result<String, object_store::Error> {
     use std::fmt::Write;
@@ -422,8 +472,11 @@ fn get_name_node_uri(url: &Url) -> Result<String, object_store::Error> {
     }
 }
 
+// Stub implementation when HDFS support is not enabled
 #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
-fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+    _url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     Err(object_store::Error::Generic {
         store: "HadoopFileSystem",
         source: "Hdfs support is not enabled in this build".into(),
@@ -454,7 +507,7 @@ pub(crate) fn prepare_object_store_with_configs(
     );
 
     let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
-        parse_hdfs_url(&url)
+        create_hdfs_object_store(&url)
     } else if scheme == "s3" {
         objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
     } else {
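
As a usage note (not from the commit), here is a hedged sketch of a call site for `write_to_hdfs_with_opendal_async`: when the crate is built with the `hdfs-opendal` feature the bytes are put to the HDFS path via OpenDAL, and otherwise the stub above returns the `object_store::Error::Generic` error. The URL and the helper name `upload_parquet_bytes` are illustrative, and the fragment assumes it lives in the same module so the feature-gated function is in scope.

```rust
// Sketch only: a hypothetical caller of the feature-gated upload function.
use bytes::Bytes;
use url::Url;

async fn upload_parquet_bytes(data: Bytes) -> Result<(), object_store::Error> {
    // Hypothetical destination; in the writer this comes from the configured output path.
    let url = Url::parse("hdfs://namenode:9000/tmp/part-00000.parquet").map_err(|e| {
        object_store::Error::Generic {
            store: "hdfs-opendal",
            source: Box::new(e),
        }
    })?;

    // With `--features hdfs-opendal` this performs the OpenDAL put; otherwise the stub
    // returns "HDFS OpenDAL support is not enabled in this build".
    write_to_hdfs_with_opendal_async(&url, data).await
}
```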

0 commit comments
