Skip to content

Commit bc6f5a6

Browse files
committed
map_err instead of panic
1 parent 0179619 commit bc6f5a6

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

src/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use pyo3::prelude::*;
3434
use crate::catalog::{PyCatalog, PyTable};
3535
use crate::dataframe::PyDataFrame;
3636
use crate::dataset::Dataset;
37-
use crate::errors::{py_datafusion_err, PyDataFusionResult};
37+
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult};
3838
use crate::expr::sort_expr::PySortExpr;
3939
use crate::physical_plan::PyExecutionPlan;
4040
use crate::record_batch::PyRecordBatchStream;
@@ -1031,7 +1031,7 @@ impl PySessionContext {
10311031
let plan = plan.plan.clone();
10321032
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
10331033
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
1034-
let stream = wait_for_future(py, async { fut.await.expect("Tokio task panicked") })??;
1034+
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })??;
10351035
Ok(PyRecordBatchStream::new(stream))
10361036
}
10371037
}

src/dataframe.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
4343
use tokio::task::JoinHandle;
4444

4545
use crate::catalog::PyTable;
46-
use crate::errors::{py_datafusion_err, PyDataFusionError};
46+
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
4747
use crate::expr::sort_expr::to_sort_expressions;
4848
use crate::physical_plan::PyExecutionPlan;
4949
use crate::record_batch::PyRecordBatchStream;
@@ -753,7 +753,7 @@ impl PyDataFrame {
753753
let df = self.df.as_ref().clone();
754754
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
755755
rt.spawn(async move { df.execute_stream().await });
756-
let stream = wait_for_future(py, async { fut.await.expect("Tokio task panicked") })??;
756+
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })??;
757757
Ok(PyRecordBatchStream::new(stream))
758758
}
759759

@@ -763,7 +763,7 @@ impl PyDataFrame {
763763
let df = self.df.as_ref().clone();
764764
let fut: JoinHandle<datafusion::common::Result<Vec<SendableRecordBatchStream>>> =
765765
rt.spawn(async move { df.execute_stream_partitioned().await });
766-
let stream = wait_for_future(py, async { fut.await.expect("Tokio task panicked") })?
766+
let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })?
767767
.map_err(py_datafusion_err)?;
768768

769769
Ok(stream.into_iter().map(PyRecordBatchStream::new).collect())

0 commit comments

Comments
 (0)