Skip to content

Commit bef97bb

Browse files
authored
Use pyo3-bytes from obstore (#128)
* Use pyo3-bytes from obstore * Use pyo3 bytes in put
1 parent b937a8c commit bef97bb

File tree

5 files changed

+36
-30
lines changed

5 files changed

+36
-30
lines changed

Cargo.lock

Lines changed: 13 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

obstore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ object_store = { workspace = true }
2828
pyo3 = { workspace = true, features = ["chrono"] }
2929
pyo3-arrow = "0.6"
3030
pyo3-async-runtimes = { workspace = true, features = ["tokio-runtime"] }
31+
pyo3-bytes = "0.1.2"
3132
pyo3-file = { workspace = true }
3233
pyo3-object_store = { path = "../pyo3-object_store" }
3334
tokio = { workspace = true, features = [

obstore/src/buffered.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use std::io::SeekFrom;
22
use std::sync::Arc;
33

4-
use arrow::buffer::Buffer;
4+
use bytes::Bytes;
55
use object_store::buffered::BufReader;
66
use pyo3::exceptions::{PyIOError, PyStopAsyncIteration, PyStopIteration};
77
use pyo3::prelude::*;
8-
use pyo3_arrow::buffer::PyArrowBuffer;
98
use pyo3_async_runtimes::tokio::future_into_py;
9+
use pyo3_bytes::PyBytes;
1010
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
1111
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, Lines};
1212
use tokio::sync::Mutex;
@@ -154,34 +154,34 @@ impl PyReadableFile {
154154
}
155155
}
156156

157-
async fn read(reader: Arc<Mutex<BufReader>>, size: Option<usize>) -> PyResult<PyArrowBuffer> {
157+
async fn read(reader: Arc<Mutex<BufReader>>, size: Option<usize>) -> PyResult<PyBytes> {
158158
let mut reader = reader.lock().await;
159159
if let Some(size) = size {
160160
let mut buf = vec![0; size as _];
161161
reader.read_exact(&mut buf).await?;
162-
Ok(PyArrowBuffer::new(Buffer::from_vec(buf)))
162+
Ok(Bytes::from(buf).into())
163163
} else {
164164
let mut buf = Vec::new();
165165
reader.read_to_end(&mut buf).await?;
166-
Ok(PyArrowBuffer::new(Buffer::from_vec(buf)))
166+
Ok(Bytes::from(buf).into())
167167
}
168168
}
169169

170-
async fn readline(reader: Arc<Mutex<BufReader>>) -> PyResult<PyArrowBuffer> {
170+
async fn readline(reader: Arc<Mutex<BufReader>>) -> PyResult<PyBytes> {
171171
let mut reader = reader.lock().await;
172172
let mut buf = String::new();
173173
reader.read_line(&mut buf).await?;
174-
Ok(PyArrowBuffer::new(Buffer::from_vec(buf.into_bytes())))
174+
Ok(Bytes::from(buf.into_bytes()).into())
175175
}
176176

177-
async fn readlines(reader: Arc<Mutex<BufReader>>, hint: i64) -> PyResult<Vec<PyArrowBuffer>> {
177+
async fn readlines(reader: Arc<Mutex<BufReader>>, hint: i64) -> PyResult<Vec<PyBytes>> {
178178
let mut reader = reader.lock().await;
179179
if hint <= 0 {
180180
let mut lines = Vec::new();
181181
loop {
182182
let mut buf = String::new();
183183
let n = reader.read_line(&mut buf).await?;
184-
lines.push(PyArrowBuffer::new(Buffer::from_vec(buf.into_bytes())));
184+
lines.push(Bytes::from(buf.into_bytes()).into());
185185
// Ok(0) signifies EOF
186186
if n == 0 {
187187
return Ok(lines);
@@ -198,7 +198,7 @@ async fn readlines(reader: Arc<Mutex<BufReader>>, hint: i64) -> PyResult<Vec<PyA
198198
let mut buf = String::new();
199199
let n = reader.read_line(&mut buf).await?;
200200
byte_count += n;
201-
lines.push(PyArrowBuffer::new(Buffer::from_vec(buf.into_bytes())));
201+
lines.push(Bytes::from(buf.into_bytes()).into());
202202
// Ok(0) signifies EOF
203203
if n == 0 {
204204
return Ok(lines);

obstore/src/get.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
use std::collections::HashMap;
22
use std::sync::Arc;
33

4-
use arrow::buffer::Buffer;
54
use bytes::Bytes;
65
use chrono::{DateTime, Utc};
76
use futures::stream::{BoxStream, Fuse};
87
use futures::StreamExt;
98
use object_store::{GetOptions, GetRange, GetResult, ObjectStore};
109
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
1110
use pyo3::prelude::*;
12-
use pyo3::types::PyBytes;
13-
use pyo3_arrow::buffer::PyArrowBuffer;
1411
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
1512
use tokio::sync::Mutex;
1613

@@ -302,7 +299,7 @@ impl PyBytesWrapper {
302299
// support the buffer protocol in the future (e.g. for get_range) you may need to have a separate
303300
// wrapper of Bytes
304301
impl<'py> IntoPyObject<'py> for PyBytesWrapper {
305-
type Target = PyBytes;
302+
type Target = pyo3::types::PyBytes;
306303
type Output = Bound<'py, Self::Target>;
307304
type Error = PyErr;
308305

@@ -311,7 +308,7 @@ impl<'py> IntoPyObject<'py> for PyBytesWrapper {
311308

312309
// Copy all internal Bytes objects into a single PyBytes
313310
// Since our inner callback is infallible, this will only panic on out of memory
314-
PyBytes::new_with(py, total_len, |target| {
311+
pyo3::types::PyBytes::new_with(py, total_len, |target| {
315312
let mut offset = 0;
316313
for buf in self.0.iter() {
317314
target[offset..offset + buf.len()].copy_from_slice(buf);
@@ -370,11 +367,11 @@ pub(crate) fn get_range(
370367
path: String,
371368
start: usize,
372369
end: usize,
373-
) -> PyObjectStoreResult<PyArrowBuffer> {
370+
) -> PyObjectStoreResult<pyo3_bytes::PyBytes> {
374371
let runtime = get_runtime(py)?;
375372
py.allow_threads(|| {
376373
let out = runtime.block_on(store.as_ref().get_range(&path.into(), start..end))?;
377-
Ok::<_, PyObjectStoreError>(PyArrowBuffer::new(Buffer::from_bytes(out.into())))
374+
Ok::<_, PyObjectStoreError>(pyo3_bytes::PyBytes::new(out))
378375
})
379376
}
380377

@@ -392,7 +389,7 @@ pub(crate) fn get_range_async(
392389
.get_range(&path.into(), start..end)
393390
.await
394391
.map_err(PyObjectStoreError::ObjectStoreError)?;
395-
Ok(PyArrowBuffer::new(Buffer::from_bytes(out.into())))
392+
Ok(pyo3_bytes::PyBytes::new(out))
396393
})
397394
}
398395

@@ -403,7 +400,7 @@ pub(crate) fn get_ranges(
403400
path: String,
404401
starts: Vec<usize>,
405402
ends: Vec<usize>,
406-
) -> PyObjectStoreResult<Vec<PyArrowBuffer>> {
403+
) -> PyObjectStoreResult<Vec<pyo3_bytes::PyBytes>> {
407404
let runtime = get_runtime(py)?;
408405
let ranges = starts
409406
.into_iter()
@@ -412,11 +409,7 @@ pub(crate) fn get_ranges(
412409
.collect::<Vec<_>>();
413410
py.allow_threads(|| {
414411
let out = runtime.block_on(store.as_ref().get_ranges(&path.into(), &ranges))?;
415-
Ok::<_, PyObjectStoreError>(
416-
out.into_iter()
417-
.map(|buf| PyArrowBuffer::new(Buffer::from_bytes(buf.into())))
418-
.collect(),
419-
)
412+
Ok::<_, PyObjectStoreError>(out.into_iter().map(|buf| buf.into()).collect())
420413
})
421414
}
422415

@@ -441,7 +434,7 @@ pub(crate) fn get_ranges_async(
441434
.map_err(PyObjectStoreError::ObjectStoreError)?;
442435
Ok(out
443436
.into_iter()
444-
.map(|buf| PyArrowBuffer::new(Buffer::from_bytes(buf.into())))
437+
.map(pyo3_bytes::PyBytes::new)
445438
.collect::<Vec<_>>())
446439
})
447440
}

obstore/src/put.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use object_store::{
1212
};
1313
use pyo3::exceptions::PyValueError;
1414
use pyo3::prelude::*;
15-
use pyo3::pybacked::{PyBackedBytes, PyBackedStr};
15+
use pyo3::pybacked::PyBackedStr;
1616
use pyo3::types::PyDict;
17+
use pyo3_bytes::PyBytes;
1718
use pyo3_file::PyFileLikeObject;
1819
use pyo3_object_store::{PyObjectStore, PyObjectStoreResult};
1920

@@ -61,7 +62,7 @@ impl<'py> FromPyObject<'py> for PyUpdateVersion {
6162
pub(crate) enum MultipartPutInput {
6263
File(BufReader<File>),
6364
FileLike(PyFileLikeObject),
64-
Buffer(Cursor<PyBackedBytes>),
65+
Buffer(Cursor<PyBytes>),
6566
}
6667

6768
impl MultipartPutInput {
@@ -83,7 +84,7 @@ impl<'py> FromPyObject<'py> for MultipartPutInput {
8384
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
8485
if let Ok(path) = ob.extract::<PathBuf>() {
8586
Ok(Self::File(BufReader::new(File::open(path)?)))
86-
} else if let Ok(buffer) = ob.extract::<PyBackedBytes>() {
87+
} else if let Ok(buffer) = ob.extract::<PyBytes>() {
8788
Ok(Self::Buffer(Cursor::new(buffer)))
8889
} else {
8990
Ok(Self::FileLike(PyFileLikeObject::with_requirements(

0 commit comments

Comments
 (0)