Skip to content

Commit 2f8bb14

Browse files
chore: Use Datafusion's existing empty stream (#1517)
* chore: Use Datafusion's existing empty stream * fmt --------- Co-authored-by: Emily Matheys <[email protected]>
1 parent 4ea60ce commit 2f8bb14

File tree

1 file changed

+6
-32
lines changed

1 file changed

+6
-32
lines changed

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::execution::shuffle::builders::{
2323
use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
2424
use async_trait::async_trait;
2525
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
26+
use datafusion::physical_plan::EmptyRecordBatchStream;
2627
use datafusion::{
2728
arrow::{array::*, datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch},
2829
error::{DataFusionError, Result},
@@ -38,13 +39,13 @@ use datafusion::{
3839
},
3940
stream::RecordBatchStreamAdapter,
4041
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
41-
RecordBatchStream, SendableRecordBatchStream, Statistics,
42+
SendableRecordBatchStream, Statistics,
4243
},
4344
};
4445
use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
4546
use datafusion_physical_expr::EquivalenceProperties;
4647
use futures::executor::block_on;
47-
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
48+
use futures::{StreamExt, TryFutureExt, TryStreamExt};
4849
use itertools::Itertools;
4950
use std::io::Error;
5051
use std::{
@@ -54,7 +55,6 @@ use std::{
5455
fs::{File, OpenOptions},
5556
io::{BufReader, BufWriter, Cursor, Seek, SeekFrom, Write},
5657
sync::Arc,
57-
task::{Context, Poll},
5858
};
5959
use tokio::time::Instant;
6060

@@ -559,7 +559,9 @@ impl ShuffleRepartitioner {
559559
elapsed_compute.stop();
560560

561561
// shuffle writer always has empty output
562-
Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?))
562+
Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
563+
&self.schema,
564+
))))
563565
}
564566

565567
fn to_df_err(e: Error) -> DataFusionError {
@@ -863,34 +865,6 @@ impl PartitionBuffer {
863865
}
864866
}
865867

866-
/// A stream that yields no record batches which represent end of output.
867-
pub struct EmptyStream {
868-
/// Schema representing the data
869-
schema: SchemaRef,
870-
}
871-
872-
impl EmptyStream {
873-
/// Create an iterator for a vector of record batches
874-
pub fn try_new(schema: SchemaRef) -> Result<Self> {
875-
Ok(Self { schema })
876-
}
877-
}
878-
879-
impl Stream for EmptyStream {
880-
type Item = Result<RecordBatch>;
881-
882-
fn poll_next(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
883-
Poll::Ready(None)
884-
}
885-
}
886-
887-
impl RecordBatchStream for EmptyStream {
888-
/// Get the schema
889-
fn schema(&self) -> SchemaRef {
890-
Arc::clone(&self.schema)
891-
}
892-
}
893-
894868
fn pmod(hash: u32, n: usize) -> usize {
895869
let hash = hash as i32;
896870
let n = n as i32;

0 commit comments

Comments
 (0)