Skip to content

Commit 8dc53c6

Browse files
authored
Remove our own tokio runtime and use the one provided by pyo3-async-runtimes (#441)
1 parent 828d63b commit 8dc53c6

File tree

11 files changed

+39
-81
lines changed

11 files changed

+39
-81
lines changed

obstore/src/buffered.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@ use pyo3::exceptions::{PyIOError, PyStopAsyncIteration, PyStopIteration};
88
use pyo3::prelude::*;
99
use pyo3::types::PyString;
1010
use pyo3::{intern, IntoPyObjectExt};
11-
use pyo3_async_runtimes::tokio::future_into_py;
11+
use pyo3_async_runtimes::tokio::{future_into_py, get_runtime};
1212
use pyo3_bytes::PyBytes;
1313
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
1414
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Lines};
1515
use tokio::sync::Mutex;
1616

1717
use crate::attributes::PyAttributes;
1818
use crate::list::PyObjectMeta;
19-
use crate::runtime::get_runtime;
2019
use crate::tags::PyTagSet;
2120

2221
#[pyfunction]
@@ -28,7 +27,7 @@ pub(crate) fn open_reader(
2827
buffer_size: usize,
2928
) -> PyObjectStoreResult<PyReadableFile> {
3029
let store = store.into_inner();
31-
let runtime = get_runtime(py)?;
30+
let runtime = get_runtime();
3231
let (reader, meta) =
3332
py.allow_threads(|| runtime.block_on(create_reader(store, path, buffer_size)))?;
3433
Ok(PyReadableFile::new(reader, meta, false))
@@ -105,7 +104,7 @@ impl PyReadableFile {
105104
let out = future_into_py(py, read(reader, size))?;
106105
Ok(out.unbind())
107106
} else {
108-
let runtime = get_runtime(py)?;
107+
let runtime = get_runtime();
109108
let out = py.allow_threads(|| runtime.block_on(read(reader, size)))?;
110109
out.into_py_any(py)
111110
}
@@ -121,7 +120,7 @@ impl PyReadableFile {
121120
let out = future_into_py(py, readline(reader))?;
122121
Ok(out.unbind())
123122
} else {
124-
let runtime = get_runtime(py)?;
123+
let runtime = get_runtime();
125124
let out = py.allow_threads(|| runtime.block_on(readline(reader)))?;
126125
out.into_py_any(py)
127126
}
@@ -135,7 +134,7 @@ impl PyReadableFile {
135134
let out = future_into_py(py, readlines(reader, hint))?;
136135
Ok(out.unbind())
137136
} else {
138-
let runtime = get_runtime(py)?;
137+
let runtime = get_runtime();
139138
let out = py.allow_threads(|| runtime.block_on(readlines(reader, hint)))?;
140139
out.into_py_any(py)
141140
}
@@ -163,7 +162,7 @@ impl PyReadableFile {
163162
let out = future_into_py(py, seek(reader, pos))?;
164163
Ok(out.unbind())
165164
} else {
166-
let runtime = get_runtime(py)?;
165+
let runtime = get_runtime();
167166
let out = py.allow_threads(|| runtime.block_on(seek(reader, pos)))?;
168167
out.into_py_any(py)
169168
}
@@ -184,7 +183,7 @@ impl PyReadableFile {
184183
let out = future_into_py(py, tell(reader))?;
185184
Ok(out.unbind())
186185
} else {
187-
let runtime = get_runtime(py)?;
186+
let runtime = get_runtime();
188187
let out = py.allow_threads(|| runtime.block_on(tell(reader)))?;
189188
out.into_py_any(py)
190189
}
@@ -267,7 +266,7 @@ impl PyLinesReader {
267266
}
268267

269268
fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<String> {
270-
let runtime = get_runtime(py)?;
269+
let runtime = get_runtime();
271270
let lines = self.0.clone();
272271
py.allow_threads(|| runtime.block_on(next_line(lines, false)))
273272
}
@@ -376,7 +375,7 @@ impl PyWritableFile {
376375
traceback: Option<PyObject>,
377376
) -> PyResult<()> {
378377
let writer = self.writer.clone();
379-
let runtime = get_runtime(py)?;
378+
let runtime = get_runtime();
380379
if exc_type.is_some() {
381380
py.allow_threads(|| runtime.block_on(abort_writer(writer)))?;
382381
} else {
@@ -395,7 +394,7 @@ impl PyWritableFile {
395394
traceback: Option<PyObject>,
396395
) -> PyResult<Bound<'py, PyAny>> {
397396
let writer = self.writer.clone();
398-
let runtime = get_runtime(py)?;
397+
let runtime = get_runtime();
399398
if exc_type.is_some() {
400399
future_into_py(py, abort_writer(writer))
401400
} else {
@@ -409,7 +408,7 @@ impl PyWritableFile {
409408
let out = future_into_py(py, close_writer(writer))?;
410409
Ok(out.unbind())
411410
} else {
412-
let runtime = get_runtime(py)?;
411+
let runtime = get_runtime();
413412
py.allow_threads(|| runtime.block_on(close_writer(writer)))?;
414413
Ok(py.None())
415414
}
@@ -436,7 +435,7 @@ impl PyWritableFile {
436435
let out = future_into_py(py, is_closed(writer))?;
437436
Ok(out.unbind())
438437
} else {
439-
let runtime = get_runtime(py)?;
438+
let runtime = get_runtime();
440439
let out = py.allow_threads(|| runtime.block_on(is_closed(writer)))?;
441440
out.into_py_any(py)
442441
}
@@ -448,7 +447,7 @@ impl PyWritableFile {
448447
let out = future_into_py(py, flush(writer))?;
449448
Ok(out.unbind())
450449
} else {
451-
let runtime = get_runtime(py)?;
450+
let runtime = get_runtime();
452451
py.allow_threads(|| runtime.block_on(flush(writer)))?;
453452
Ok(py.None())
454453
}
@@ -460,7 +459,7 @@ impl PyWritableFile {
460459
let out = future_into_py(py, write(writer, buffer))?;
461460
Ok(out.unbind())
462461
} else {
463-
let runtime = get_runtime(py)?;
462+
let runtime = get_runtime();
464463
let out = py.allow_threads(|| runtime.block_on(write(writer, buffer)))?;
465464
out.into_py_any(py)
466465
}

obstore/src/copy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use object_store::ObjectStore;
22
use pyo3::prelude::*;
3+
use pyo3_async_runtimes::tokio::get_runtime;
34
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
45

5-
use crate::runtime::get_runtime;
66
use crate::utils::PyNone;
77

88
#[pyfunction]
@@ -14,7 +14,7 @@ pub(crate) fn copy(
1414
to: String,
1515
overwrite: bool,
1616
) -> PyObjectStoreResult<()> {
17-
let runtime = get_runtime(py)?;
17+
let runtime = get_runtime();
1818
let from_ = from_.into();
1919
let to = to.into();
2020
py.allow_threads(|| {

obstore/src/delete.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use futures::{StreamExt, TryStreamExt};
22
use pyo3::prelude::*;
3+
use pyo3_async_runtimes::tokio::get_runtime;
34
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
45

56
use crate::path::PyPaths;
6-
use crate::runtime::get_runtime;
77
use crate::utils::PyNone;
88

99
#[pyfunction]
1010
pub(crate) fn delete(py: Python, store: PyObjectStore, paths: PyPaths) -> PyObjectStoreResult<()> {
11-
let runtime = get_runtime(py)?;
11+
let runtime = get_runtime();
1212
let store = store.into_inner();
1313
py.allow_threads(|| {
1414
match paths {

obstore/src/get.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ use futures::StreamExt;
99
use object_store::{Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore};
1010
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
1111
use pyo3::prelude::*;
12+
use pyo3_async_runtimes::tokio::get_runtime;
1213
use pyo3_bytes::PyBytes;
1314
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
1415
use tokio::sync::Mutex;
1516

1617
use crate::attributes::PyAttributes;
1718
use crate::list::PyObjectMeta;
18-
use crate::runtime::get_runtime;
1919

2020
/// 10MB default chunk size
2121
const DEFAULT_BYTES_CHUNK_SIZE: usize = 10 * 1024 * 1024;
@@ -154,7 +154,7 @@ impl PyGetResult {
154154
.unwrap()
155155
.take()
156156
.ok_or(PyValueError::new_err("Result has already been disposed."))?;
157-
let runtime = get_runtime(py)?;
157+
let runtime = get_runtime();
158158
py.allow_threads(|| {
159159
let bytes = runtime.block_on(get_result.bytes())?;
160160
Ok::<_, PyObjectStoreError>(PyBytes::new(bytes))
@@ -282,8 +282,8 @@ impl PyBytesStream {
282282
)
283283
}
284284

285-
fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<PyBytesWrapper> {
286-
let runtime = get_runtime(py)?;
285+
fn __next__(&self) -> PyResult<PyBytesWrapper> {
286+
let runtime = get_runtime();
287287
let stream = self.stream.clone();
288288
runtime.block_on(next_stream(stream, self.min_chunk_size, true))
289289
}
@@ -329,7 +329,7 @@ pub(crate) fn get(
329329
path: String,
330330
options: Option<PyGetOptions>,
331331
) -> PyObjectStoreResult<PyGetResult> {
332-
let runtime = get_runtime(py)?;
332+
let runtime = get_runtime();
333333
py.allow_threads(|| {
334334
let path = &path.into();
335335
let fut = if let Some(options) = options {
@@ -372,7 +372,7 @@ pub(crate) fn get_range(
372372
end: Option<u64>,
373373
length: Option<u64>,
374374
) -> PyObjectStoreResult<pyo3_bytes::PyBytes> {
375-
let runtime = get_runtime(py)?;
375+
let runtime = get_runtime();
376376
let range = params_to_range(start, end, length)?;
377377
py.allow_threads(|| {
378378
let out = runtime.block_on(store.as_ref().get_range(&path.into(), range))?;
@@ -426,7 +426,7 @@ pub(crate) fn get_ranges(
426426
ends: Option<Vec<u64>>,
427427
lengths: Option<Vec<u64>>,
428428
) -> PyObjectStoreResult<Vec<pyo3_bytes::PyBytes>> {
429-
let runtime = get_runtime(py)?;
429+
let runtime = get_runtime();
430430
let ranges = params_to_ranges(starts, ends, lengths)?;
431431
py.allow_threads(|| {
432432
let out = runtime.block_on(store.as_ref().get_ranges(&path.into(), &ranges))?;

obstore/src/head.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use pyo3::prelude::*;
2+
use pyo3_async_runtimes::tokio::get_runtime;
23
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
34

45
use crate::list::PyObjectMeta;
5-
use crate::runtime::get_runtime;
66

77
#[pyfunction]
88
pub fn head(py: Python, store: PyObjectStore, path: String) -> PyObjectStoreResult<PyObjectMeta> {
9-
let runtime = get_runtime(py)?;
9+
let runtime = get_runtime();
1010
let store = store.into_inner();
1111

1212
py.allow_threads(|| {

obstore/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ mod list;
1111
mod path;
1212
mod put;
1313
mod rename;
14-
mod runtime;
1514
mod scheme;
1615
mod signer;
1716
mod tags;

obstore/src/list.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@ use pyo3::prelude::*;
1515
use pyo3::types::PyDict;
1616
use pyo3::{intern, IntoPyObjectExt};
1717
use pyo3_arrow::{PyRecordBatch, PyTable};
18+
use pyo3_async_runtimes::tokio::get_runtime;
1819
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
1920
use tokio::sync::Mutex;
2021

21-
use crate::runtime::get_runtime;
22-
2322
pub(crate) struct PyObjectMeta(ObjectMeta);
2423

2524
impl PyObjectMeta {
@@ -105,8 +104,8 @@ impl PyListStream {
105104
slf
106105
}
107106

108-
fn collect(&self, py: Python) -> PyResult<PyListIterResult> {
109-
let runtime = get_runtime(py)?;
107+
fn collect(&self) -> PyResult<PyListIterResult> {
108+
let runtime = get_runtime();
110109
let stream = self.stream.clone();
111110
runtime.block_on(collect_stream(stream, self.return_arrow))
112111
}
@@ -124,8 +123,8 @@ impl PyListStream {
124123
)
125124
}
126125

127-
fn __next__<'py>(&'py self, py: Python<'py>) -> PyResult<PyListIterResult> {
128-
let runtime = get_runtime(py)?;
126+
fn __next__(&self) -> PyResult<PyListIterResult> {
127+
let runtime = get_runtime();
129128
let stream = self.stream.clone();
130129
runtime.block_on(next_stream(
131130
stream,
@@ -435,7 +434,7 @@ pub(crate) fn list_with_delimiter(
435434
prefix: Option<String>,
436435
return_arrow: bool,
437436
) -> PyObjectStoreResult<PyListResult> {
438-
let runtime = get_runtime(py)?;
437+
let runtime = get_runtime();
439438
py.allow_threads(|| {
440439
let out = runtime.block_on(list_with_delimiter_materialize(
441440
store.into_inner(),

obstore/src/put.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ use pyo3::prelude::*;
1616
use pyo3::pybacked::PyBackedStr;
1717
use pyo3::types::PyDict;
1818
use pyo3::{intern, IntoPyObjectExt};
19+
use pyo3_async_runtimes::tokio::get_runtime;
1920
use pyo3_bytes::PyBytes;
2021
use pyo3_file::PyFileLikeObject;
2122
use pyo3_object_store::{PyObjectStore, PyObjectStoreResult};
2223

2324
use crate::attributes::PyAttributes;
24-
use crate::runtime::get_runtime;
2525
use crate::tags::PyTagSet;
2626

2727
pub(crate) struct PyPutMode(PutMode);
@@ -294,7 +294,6 @@ impl<'py> IntoPyObject<'py> for PyPutResult {
294294
#[pyo3(signature = (store, path, file, *, attributes=None, tags=None, mode=None, use_multipart=None, chunk_size=5242880, max_concurrency=12))]
295295
#[allow(clippy::too_many_arguments)]
296296
pub(crate) fn put(
297-
py: Python,
298297
store: PyObjectStore,
299298
path: String,
300299
mut file: PutInput,
@@ -324,7 +323,7 @@ pub(crate) fn put(
324323
}
325324
}
326325

327-
let runtime = get_runtime(py)?;
326+
let runtime = get_runtime();
328327
if use_multipart {
329328
runtime.block_on(put_multipart_inner(
330329
store.into_inner(),

obstore/src/rename.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use object_store::ObjectStore;
22
use pyo3::prelude::*;
3+
use pyo3_async_runtimes::tokio::get_runtime;
34
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
45

5-
use crate::runtime::get_runtime;
66
use crate::utils::PyNone;
77

88
#[pyfunction]
@@ -14,7 +14,7 @@ pub(crate) fn rename(
1414
to: String,
1515
overwrite: bool,
1616
) -> PyObjectStoreResult<()> {
17-
let runtime = get_runtime(py)?;
17+
let runtime = get_runtime();
1818
let from_ = from_.into();
1919
let to = to.into();
2020
py.allow_threads(|| {

obstore/src/runtime.rs

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)