Skip to content

Commit 03e530c

Browse files
committed
refactor: replace spawn_stream and spawn_streams with spawn_future for consistency
1 parent 0130a72 commit 03e530c

File tree

3 files changed

+8
-30
lines changed

3 files changed

+8
-30
lines changed

src/context.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ use crate::udaf::PyAggregateUDF;
4545
use crate::udf::PyScalarUDF;
4646
use crate::udtf::PyTableFunction;
4747
use crate::udwf::PyWindowUDF;
48-
use crate::utils::{get_global_ctx, spawn_stream, validate_pycapsule, wait_for_future};
48+
use crate::utils::{get_global_ctx, spawn_future, validate_pycapsule, wait_for_future};
4949
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
5050
use datafusion::arrow::pyarrow::PyArrowType;
5151
use datafusion::arrow::record_batch::RecordBatch;
@@ -1131,7 +1131,7 @@ impl PySessionContext {
11311131
) -> PyDataFusionResult<PyRecordBatchStream> {
11321132
let ctx: TaskContext = TaskContext::from(&self.ctx.state());
11331133
let plan = plan.plan.clone();
1134-
let stream = spawn_stream(py, async move { plan.execute(part, Arc::new(ctx)) })?;
1134+
let stream = spawn_future(py, async move { plan.execute(part, Arc::new(ctx)) })?;
11351135
Ok(PyRecordBatchStream::new(stream))
11361136
}
11371137
}

src/dataframe.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -51,8 +51,8 @@ use crate::physical_plan::PyExecutionPlan;
5151
use crate::record_batch::{poll_next_batch, PyRecordBatchStream};
5252
use crate::sql::logical::PyLogicalPlan;
5353
use crate::utils::{
54-
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, spawn_stream, spawn_streams,
55-
validate_pycapsule, wait_for_future,
54+
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, spawn_future, validate_pycapsule,
55+
wait_for_future,
5656
};
5757
use crate::{
5858
errors::PyDataFusionResult,
@@ -967,7 +967,7 @@ impl PyDataFrame {
967967
requested_schema: Option<Bound<'py, PyCapsule>>,
968968
) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
969969
let df = self.df.as_ref().clone();
970-
let streams = spawn_streams(py, async move { df.execute_stream_partitioned().await })?;
970+
let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
971971

972972
let mut schema: Schema = self.df.schema().to_owned().into();
973973
let mut projection: Option<SchemaRef> = None;
@@ -1020,13 +1020,13 @@ impl PyDataFrame {
10201020

10211021
fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
10221022
let df = self.df.as_ref().clone();
1023-
let stream = spawn_stream(py, async move { df.execute_stream().await })?;
1023+
let stream = spawn_future(py, async move { df.execute_stream().await })?;
10241024
Ok(PyRecordBatchStream::new(stream))
10251025
}
10261026

10271027
fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
10281028
let df = self.df.as_ref().clone();
1029-
let streams = spawn_streams(py, async move { df.execute_stream_partitioned().await })?;
1029+
let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
10301030
Ok(streams.into_iter().map(PyRecordBatchStream::new).collect())
10311031
}
10321032

src/utils.rs

Lines changed: 1 addition & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -21,8 +21,7 @@ use crate::{
2121
TokioRuntime,
2222
};
2323
use datafusion::{
24-
common::ScalarValue, execution::context::SessionContext, execution::SendableRecordBatchStream,
25-
logical_expr::Volatility,
24+
common::ScalarValue, execution::context::SessionContext, logical_expr::Volatility,
2625
};
2726
use pyo3::prelude::*;
2827
use pyo3::{exceptions::PyValueError, types::PyCapsule};
@@ -114,27 +113,6 @@ where
114113
Ok(inner_result?)
115114
}
116115

117-
/// Spawn a [`SendableRecordBatchStream`] on the Tokio runtime and wait for completion
118-
/// while respecting Python signal handling.
119-
pub(crate) fn spawn_stream<F>(py: Python, fut: F) -> PyDataFusionResult<SendableRecordBatchStream>
120-
where
121-
F: Future<Output = datafusion::common::Result<SendableRecordBatchStream>> + Send + 'static,
122-
{
123-
spawn_future(py, fut)
124-
}
125-
126-
/// Spawn a partitioned [`SendableRecordBatchStream`] on the Tokio runtime and wait for completion
127-
/// while respecting Python signal handling.
128-
pub(crate) fn spawn_streams<F>(
129-
py: Python,
130-
fut: F,
131-
) -> PyDataFusionResult<Vec<SendableRecordBatchStream>>
132-
where
133-
F: Future<Output = datafusion::common::Result<Vec<SendableRecordBatchStream>>> + Send + 'static,
134-
{
135-
spawn_future(py, fut)
136-
}
137-
138116
pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
139117
Ok(match value {
140118
"immutable" => Volatility::Immutable,

0 commit comments

Comments (0)