diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 44a3cd67aa..44fc06d16d 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -44,7 +44,6 @@ use datafusion::{ }, }; use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; -use futures::executor::block_on; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use std::borrow::Borrow; @@ -254,11 +253,11 @@ async fn external_shuffle( }; while let Some(batch) = input.next().await { - // Block on the repartitioner to insert the batch and shuffle the rows + // Await the repartitioner to insert the batch and shuffle the rows // into the corresponding partition buffer. // Otherwise, pull the next batch from the input stream might overwrite the // current batch in the repartitioner. - block_on(repartitioner.insert_batch(batch?))?; + repartitioner.insert_batch(batch?).await?; } repartitioner.shuffle_write()?;