
Commit 7b415cd

WIP: Add support for remote Parquet writer with openDAL
1 parent 50e652e commit 7b415cd

3 files changed: +170 -41 lines changed

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 93 additions & 12 deletions
@@ -54,6 +54,9 @@ use parquet::{
 use crate::execution::shuffle::CompressionCodec;
 use crate::parquet::parquet_support::write_to_hdfs_with_opendal_async;
 
+#[cfg(all(test, feature = "hdfs-opendal"))]
+use crate::parquet::parquet_support::write_record_batch_to_hdfs;
+
 /// Enum representing different types of Arrow writers based on storage backend
 enum ParquetWriter {
     /// Writer for local file system
@@ -65,7 +68,10 @@ enum ParquetWriter {
 
 impl ParquetWriter {
     /// Write a RecordBatch to the underlying writer
-    async fn write(&mut self, batch: &RecordBatch) -> std::result::Result<(), parquet::errors::ParquetError> {
+    async fn write(
+        &mut self,
+        batch: &RecordBatch,
+    ) -> std::result::Result<(), parquet::errors::ParquetError> {
         match self {
             ParquetWriter::LocalFile(writer) => writer.write(batch),
             ParquetWriter::Remote(writer, output_path) => {
@@ -84,7 +90,7 @@ impl ParquetWriter {
                         output_path, e
                     ))
                 })?;
-
+
                 write_to_hdfs_with_opendal_async(&url, Bytes::from(buffer))
                     .await
                     .map_err(|e| {
@@ -99,12 +105,14 @@ impl ParquetWriter {
                 cursor.set_position(0);
 
                 Ok(())
-            },
+            }
         }
     }
 
     /// Close the writer and return the buffer for remote writers
-    fn close(self) -> std::result::Result<Option<(Vec<u8>, String)>, parquet::errors::ParquetError> {
+    fn close(
+        self,
+    ) -> std::result::Result<Option<(Vec<u8>, String)>, parquet::errors::ParquetError> {
         match self {
             ParquetWriter::LocalFile(writer) => {
                 writer.close()?;
@@ -211,10 +219,12 @@ impl ParquetWriterExec {
                 // For remote storage (HDFS, S3), write to an in-memory buffer
                 let buffer = Vec::new();
                 let cursor = Cursor::new(buffer);
-                let writer = ArrowWriter::try_new(cursor, schema, Some(props))
-                    .map_err(|e| DataFusionError::Execution(format!(
-                        "Failed to create {} writer: {}", storage_scheme, e
-                    )))?;
+                let writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create {} writer: {}",
+                        storage_scheme, e
+                    ))
+                })?;
                 Ok(ParquetWriter::Remote(writer, output_file_path.to_string()))
             }
             "local" => {
@@ -232,12 +242,14 @@ impl ParquetWriterExec {
                     ))
                 })?;
 
-                let writer = ArrowWriter::try_new(file, schema, Some(props))
-                    .map_err(|e| DataFusionError::Execution(format!("Failed to create local file writer: {}", e)))?;
+                let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| {
+                    DataFusionError::Execution(format!("Failed to create local file writer: {}", e))
+                })?;
                 Ok(ParquetWriter::LocalFile(writer))
             }
             _ => Err(DataFusionError::Execution(format!(
-                "Unsupported storage scheme: {}", storage_scheme
+                "Unsupported storage scheme: {}",
+                storage_scheme
             ))),
         }
     }
@@ -379,7 +391,12 @@ impl ExecutionPlan for ParquetWriterExec {
             .set_compression(compression)
             .build();
 
-        let mut writer = Self::create_arrow_writer(storage_scheme, &part_file, Arc::clone(&output_schema), props)?;
+        let mut writer = Self::create_arrow_writer(
+            storage_scheme,
+            &part_file,
+            Arc::clone(&output_schema),
+            props,
+        )?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);
@@ -444,3 +461,67 @@ impl ExecutionPlan for ParquetWriterExec {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, StringArray};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use std::sync::Arc;
+
+    /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
+    fn create_test_record_batch() -> Result<RecordBatch> {
+        let num_rows = 1000;
+
+        // Create int column with values 0..1000
+        let int_array = Int32Array::from_iter_values(0..num_rows);
+
+        // Create string column with values "value_0", "value_1", ..., "value_999"
+        let string_values: Vec<String> = (0..num_rows)
+            .map(|i| format!("value_{}", i))
+            .collect();
+        let string_array = StringArray::from(string_values);
+
+        // Define schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        // Create RecordBatch
+        RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(string_array)],
+        )
+        .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
+    }
+
+    #[tokio::test]
+    //#[cfg(feature = "hdfs-opendal")]
+    async fn test_write_to_hdfs() -> Result<()> {
+        use opendal::services::Hdfs;
+        use opendal::Operator;
+
+        // Create test data
+        let batch = create_test_record_batch()?;
+
+        // Configure HDFS connection
+        let namenode = "hdfs://namenode:9000";
+        let output_path = "/user/test_write/data.parquet";
+
+        // Create OpenDAL HDFS operator
+        let builder = Hdfs::default().name_node(namenode);
+        let op = Operator::new(builder)
+            .map_err(|e| DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)))?
+            .finish();
+
+        // Write the batch using write_record_batch_to_hdfs
+        write_record_batch_to_hdfs(&op, output_path, batch)
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("Failed to write to HDFS: {}", e)))?;
+
+        println!("Successfully wrote 1000 rows to HDFS at {}{}", namenode, output_path);
+
+        Ok(())
+    }
+}
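
For context, the Remote variant above buffers the encoded Parquet bytes in an in-memory Cursor and then pushes them to the remote store via write_to_hdfs_with_opendal_async. A rough standalone sketch of that buffer-then-upload idea (not part of the commit; the upload call is elided and the into_inner usage here is only illustrative):

use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A tiny batch standing in for the operator's input.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Encode into an in-memory cursor instead of a local file.
    let cursor = Cursor::new(Vec::new());
    let mut writer =
        ArrowWriter::try_new(cursor, schema, Some(WriterProperties::builder().build()))?;
    writer.write(&batch)?;

    // Finalize the Parquet footer and take back the buffer; these bytes are what
    // would be handed to the remote store (HDFS/S3) in a single upload.
    let bytes = writer.into_inner()?.into_inner();
    assert!(!bytes.is_empty());
    Ok(())
}

The obvious cost of this approach is that the whole encoded output sits in memory before the upload starts.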

native/core/src/parquet/parquet_support.rs

Lines changed: 74 additions & 27 deletions
@@ -406,35 +406,82 @@ fn create_hdfs_object_store(
     Ok((Box::new(store), path))
 }
 
-/// Writes data to HDFS using OpenDAL via ObjectStore trait (asynchronous version)
-///
-/// # Arguments
-/// * `url` - The HDFS URL (e.g., hdfs://namenode:port/path/to/file)
-/// * `data` - The bytes to write to the file
-///
-/// # Returns
-/// * `Ok(())` on success
-/// * `Err(object_store::Error)` on failure
-///
-/// # Example
-/// ```ignore
-/// use url::Url;
-/// use bytes::Bytes;
-///
-/// let url = Url::parse("hdfs://namenode:9000/path/to/file.parquet")?;
-/// let data = Bytes::from("file contents");
-/// write_to_hdfs_with_opendal_async(&url, data).await?;
-/// ```
+use tokio::sync::mpsc;
+
+struct ChannelWriter {
+    sender: mpsc::Sender<Vec<u8>>,
+}
+
+impl std::io::Write for ChannelWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.sender
+            .blocking_send(buf.to_vec())
+            .map_err(|_| {
+                std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed")
+            })?;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
 #[cfg(feature = "hdfs-opendal")]
-pub async fn write_to_hdfs_with_opendal_async(
-    url: &Url,
-    data: bytes::Bytes,
-) -> Result<(), object_store::Error> {
-    // Create the HDFS object store using OpenDAL
-    let (object_store, path) = create_hdfs_object_store(url)?;
+use opendal::Operator;
+use parquet::arrow::ArrowWriter;
+use arrow::record_batch::RecordBatch;
+use parquet::file::properties::WriterProperties;
+
+#[cfg(feature = "hdfs-opendal")]
+pub async fn write_record_batch_to_hdfs(
+    op: &Operator,
+    path: &str,
+    batch: RecordBatch,
+) -> Result<(), opendal::Error> {
+    // ------------------------------------------------------------
+    // 1. Open async HDFS writer
+    // ------------------------------------------------------------
+    let mut hdfs_writer = op.writer(path).await?;
+
+    // ------------------------------------------------------------
+    // 2. Channel between blocking and async worlds
+    // ------------------------------------------------------------
+    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(8);
+
+    let schema = batch.schema();
+
+    // ------------------------------------------------------------
+    // 3. Blocking Parquet writer
+    // ------------------------------------------------------------
+    let parquet_task = tokio::task::spawn_blocking(move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let props = WriterProperties::builder().build();
+        let channel_writer = ChannelWriter { sender: tx };
+
+        let mut writer = ArrowWriter::try_new(
+            channel_writer,
+            schema,
+            Some(props),
+        )?;
+
+        writer.write(&batch)?;
+        writer.close()?; // important to flush remaining data
+
+        Ok(())
+    });
+
+    // ------------------------------------------------------------
+    // 4. Async HDFS consumer
+    // ------------------------------------------------------------
+    while let Some(chunk) = rx.recv().await {
+        hdfs_writer.write(chunk).await?;
+    }
 
-    // Use the ObjectStore trait's put method to write the data
-    object_store.put(&path, data.into()).await?;
+    hdfs_writer.close().await?;
+    parquet_task
+        .await
+        .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("task join failed: {}", e)))?
+        .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("parquet write failed: {}", e)))?;
 
     Ok(())
 }
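
The new write_record_batch_to_hdfs above drives a blocking ArrowWriter from spawn_blocking and streams its output into OpenDAL's async writer through a bounded tokio channel. A self-contained sketch of just that blocking-to-async bridge, with a plain Vec<u8> standing in for the HDFS sink (illustrative only, not part of the commit):

use tokio::sync::mpsc;

// Same shape as the ChannelWriter in the diff: a blocking std::io::Write that
// forwards every buffer over a tokio channel.
struct ChannelWriter {
    sender: mpsc::Sender<Vec<u8>>,
}

impl std::io::Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // blocking_send must not run on the async runtime's worker threads,
        // which is why the producer below lives in spawn_blocking.
        self.sender
            .blocking_send(buf.to_vec())
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(8);

    // Producer: any blocking writer (the ArrowWriter in the commit).
    let producer = tokio::task::spawn_blocking(move || {
        use std::io::Write;
        let mut w = ChannelWriter { sender: tx };
        w.write_all(b"parquet bytes would stream through here").unwrap();
        // Dropping `w` (and with it `tx`) closes the channel and ends the consumer loop.
    });

    // Consumer: an async sink (the OpenDAL HDFS writer in the commit).
    let mut sink = Vec::new();
    while let Some(chunk) = rx.recv().await {
        sink.extend_from_slice(&chunk);
    }

    producer.await.unwrap();
    assert_eq!(sink, b"parquet bytes would stream through here".to_vec());
}

Because the channel is bounded, the encoder blocks in blocking_send whenever the consumer falls behind, so memory use stays roughly constant instead of growing with the size of the file being written.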

spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala

Lines changed: 3 additions & 2 deletions
@@ -52,8 +52,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       case cmd: InsertIntoHadoopFsRelationCommand =>
         cmd.fileFormat match {
           case _: ParquetFileFormat =>
-            if (!cmd.outputPath.toString.startsWith("file:")) {
-              return Unsupported(Some("Only local filesystem output paths are supported"))
+            if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString
+                .startsWith("hdfs:")) {
+              return Unsupported(Some("Only HDFS/local filesystems output paths are supported"))
             }
 
             if (cmd.bucketSpec.isDefined) {

0 commit comments
