Skip to content

Commit 2f34353

Browse files
author
Lucas Paixão
committed
Code and formater fix
1 parent c2bd8b9 commit 2f34353

File tree

16 files changed

+229
-112
lines changed

16 files changed

+229
-112
lines changed

src/buf/fixed/buffers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use libc::iovec;
2+
use std::any::Any;
23

34
// Abstracts management of fixed buffers in a buffer registry.
4-
pub(crate) trait FixedBuffers {
5+
pub(crate) trait FixedBuffers: Any {
56
// Provides access to the raw buffers as a slice of iovec.
67
fn iovecs(&self) -> &[iovec];
78

src/buf/fixed/handle.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::FixedBuffers;
22
use crate::buf::{IoBuf, IoBufMut};
3+
use std::borrow::BorrowMut;
4+
use std::cell::RefCell;
35

46
use libc::iovec;
57
use std::fmt::{self, Debug};
@@ -30,19 +32,17 @@ pub(crate) struct CheckedOutBuf {
3032
/// [`FixedBufPool`]: super::FixedBufPool
3133
///
3234
pub struct FixedBuf {
33-
registry: Rc<dyn FixedBuffers>,
35+
registry: Rc<RefCell<dyn FixedBuffers>>,
3436
buf: CheckedOutBuf,
3537
}
3638

3739
impl Drop for FixedBuf {
3840
fn drop(&mut self) {
39-
let registry = Rc::get_mut(&mut self.registry);
41+
let borrow_mut = self.registry.borrow_mut();
4042

41-
if registry.is_none() {
42-
return;
43-
}
43+
let mut item = borrow_mut.as_ref().borrow_mut();
4444

45-
let registry = registry.unwrap();
45+
let registry = item.borrow_mut();
4646
// Safety: the length of the initialized data in the buffer has been
4747
// maintained accordingly to the safety contracts on
4848
// Self::new and IoBufMut.
@@ -57,7 +57,7 @@ impl FixedBuf {
5757
// - the array will not be deallocated until the buffer is checked in;
5858
// - the data in the array must be initialized up to the number of bytes
5959
// given in init_len.
60-
pub(super) unsafe fn new(registry: Rc<dyn FixedBuffers>, buf: CheckedOutBuf) -> Self {
60+
pub(super) unsafe fn new(registry: Rc<RefCell<dyn FixedBuffers>>, buf: CheckedOutBuf) -> Self {
6161
FixedBuf { registry, buf }
6262
}
6363

src/buf/fixed/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ pub mod pool;
2727
pub use pool::FixedBufPool;
2828

2929
mod registry;
30+
mod shared;
31+
3032
pub use registry::FixedBufRegistry;

src/buf/fixed/plumbing/registry.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ use libc::{iovec, UIO_MAXIOV};
55
use std::cmp;
66
use std::mem;
77
use std::ptr;
8+
use std::ptr::NonNull;
89
use std::slice;
910

1011
// Internal state shared by FixedBufRegistry and FixedBuf handles.
1112
pub(crate) struct Registry<T: IoBufMut> {
1213
// Pointer to an allocated array of iovec records referencing
1314
// the allocated buffers. The number of initialized records is the
1415
// same as the length of the states array.
15-
raw_bufs: ptr::NonNull<iovec>,
16+
raw_bufs: NonNull<iovec>,
1617
// Original capacity of raw_bufs as a Vec.
1718
orig_cap: usize,
1819
// State information on the buffers. Indices in this array correspond to
@@ -22,6 +23,17 @@ pub(crate) struct Registry<T: IoBufMut> {
2223
buffers: Vec<T>,
2324
}
2425

26+
impl<T: IoBufMut> Default for Registry<T> {
27+
fn default() -> Self {
28+
Self {
29+
raw_bufs: NonNull::<iovec>::dangling(),
30+
orig_cap: 0,
31+
states: vec![],
32+
buffers: Vec::with_capacity(0),
33+
}
34+
}
35+
}
36+
2537
// State information of a buffer in the registry,
2638
enum BufState {
2739
// The buffer is not in use.
@@ -128,7 +140,10 @@ impl<T: IoBufMut> Drop for Registry<T> {
128140
// from Registry ownership, rather than deallocate.
129141
unsafe { self.buffers[i].set_init(*init_len) };
130142
}
131-
BufState::CheckedOut => unreachable!("all buffers must be checked in"),
143+
BufState::CheckedOut => {
144+
//unreachable!("all buffers must be checked in")
145+
println!("Bug");
146+
},
132147
}
133148
}
134149

src/buf/fixed/pool.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
//! by [`FixedBufPool`].
88
//!
99
//! [`FixedBufPool`]: self::FixedBufPool
10-
1110
use super::plumbing;
1211
use super::FixedBuf;
1312
use crate::buf::IoBufMut;
14-
use crate::runtime::CONTEXT;
13+
use std::cell::RefCell;
1514

15+
use crate::buf::fixed::handle::CheckedOutBuf;
16+
use crate::buf::fixed::plumbing::Pool;
17+
use crate::buf::fixed::shared::{process, register, unregister};
1618
use std::io;
1719
use std::rc::Rc;
1820
use std::sync::Arc;
@@ -93,7 +95,7 @@ use tokio::sync::Notify;
9395
/// ```
9496
#[derive(Clone)]
9597
pub struct FixedBufPool<T: IoBufMut> {
96-
inner: Rc<plumbing::Pool<T>>,
98+
inner: RefCell<Rc<RefCell<plumbing::Pool<T>>>>,
9799
}
98100

99101
impl<T: IoBufMut> FixedBufPool<T> {
@@ -162,7 +164,7 @@ impl<T: IoBufMut> FixedBufPool<T> {
162164
/// ```
163165
pub fn new(bufs: impl IntoIterator<Item = T>) -> Self {
164166
FixedBufPool {
165-
inner: Rc::new(plumbing::Pool::new(bufs.into_iter())),
167+
inner: RefCell::new(Rc::new(RefCell::new(Pool::new(bufs.into_iter())))),
166168
}
167169
}
168170

@@ -186,7 +188,7 @@ impl<T: IoBufMut> FixedBufPool<T> {
186188
/// of the `tokio-uring` runtime this call is made in, the function returns
187189
/// an error.
188190
pub fn register(&self) -> io::Result<()> {
189-
CONTEXT.with(|x| x.handle().register_buffers(Rc::clone(&self.inner) as _))
191+
register(self.inner.borrow().clone())
190192
}
191193

192194
/// Unregisters this collection of buffers.
@@ -204,8 +206,9 @@ impl<T: IoBufMut> FixedBufPool<T> {
204206
/// of the `tokio-uring` runtime this call is made in, the function returns
205207
/// an error. Calling `unregister` when no `FixedBufPool` is currently
206208
/// registered on this runtime also returns an error.
209+
#[allow(private_interfaces)]
207210
pub fn unregister(&self) -> io::Result<()> {
208-
CONTEXT.with(|x| x.handle().unregister_buffers(Rc::clone(&self.inner) as _))
211+
unregister()
209212
}
210213

211214
/// Returns a buffer of requested capacity from this pool
@@ -220,14 +223,13 @@ impl<T: IoBufMut> FixedBufPool<T> {
220223
/// An application should not rely on any particular order
221224
/// in which available buffers are retrieved.
222225
pub fn try_next(&mut self, cap: usize) -> Option<FixedBuf> {
223-
let mut inner = Rc::get_mut(&mut self.inner);
224-
225-
inner.as_mut()?.try_next(cap).map(|data| {
226-
let pool = Rc::clone(&self.inner);
227-
// Safety: the validity of buffer data is ensured by
228-
// plumbing::Pool::try_next
229-
unsafe { FixedBuf::new(pool, data) }
230-
})
226+
let cob = {
227+
let inner = self.inner.borrow_mut();
228+
let mut inner = inner.borrow_mut();
229+
inner.try_next(cap)
230+
};
231+
let current = self.inner.borrow().clone();
232+
process(current, cob)
231233
}
232234

233235
/// Resolves to a buffer of requested capacity
@@ -243,12 +245,11 @@ impl<T: IoBufMut> FixedBufPool<T> {
243245
pub async fn next(&mut self, cap: usize) -> FixedBuf {
244246
// Fast path: get the buffer if it's already available
245247
let notify = {
246-
let inner = Rc::get_mut(&mut self.inner).unwrap();
248+
let item = self.inner.borrow();
249+
let mut inner = item.borrow_mut();
247250
if let Some(data) = inner.try_next(cap) {
248-
// Safety: the validity of buffer data is ensured by
249-
// plumbing::Pool::try_next
250-
let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) };
251-
return buf;
251+
let current = self.inner.borrow().clone();
252+
return Self::get_fixed_buf(current, data);
252253
}
253254
inner.notify_on_next(cap)
254255
};
@@ -257,6 +258,12 @@ impl<T: IoBufMut> FixedBufPool<T> {
257258
self.next_when_notified(cap, notify).await
258259
}
259260

261+
fn get_fixed_buf(inner: Rc<RefCell<Pool<T>>>, data: CheckedOutBuf) -> FixedBuf {
262+
// Safety: the validity of buffer data is ensured by
263+
// plumbing::Pool::try_next
264+
unsafe { FixedBuf::new(inner, data) }
265+
}
266+
260267
#[cold]
261268
async fn next_when_notified(&mut self, cap: usize, notify: Arc<Notify>) -> FixedBuf {
262269
let notified = notify.notified();
@@ -266,11 +273,12 @@ impl<T: IoBufMut> FixedBufPool<T> {
266273
// between us calling `try_next` and here, so we can't miss a wakeup.
267274
notified.as_mut().await;
268275

269-
if let Some(data) = Rc::get_mut(&mut self.inner).unwrap().try_next(cap) {
270-
// Safety: the validity of buffer data is ensured by
271-
// plumbing::Pool::try_next
272-
let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) };
273-
return buf;
276+
let inner = self.inner.borrow_mut();
277+
let mut inner = inner.borrow_mut();
278+
279+
if let Some(data) = inner.try_next(cap) {
280+
let current = self.inner.borrow().clone();
281+
return Self::get_fixed_buf(current, data);
274282
}
275283

276284
// It's possible that the task did not get a buffer from `try_next`.

src/buf/fixed/registry.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use super::plumbing;
21
use super::FixedBuf;
2+
use super::plumbing;
3+
use std::cell::RefCell;
34

5+
use crate::buf::fixed::shared::{process, register, unregister};
46
use crate::buf::IoBufMut;
5-
use crate::runtime::CONTEXT;
67
use std::io;
78
use std::rc::Rc;
89

@@ -30,7 +31,7 @@ use std::rc::Rc;
3031
/// [`Runtime`]: crate::Runtime
3132
#[derive(Clone)]
3233
pub struct FixedBufRegistry<T: IoBufMut> {
33-
inner: Rc<plumbing::Registry<T>>,
34+
inner: RefCell<Rc<RefCell<plumbing::Registry<T>>>>,
3435
}
3536

3637
impl<T: IoBufMut> FixedBufRegistry<T> {
@@ -99,7 +100,9 @@ impl<T: IoBufMut> FixedBufRegistry<T> {
99100
/// ```
100101
pub fn new(bufs: impl IntoIterator<Item = T>) -> Self {
101102
FixedBufRegistry {
102-
inner: Rc::new(plumbing::Registry::new(bufs.into_iter())),
103+
inner: RefCell::new(Rc::new(RefCell::new(plumbing::Registry::new(
104+
bufs.into_iter(),
105+
)))),
103106
}
104107
}
105108

@@ -123,7 +126,7 @@ impl<T: IoBufMut> FixedBufRegistry<T> {
123126
/// of the `tokio-uring` runtime this call is made in, the function returns
124127
/// an error.
125128
pub fn register(&self) -> io::Result<()> {
126-
CONTEXT.with(|x| x.handle().register_buffers(Rc::clone(&self.inner) as _))
129+
register(self.inner.borrow().clone())
127130
}
128131

129132
/// Unregisters this collection of buffers.
@@ -142,7 +145,7 @@ impl<T: IoBufMut> FixedBufRegistry<T> {
142145
/// an error. Calling `unregister` when no `FixedBufRegistry` is currently
143146
/// registered on this runtime also returns an error.
144147
pub fn unregister(&self) -> io::Result<()> {
145-
CONTEXT.with(|x| x.handle().unregister_buffers(Rc::clone(&self.inner) as _))
148+
unregister()
146149
}
147150

148151
/// Returns a buffer identified by the specified index for use by the
@@ -153,12 +156,12 @@ impl<T: IoBufMut> FixedBufRegistry<T> {
153156
/// using the buffer takes ownership of it and returns it once completed,
154157
/// preventing shared use of the buffer while the operation is in flight.
155158
pub fn check_out(&mut self, index: usize) -> Option<FixedBuf> {
156-
let inner = Rc::get_mut(&mut self.inner).unwrap();
157-
inner.check_out(index).map(|data| {
158-
let registry = Rc::clone(&self.inner);
159-
// Safety: the validity of buffer data is ensured by
160-
// plumbing::Registry::check_out
161-
unsafe { FixedBuf::new(registry, data) }
162-
})
159+
let cob = {
160+
let inner = self.inner.borrow_mut();
161+
let mut inner = inner.borrow_mut();
162+
inner.check_out(index)
163+
};
164+
let current = self.inner.borrow().clone();
165+
process(current, cob)
163166
}
164167
}

src/buf/fixed/shared.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use crate::buf::fixed::handle::CheckedOutBuf;
2+
use crate::buf::fixed::{FixedBuf, FixedBuffers};
3+
use crate::runtime::CONTEXT;
4+
use std::cell::RefCell;
5+
use std::io;
6+
use std::rc::Rc;
7+
8+
pub(crate) fn process(
9+
this: Rc<RefCell<dyn FixedBuffers>>,
10+
cob: Option<CheckedOutBuf>,
11+
) -> Option<FixedBuf> {
12+
cob.map(|data| {
13+
// Safety: the validity of buffer data is ensured by
14+
// plumbing::Pool::try_next
15+
unsafe { FixedBuf::new(this, data) }
16+
})
17+
}
18+
19+
pub(crate) fn register(this: Rc<RefCell<dyn FixedBuffers>>) -> io::Result<()> {
20+
CONTEXT.with(|x| x.handle().register_buffers(this))
21+
}
22+
23+
pub(crate) fn unregister() -> io::Result<()> {
24+
CONTEXT.with(|x| x.handle().unregister_buffers())
25+
}

src/fs/file.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ impl File {
465465
/// use std::iter;
466466
///
467467
/// tokio_uring::start(async {
468-
/// let registry = FixedBufRegistry::new(iter::repeat(vec![0; 10]).take(10));
468+
/// let mut registry = FixedBufRegistry::new(iter::repeat(vec![0; 10]).take(10));
469469
/// registry.register()?;
470470
///
471471
/// let f = File::open("foo.txt").await?;
@@ -656,7 +656,7 @@ impl File {
656656
/// use tokio_uring::buf::BoundedBuf;
657657
///
658658
/// tokio_uring::start(async {
659-
/// let registry = FixedBufRegistry::new([b"some bytes".to_vec()]);
659+
/// let mut registry = FixedBufRegistry::new([b"some bytes".to_vec()]);
660660
/// registry.register()?;
661661
///
662662
/// let file = File::create("foo.txt").await?;

0 commit comments

Comments
 (0)