
Commit 72e9bfe

[WIP] opendal writes
1 parent 4605089 commit 72e9bfe

2 files changed, +151 / -144 lines

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 151 additions & 59 deletions
@@ -26,7 +26,6 @@ use std::{
     sync::Arc,
 };
 
-use bytes::Bytes;
 use opendal::{services::Hdfs, Operator};
 use url::Url;
 
@@ -54,17 +53,20 @@ use parquet::{
 
 use crate::execution::shuffle::CompressionCodec;
 
-#[cfg(all(test, feature = "hdfs-opendal"))]
-use crate::parquet::parquet_support::write_record_batch_to_hdfs;
-
 /// Enum representing different types of Arrow writers based on storage backend
 enum ParquetWriter {
     /// Writer for local file system
     LocalFile(ArrowWriter<File>),
     /// Writer for HDFS or other remote storage (writes to in-memory buffer)
     /// Contains the arrow writer, HDFS operator, and destination path
+    /// The Arrow writer converts the data to Parquet format and writes it to an in-memory buffer
     /// The opendal::Writer is created lazily on first write
-    Remote(ArrowWriter<Cursor<Vec<u8>>>, Option<opendal::Writer>, Operator, String),
+    Remote(
+        ArrowWriter<Cursor<Vec<u8>>>,
+        Option<opendal::Writer>,
+        Operator,
+        String,
+    ),
 }
 
 impl ParquetWriter {
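
For context (not part of the commit), a minimal self-contained sketch of the in-memory half of the Remote variant: an ArrowWriter that targets a Cursor<Vec<u8>>, closed via into_inner() so the finished Parquet bytes can be handed to a remote writer. The schema, column, and values here are made up for illustration.

use std::error::Error;
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn parquet_bytes_in_memory() -> Result<Vec<u8>, Box<dyn Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Buffer the Parquet output in memory instead of writing to a File.
    let cursor = Cursor::new(Vec::new());
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(cursor, schema, Some(props))?;
    writer.write(&batch)?;

    // into_inner() finalizes the Parquet footer and returns the Cursor,
    // whose Vec<u8> can then be shipped to remote storage.
    Ok(writer.into_inner()?.into_inner())
}
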
@@ -75,7 +77,12 @@ impl ParquetWriter {
     ) -> std::result::Result<(), parquet::errors::ParquetError> {
         match self {
             ParquetWriter::LocalFile(writer) => writer.write(batch),
-            ParquetWriter::Remote(arrow_parquet_buffer_writer, hdfs_writer_opt, op, output_path) => {
+            ParquetWriter::Remote(
+                arrow_parquet_buffer_writer,
+                hdfs_writer_opt,
+                op,
+                output_path,
+            ) => {
                 // Write batch to in-memory buffer
                 arrow_parquet_buffer_writer.write(batch)?;
 
@@ -88,7 +95,8 @@ impl ParquetWriter {
                 if hdfs_writer_opt.is_none() {
                     let writer = op.writer(output_path.as_str()).await.map_err(|e| {
                         parquet::errors::ParquetError::External(
-                            format!("Failed to create HDFS writer: {}", e).into()
+                            format!("Failed to create HDFS writer for '{}': {}", output_path, e)
+                                .into(),
                         )
                     })?;
                     *hdfs_writer_opt = Some(writer);
@@ -98,7 +106,11 @@ impl ParquetWriter {
                 if let Some(hdfs_writer) = hdfs_writer_opt {
                     hdfs_writer.write(current_data).await.map_err(|e| {
                         parquet::errors::ParquetError::External(
-                            format!("Failed to write batch to HDFS: {}", e).into()
+                            format!(
+                                "Failed to write batch to HDFS file '{}': {}",
+                                output_path, e
+                            )
+                            .into(),
                         )
                     })?;
                 }
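
A minimal sketch (not part of the commit) of the lazily created opendal::Writer that the Remote variant keeps in an Option: the remote writer is only opened when the first chunk of Parquet bytes arrives. The Memory service stands in for HDFS so the snippet runs without a cluster; the path and helper name are illustrative, and tokio is assumed for the async runtime.

use opendal::{services::Memory, Operator};

// Open the remote writer only on the first flush, mirroring the
// Option<opendal::Writer> field of the Remote variant.
async fn flush_lazily(
    op: &Operator,
    writer_opt: &mut Option<opendal::Writer>,
    path: &str,
    data: Vec<u8>,
) -> opendal::Result<()> {
    if writer_opt.is_none() {
        *writer_opt = Some(op.writer(path).await?);
    }
    if let Some(writer) = writer_opt {
        writer.write(data).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // In-memory backend instead of Hdfs, so this runs without a cluster.
    let op = Operator::new(Memory::default())?.finish();
    let mut writer_opt = None;
    flush_lazily(&op, &mut writer_opt, "part-00000.parquet", vec![1, 2, 3]).await?;
    // Closing the writer is what makes the object visible in the backend.
    if let Some(mut writer) = writer_opt.take() {
        writer.close().await?;
    }
    Ok(())
}
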
@@ -113,15 +125,18 @@ impl ParquetWriter {
     }
 
     /// Close the writer and finalize the file
-    async fn close(
-        self,
-    ) -> std::result::Result<(), parquet::errors::ParquetError> {
+    async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> {
         match self {
             ParquetWriter::LocalFile(writer) => {
                 writer.close()?;
                 Ok(())
             }
-            ParquetWriter::Remote(arrow_parquet_buffer_writer, mut hdfs_writer_opt, op, output_path) => {
+            ParquetWriter::Remote(
+                arrow_parquet_buffer_writer,
+                mut hdfs_writer_opt,
+                op,
+                output_path,
+            ) => {
                 // Close the arrow writer to finalize parquet format
                 let cursor = arrow_parquet_buffer_writer.into_inner()?;
                 let final_data = cursor.into_inner();
@@ -130,7 +145,8 @@ impl ParquetWriter {
                 if hdfs_writer_opt.is_none() && !final_data.is_empty() {
                     let writer = op.writer(output_path.as_str()).await.map_err(|e| {
                         parquet::errors::ParquetError::External(
-                            format!("Failed to create HDFS writer: {}", e).into()
+                            format!("Failed to create HDFS writer for '{}': {}", output_path, e)
+                                .into(),
                         )
                     })?;
                     hdfs_writer_opt = Some(writer);
@@ -141,14 +157,19 @@ impl ParquetWriter {
                 if let Some(mut hdfs_writer) = hdfs_writer_opt {
                     hdfs_writer.write(final_data).await.map_err(|e| {
                         parquet::errors::ParquetError::External(
-                            format!("Failed to write final data to HDFS: {}", e).into()
+                            format!(
+                                "Failed to write final data to HDFS file '{}': {}",
+                                output_path, e
+                            )
+                            .into(),
                         )
                     })?;
 
                     // Close the HDFS writer
                     hdfs_writer.close().await.map_err(|e| {
                         parquet::errors::ParquetError::External(
-                            format!("Failed to close HDFS writer: {}", e).into()
+                            format!("Failed to close HDFS writer for '{}': {}", output_path, e)
+                                .into(),
                         )
                     })?;
                 }
@@ -243,13 +264,43 @@ impl ParquetWriterExec {
     /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
     /// * `Err(DataFusionError)` - If writer creation fails
     fn create_arrow_writer(
-        storage_scheme: &str,
         output_file_path: &str,
         schema: SchemaRef,
         props: WriterProperties,
     ) -> Result<ParquetWriter> {
+        // Determine storage scheme from output_file_path
+        let storage_scheme = if output_file_path.starts_with("hdfs://") {
+            "hdfs"
+        } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
+            "s3"
+        } else {
+            "local"
+        };
+
         match storage_scheme {
-            "hdfs" | "s3" => {
+            "hdfs" => {
+                // Parse the output_file_path to extract namenode and path
+                // Expected format: hdfs://namenode:port/path/to/file
+                let url = Url::parse(output_file_path).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to parse HDFS URL '{}': {}",
+                        output_file_path, e
+                    ))
+                })?;
+
+                // Extract namenode (scheme + host + port)
+                let namenode = format!(
+                    "{}://{}{}",
+                    url.scheme(),
+                    url.host_str().unwrap_or("localhost"),
+                    url.port()
+                        .map(|p| format!(":{}", p))
+                        .unwrap_or_else(|| ":9000".to_string())
+                );
+
+                // Extract the path (without the scheme and host)
+                let hdfs_path = url.path().to_string();
+
                 // For remote storage (HDFS, S3), write to an in-memory buffer
                 let buffer = Vec::new();
                 let cursor = Cursor::new(buffer);
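
A minimal sketch (not part of the commit) of the namenode/path split performed above with the url crate, mirroring the fallback to "localhost" and port 9000; the example URL and helper name are made up.

use url::Url;

// Split an hdfs:// location into the namenode endpoint and the file path.
fn split_hdfs_url(location: &str) -> Result<(String, String), url::ParseError> {
    let url = Url::parse(location)?;
    let namenode = format!(
        "{}://{}{}",
        url.scheme(),
        url.host_str().unwrap_or("localhost"),
        url.port()
            .map(|p| format!(":{}", p))
            .unwrap_or_else(|| ":9000".to_string()),
    );
    Ok((namenode, url.path().to_string()))
}

fn main() {
    let (namenode, path) =
        split_hdfs_url("hdfs://namenode:9000/user/hive/part-00000.parquet").unwrap();
    assert_eq!(namenode, "hdfs://namenode:9000");
    assert_eq!(path, "/user/hive/part-00000.parquet");
}
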
@@ -261,19 +312,23 @@ impl ParquetWriterExec {
                     ))
                 })?;
 
-                let builder = Hdfs::default().name_node("hdfs://namenode:9000");
+                let builder = Hdfs::default().name_node(&namenode);
                 let op = Operator::new(builder)
                     .map_err(|e| {
-                        DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e))
+                        DataFusionError::Execution(format!(
+                            "Failed to create HDFS operator for '{}' (namenode: {}): {}",
+                            output_file_path, namenode, e
+                        ))
                     })?
                     .finish();
 
                 // HDFS writer will be created lazily on first write
+                // Use only the path part for the HDFS writer
                 Ok(ParquetWriter::Remote(
                     arrow_parquet_buffer_writer,
                     None,
                     op,
-                    output_file_path.to_string(),
+                    hdfs_path,
                 ))
             }
             "local" => {
@@ -284,6 +339,14 @@ impl ParquetWriterExec {
                     .or_else(|| output_file_path.strip_prefix("file:"))
                     .unwrap_or(output_file_path);
 
+                // Create output directory
+                std::fs::create_dir_all(&local_path).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create output directory '{}': {}",
+                        local_path, e
+                    ))
+                })?;
+
                 let file = File::create(local_path).map_err(|e| {
                     DataFusionError::Execution(format!(
                         "Failed to create output file '{}': {}",
@@ -400,52 +463,23 @@ impl ExecutionPlan for ParquetWriterExec {
             .collect();
         let output_schema = Arc::new(arrow::datatypes::Schema::new(fields));
 
-        // Determine storage scheme from work_dir
-        let storage_scheme = if work_dir.starts_with("hdfs://") {
-            "hdfs"
-        } else if work_dir.starts_with("s3://") || work_dir.starts_with("s3a://") {
-            "s3"
-        } else {
-            "local"
-        };
-
-        // Strip file:// or file: prefix if present
-        let local_path = work_dir
-            .strip_prefix("file://")
-            .or_else(|| work_dir.strip_prefix("file:"))
-            .unwrap_or(&work_dir)
-            .to_string();
-
-        // Create output directory
-        std::fs::create_dir_all(&local_path).map_err(|e| {
-            DataFusionError::Execution(format!(
-                "Failed to create output directory '{}': {}",
-                local_path, e
-            ))
-        })?;
-
         // Generate part file name for this partition
         // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename
         let part_file = if let Some(attempt_id) = task_attempt_id {
             format!(
                 "{}/part-{:05}-{:05}.parquet",
-                local_path, self.partition_id, attempt_id
+                work_dir, self.partition_id, attempt_id
             )
         } else {
-            format!("{}/part-{:05}.parquet", local_path, self.partition_id)
+            format!("{}/part-{:05}.parquet", work_dir, self.partition_id)
         };
 
         // Configure writer properties
         let props = WriterProperties::builder()
             .set_compression(compression)
             .build();
 
-        let mut writer = Self::create_arrow_writer(
-            storage_scheme,
-            &part_file,
-            Arc::clone(&output_schema),
-            props,
-        )?;
+        let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);
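
A minimal sketch (not part of the commit) of the part-file naming used above: a zero-padded partition id, plus the task attempt id when a FileCommitProtocol work directory is in play; the values are illustrative.

fn part_file_name(work_dir: &str, partition_id: u32, task_attempt_id: Option<u64>) -> String {
    match task_attempt_id {
        Some(attempt_id) => format!(
            "{}/part-{:05}-{:05}.parquet",
            work_dir, partition_id, attempt_id
        ),
        None => format!("{}/part-{:05}.parquet", work_dir, partition_id),
    }
}

fn main() {
    assert_eq!(
        part_file_name("hdfs://namenode:9000/user/out", 0, Some(123)),
        "hdfs://namenode:9000/user/out/part-00000-00123.parquet"
    );
    assert_eq!(
        part_file_name("/tmp/out", 7, None),
        "/tmp/out/part-00007.parquet"
    );
}
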
@@ -516,7 +550,6 @@ mod tests {
     use super::*;
     use arrow::array::{Int32Array, StringArray};
     use arrow::datatypes::{DataType, Field, Schema};
-    use arrow::record_batch;
     use std::sync::Arc;
 
     /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
@@ -684,17 +717,18 @@ mod tests {
             .build();
 
         // Create ParquetWriter using the create_arrow_writer method
+        // Use full HDFS URL format
+        let full_output_path = format!("hdfs://namenode:9000{}", output_path);
         let mut writer = ParquetWriterExec::create_arrow_writer(
-            "hdfs",
-            output_path,
+            &full_output_path,
             create_test_record_batch(1)?.schema(),
            props,
        )?;
 
         // Write 5 batches in a loop
         for i in 1..=5 {
             let record_batch = create_test_record_batch(i)?;
-
+
             writer.write(&record_batch).await.map_err(|e| {
                 DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e))
             })?;
@@ -706,9 +740,10 @@
         }
 
         // Close the writer
-        writer.close().await.map_err(|e| {
-            DataFusionError::Execution(format!("Failed to close writer: {}", e))
-        })?;
+        writer
+            .close()
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?;
 
         println!(
             "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}",
@@ -717,4 +752,61 @@
 
         Ok(())
     }
+
+    #[tokio::test]
+    #[cfg(feature = "hdfs-opendal")]
+    async fn test_parquet_writer_exec_with_memory_input() -> Result<()> {
+        use datafusion::datasource::memory::MemorySourceConfig;
+        use datafusion::datasource::source::DataSourceExec;
+        use datafusion::prelude::SessionContext;
+
+        // Create 5 batches for the DataSourceExec input
+        let mut batches = Vec::new();
+        for i in 1..=5 {
+            batches.push(create_test_record_batch(i)?);
+        }
+
+        // Get schema from the first batch
+        let schema = batches[0].schema();
+
+        // Create DataSourceExec with MemorySourceConfig containing the 5 batches as a single partition
+        let partitions = vec![batches];
+        let memory_source_config = MemorySourceConfig::try_new(&partitions, schema, None)?;
+        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(memory_source_config)));
+
+        // Create ParquetWriterExec with DataSourceExec as input
+        let output_path = "unused".to_string();
+        let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string();
+        let column_names = vec!["id".to_string(), "name".to_string()];
+
+        let parquet_writer = ParquetWriterExec::try_new(
+            memory_exec,
+            output_path,
+            work_dir,
+            None,      // job_id
+            Some(123), // task_attempt_id
+            CompressionCodec::None,
+            0, // partition_id
+            column_names,
+        )?;
+
+        // Create a session context and execute the plan
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        // Execute partition 0
+        let mut stream = parquet_writer.execute(0, task_ctx)?;
+
+        // Consume the stream (this triggers the write)
+        while let Some(batch_result) = stream.try_next().await? {
+            // The stream should be empty as ParquetWriterExec returns empty batches
+            assert_eq!(batch_result.num_rows(), 0);
+        }
+
+        println!(
+            "Successfully completed ParquetWriterExec test with DataSourceExec input (5 batches, 5000 total rows)"
+        );
+
+        Ok(())
+    }
 }
