Skip to content

Commit 292f630

Browse files
authored
Merge branch 'main' into feature/s3
2 parents cd96b8a + 06f088d commit 292f630

File tree

3 files changed

+174
-14
lines changed

3 files changed

+174
-14
lines changed

bindings/python/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ name = "_opendal"
198198

199199
[dependencies]
200200
bytes = "1.5.0"
201-
dict_derive = "0.6.0"
202201
futures = "0.3.28"
203202
jiff = { version = "0.2.15" }
204203
mea = "0.6"
@@ -207,8 +206,8 @@ opendal = { version = ">=0", path = "../../core", features = [
207206
"blocking",
208207
"layers-mime-guess",
209208
] }
210-
pyo3 = { version = "0.26.0", features = ["generate-import-lib", "jiff-02"] }
211-
pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
209+
pyo3 = { version = "0.27.2", features = ["generate-import-lib", "jiff-02"] }
210+
pyo3-async-runtimes = { version = "0.27.0", features = ["tokio-runtime"] }
212211
pyo3-stub-gen = { version = "0.17" }
213212
tokio = "1"
214213

bindings/python/src/options.rs

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,22 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use dict_derive::FromPyObject;
1918
use opendal::{self as ocore, raw::BytesRange};
19+
use pyo3::Borrowed;
20+
use pyo3::FromPyObject;
21+
use pyo3::PyAny;
22+
use pyo3::PyErr;
23+
use pyo3::PyResult;
24+
use pyo3::conversion::FromPyObjectOwned;
25+
use pyo3::exceptions::PyTypeError;
2026
use pyo3::pyclass;
27+
use pyo3::types::PyAnyMethods;
28+
use pyo3::types::PyDict;
29+
use pyo3::types::PyDictMethods;
2130
use std::collections::HashMap;
2231

2332
#[pyclass(module = "opendal")]
24-
#[derive(FromPyObject, Default)]
33+
#[derive(Default)]
2534
pub struct ReadOptions {
2635
pub version: Option<String>,
2736
pub concurrent: Option<usize>,
@@ -39,6 +48,55 @@ pub struct ReadOptions {
3948
pub content_disposition: Option<String>,
4049
}
4150

51+
fn map_exception(name: &str, err: PyErr) -> PyErr {
52+
PyErr::new::<PyTypeError, _>(format!("Unable to convert key: {name}. Error: {err}"))
53+
}
54+
55+
fn extract_optional<'py, T>(dict: &pyo3::Bound<'py, PyDict>, name: &str) -> PyResult<Option<T>>
56+
where
57+
T: FromPyObjectOwned<'py>,
58+
{
59+
match dict.get_item(name)? {
60+
Some(v) => v
61+
.extract::<T>()
62+
.map(Some)
63+
.map_err(|err| map_exception(name, err.into())),
64+
None => Ok(None),
65+
}
66+
}
67+
68+
fn downcast_kwargs<'a, 'py>(obj: Borrowed<'a, 'py, PyAny>) -> PyResult<pyo3::Bound<'py, PyDict>> {
69+
let obj: &pyo3::Bound<'_, PyAny> = &obj;
70+
obj.cast::<PyDict>()
71+
.cloned()
72+
.map_err(|_| PyErr::new::<PyTypeError, _>("Invalid type to convert, expected dict"))
73+
}
74+
75+
impl<'a, 'py> FromPyObject<'a, 'py> for ReadOptions {
76+
type Error = PyErr;
77+
78+
fn extract(obj: Borrowed<'a, 'py, PyAny>) -> Result<Self, Self::Error> {
79+
let dict = downcast_kwargs(obj)?;
80+
81+
Ok(Self {
82+
version: extract_optional(&dict, "version")?,
83+
concurrent: extract_optional(&dict, "concurrent")?,
84+
chunk: extract_optional(&dict, "chunk")?,
85+
gap: extract_optional(&dict, "gap")?,
86+
offset: extract_optional(&dict, "offset")?,
87+
prefetch: extract_optional(&dict, "prefetch")?,
88+
size: extract_optional(&dict, "size")?,
89+
if_match: extract_optional(&dict, "if_match")?,
90+
if_none_match: extract_optional(&dict, "if_none_match")?,
91+
if_modified_since: extract_optional(&dict, "if_modified_since")?,
92+
if_unmodified_since: extract_optional(&dict, "if_unmodified_since")?,
93+
content_type: extract_optional(&dict, "content_type")?,
94+
cache_control: extract_optional(&dict, "cache_control")?,
95+
content_disposition: extract_optional(&dict, "content_disposition")?,
96+
})
97+
}
98+
}
99+
42100
impl ReadOptions {
43101
pub fn make_range(&self) -> BytesRange {
44102
let offset = self.offset.unwrap_or_default() as u64;
@@ -49,7 +107,7 @@ impl ReadOptions {
49107
}
50108

51109
#[pyclass(module = "opendal")]
52-
#[derive(FromPyObject, Default)]
110+
#[derive(Default)]
53111
pub struct WriteOptions {
54112
pub append: Option<bool>,
55113
pub chunk: Option<usize>,
@@ -64,6 +122,28 @@ pub struct WriteOptions {
64122
pub user_metadata: Option<HashMap<String, String>>,
65123
}
66124

125+
impl<'a, 'py> FromPyObject<'a, 'py> for WriteOptions {
126+
type Error = PyErr;
127+
128+
fn extract(obj: Borrowed<'a, 'py, PyAny>) -> Result<Self, Self::Error> {
129+
let dict = downcast_kwargs(obj)?;
130+
131+
Ok(Self {
132+
append: extract_optional(&dict, "append")?,
133+
chunk: extract_optional(&dict, "chunk")?,
134+
concurrent: extract_optional(&dict, "concurrent")?,
135+
cache_control: extract_optional(&dict, "cache_control")?,
136+
content_type: extract_optional(&dict, "content_type")?,
137+
content_disposition: extract_optional(&dict, "content_disposition")?,
138+
content_encoding: extract_optional(&dict, "content_encoding")?,
139+
if_match: extract_optional(&dict, "if_match")?,
140+
if_none_match: extract_optional(&dict, "if_none_match")?,
141+
if_not_exists: extract_optional(&dict, "if_not_exists")?,
142+
user_metadata: extract_optional(&dict, "user_metadata")?,
143+
})
144+
}
145+
}
146+
67147
impl From<ReadOptions> for ocore::options::ReadOptions {
68148
fn from(opts: ReadOptions) -> Self {
69149
Self {
@@ -118,7 +198,7 @@ impl From<WriteOptions> for ocore::options::WriteOptions {
118198
}
119199

120200
#[pyclass(module = "opendal")]
121-
#[derive(FromPyObject, Default, Debug)]
201+
#[derive(Default, Debug)]
122202
pub struct ListOptions {
123203
pub limit: Option<usize>,
124204
pub start_after: Option<String>,
@@ -127,6 +207,22 @@ pub struct ListOptions {
127207
pub deleted: Option<bool>,
128208
}
129209

210+
impl<'a, 'py> FromPyObject<'a, 'py> for ListOptions {
211+
type Error = PyErr;
212+
213+
fn extract(obj: Borrowed<'a, 'py, PyAny>) -> Result<Self, Self::Error> {
214+
let dict = downcast_kwargs(obj)?;
215+
216+
Ok(Self {
217+
limit: extract_optional(&dict, "limit")?,
218+
start_after: extract_optional(&dict, "start_after")?,
219+
recursive: extract_optional(&dict, "recursive")?,
220+
versions: extract_optional(&dict, "versions")?,
221+
deleted: extract_optional(&dict, "deleted")?,
222+
})
223+
}
224+
}
225+
130226
impl From<ListOptions> for ocore::options::ListOptions {
131227
fn from(opts: ListOptions) -> Self {
132228
Self {
@@ -140,7 +236,7 @@ impl From<ListOptions> for ocore::options::ListOptions {
140236
}
141237

142238
#[pyclass(module = "opendal")]
143-
#[derive(FromPyObject, Default, Debug)]
239+
#[derive(Default, Debug)]
144240
pub struct StatOptions {
145241
pub version: Option<String>,
146242
pub if_match: Option<String>,
@@ -152,6 +248,25 @@ pub struct StatOptions {
152248
pub content_disposition: Option<String>,
153249
}
154250

251+
impl<'a, 'py> FromPyObject<'a, 'py> for StatOptions {
252+
type Error = PyErr;
253+
254+
fn extract(obj: Borrowed<'a, 'py, PyAny>) -> Result<Self, Self::Error> {
255+
let dict = downcast_kwargs(obj)?;
256+
257+
Ok(Self {
258+
version: extract_optional(&dict, "version")?,
259+
if_match: extract_optional(&dict, "if_match")?,
260+
if_none_match: extract_optional(&dict, "if_none_match")?,
261+
if_modified_since: extract_optional(&dict, "if_modified_since")?,
262+
if_unmodified_since: extract_optional(&dict, "if_unmodified_since")?,
263+
content_type: extract_optional(&dict, "content_type")?,
264+
cache_control: extract_optional(&dict, "cache_control")?,
265+
content_disposition: extract_optional(&dict, "content_disposition")?,
266+
})
267+
}
268+
}
269+
155270
impl From<StatOptions> for ocore::options::StatOptions {
156271
fn from(opts: StatOptions) -> Self {
157272
Self {

core/core/src/layers/complete.rs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,32 @@ impl<R: oio::Read> oio::Read for CompleteReader<R> {
153153
}
154154
}
155155

156+
/// Tracks the state of the Write operation.
157+
/// A successful operation goes through states: Open -> Written -> Closed
158+
/// A failed operation terminates in the Error state
159+
#[derive(Debug, PartialEq, Eq)]
160+
enum CompleteState {
161+
Open,
162+
Written,
163+
Closed,
164+
Error,
165+
}
166+
167+
impl CompleteState {
168+
/// Attempt to transition to the destination state. Once CompleteState has
169+
/// errored all further transitions are ignored.
170+
fn transition(&mut self, destination: CompleteState) {
171+
if *self != CompleteState::Error {
172+
*self = destination
173+
}
174+
}
175+
}
176+
156177
pub struct CompleteWriter<W> {
157178
inner: Option<W>,
158179
append: bool,
159180
size: u64,
181+
state: CompleteState,
160182
}
161183

162184
impl<W> CompleteWriter<W> {
@@ -165,6 +187,7 @@ impl<W> CompleteWriter<W> {
165187
inner: Some(inner),
166188
append,
167189
size: 0,
190+
state: CompleteState::Open,
168191
}
169192
}
170193

@@ -194,8 +217,10 @@ impl<W> CompleteWriter<W> {
194217
#[cfg(debug_assertions)]
195218
impl<W> Drop for CompleteWriter<W> {
196219
fn drop(&mut self) {
197-
if self.inner.is_some() {
198-
log::warn!("writer has not been closed or aborted, must be a bug")
220+
if self.state == CompleteState::Written {
221+
log::warn!(
222+
"writer has not been closed or aborted after successful write operation, must be a bug"
223+
)
199224
}
200225
}
201226
}
@@ -206,29 +231,47 @@ where
206231
{
207232
async fn write(&mut self, bs: Buffer) -> Result<()> {
208233
let w = self.inner.as_mut().ok_or_else(|| {
234+
debug_assert_ne!(
235+
self.state,
236+
CompleteState::Open,
237+
"bug: inner is empty, but state is Open"
238+
);
209239
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
210240
})?;
211241

212242
let len = bs.len();
213-
w.write(bs).await?;
243+
w.write(bs)
244+
.await
245+
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
214246
self.size += len as u64;
247+
self.state.transition(CompleteState::Written);
215248

216249
Ok(())
217250
}
218251

219252
async fn close(&mut self) -> Result<Metadata> {
220253
let w = self.inner.as_mut().ok_or_else(|| {
254+
debug_assert_ne!(
255+
self.state,
256+
CompleteState::Open,
257+
"bug: inner is empty, but state is Open"
258+
);
221259
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
222260
})?;
223261

224262
// we must return `Err` before setting inner to None; otherwise,
225263
// we won't be able to retry `close` in `RetryLayer`.
226-
let mut ret = w.close().await?;
227-
self.check(ret.content_length())?;
264+
let mut ret = w
265+
.close()
266+
.await
267+
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
268+
self.check(ret.content_length())
269+
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
228270
if ret.content_length() == 0 {
229271
ret = ret.with_content_length(self.size);
230272
}
231273
self.inner = None;
274+
self.state.transition(CompleteState::Closed);
232275

233276
Ok(ret)
234277
}
@@ -238,8 +281,11 @@ where
238281
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
239282
})?;
240283

241-
w.abort().await?;
284+
w.abort()
285+
.await
286+
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
242287
self.inner = None;
288+
self.state.transition(CompleteState::Closed);
243289

244290
Ok(())
245291
}

0 commit comments

Comments
 (0)