Skip to content

Commit 42ab687

Browse files
committed
Fix error handling in async stream execution for PySessionContext and PyDataFrame
1 parent bc6f5a6 commit 42ab687

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ impl PySessionContext {
10311031
let plan = plan.plan.clone();
10321032
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
10331033
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
1034-
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })??;
1034+
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
10351035
Ok(PyRecordBatchStream::new(stream))
10361036
}
10371037
}

src/dataframe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ impl PyDataFrame {
753753
let df = self.df.as_ref().clone();
754754
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
755755
rt.spawn(async move { df.execute_stream().await });
756-
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })??;
756+
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
757757
Ok(PyRecordBatchStream::new(stream))
758758
}
759759

@@ -764,6 +764,7 @@ impl PyDataFrame {
764764
let fut: JoinHandle<datafusion::common::Result<Vec<SendableRecordBatchStream>>> =
765765
rt.spawn(async move { df.execute_stream_partitioned().await });
766766
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })?
767+
.map_err(py_datafusion_err)?
767768
.map_err(py_datafusion_err)?;
768769

769770
Ok(stream.into_iter().map(PyRecordBatchStream::new).collect())

0 commit comments

Comments
 (0)