diff --git a/Cargo.toml b/Cargo.toml index a0723984f..4f2602316 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ description = "Apache DataFusion DataFrame and SQL Query Engine" readme = "README.md" license = "Apache-2.0" edition = "2021" -rust-version = "1.64" +rust-version = "1.78" include = ["/src", "/datafusion", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"] [features] diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index 63c19b3e1..e0bc57f44 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -36,7 +36,7 @@ from .catalog import Catalog, Database, Table # The following imports are okay to remain as opaque to the user. -from ._internal import Config, runtime +from ._internal import Config from .record_batch import RecordBatchStream, RecordBatch @@ -75,7 +75,6 @@ "literal", "lit", "DFSchema", - "runtime", "Catalog", "Database", "Table", diff --git a/src/context.rs b/src/context.rs index fde442ce4..5317a3eda 100644 --- a/src/context.rs +++ b/src/context.rs @@ -982,7 +982,7 @@ impl PySessionContext { ) -> PyResult { let ctx: TaskContext = TaskContext::from(&self.ctx.state()); // create a Tokio runtime to run the async code - let rt = &get_tokio_runtime(py).0; + let rt = &get_tokio_runtime().0; let plan = plan.plan.clone(); let fut: JoinHandle> = rt.spawn(async move { plan.execute(part, Arc::new(ctx)) }); diff --git a/src/dataframe.rs b/src/dataframe.rs index 1f7f2e643..e77ca8425 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -543,7 +543,7 @@ impl PyDataFrame { fn execute_stream(&self, py: Python) -> PyResult { // create a Tokio runtime to run the async code - let rt = &get_tokio_runtime(py).0; + let rt = &get_tokio_runtime().0; let df = self.df.as_ref().clone(); let fut: JoinHandle> = rt.spawn(async move { df.execute_stream().await }); @@ -553,7 +553,7 @@ impl PyDataFrame { fn execute_stream_partitioned(&self, py: Python) -> PyResult> { // create a Tokio runtime to run the 
async code - let rt = &get_tokio_runtime(py).0; + let rt = &get_tokio_runtime().0; let df = self.df.as_ref().clone(); let fut: JoinHandle>> = rt.spawn(async move { df.execute_stream_partitioned().await }); diff --git a/src/lib.rs b/src/lib.rs index 98821833d..0b57e0999 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,7 +66,6 @@ pub mod utils; static GLOBAL: MiMalloc = MiMalloc; // Used to define Tokio Runtime as a Python module attribute -#[pyclass] pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// Low-level DataFusion internal package. @@ -75,11 +74,6 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// datafusion directory. #[pymodule] fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { - // Register the Tokio Runtime as a module attribute so we can reuse it - m.add( - "runtime", - TokioRuntime(tokio::runtime::Runtime::new().unwrap()), - )?; // Register the python classes m.add_class::()?; m.add_class::()?; diff --git a/src/utils.rs b/src/utils.rs index 0d72eaf75..7fb23cafe 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -20,20 +20,19 @@ use crate::TokioRuntime; use datafusion::logical_expr::Volatility; use pyo3::prelude::*; use std::future::Future; +use std::sync::OnceLock; use tokio::runtime::Runtime; /// Utility to get the Tokio Runtime from Python -pub(crate) fn get_tokio_runtime(py: Python) -> PyRef { - let datafusion = py.import_bound("datafusion._internal").unwrap(); - let tmp = datafusion.getattr("runtime").unwrap(); - match tmp.extract::>() { - Ok(runtime) => runtime, - Err(_e) => { - let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap()); - let obj: Bound<'_, TokioRuntime> = Py::new(py, rt).unwrap().into_bound(py); - obj.extract().unwrap() - } - } +#[inline] +pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { + // NOTE: Other pyo3 python libraries have had issues with using tokio + // behind a forking app-server like `gunicorn` + // If we run into that problem, in the future we can look to `delta-rs` 
+ // which adds a check that disallows calls from a forked process + // https://github.com/delta-io/delta-rs/blob/87010461cfe01563d91a4b9cd6fa468e2ad5f283/python/src/utils.rs#L10-L31 + static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new(); + RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap())) } /// Utility to collect rust futures with GIL released @@ -42,7 +41,7 @@ where F: Future + Send, F::Output: Send, { - let runtime: &Runtime = &get_tokio_runtime(py).0; + let runtime: &Runtime = &get_tokio_runtime().0; py.allow_threads(|| runtime.block_on(f)) }