Skip to content

Commit 2297ed7

Browse files
ollie-etlollie-etl
andauthored
Name fields (#149)
Co-authored-by: ollie-etl <Oliver [email protected]>
1 parent c6b884e commit 2297ed7

File tree

16 files changed

+106
-85
lines changed

16 files changed

+106
-85
lines changed

src/driver/accept.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::driver::op::Completable;
1+
use crate::driver::op::{self, Completable};
22
use crate::driver::{Op, SharedFd, Socket};
33
use std::net::SocketAddr;
44
use std::{boxed::Box, io};
@@ -37,8 +37,8 @@ impl Op<Accept> {
3737
impl Completable for Accept {
3838
type Output = io::Result<(Socket, Option<SocketAddr>)>;
3939

40-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
41-
let fd = result?;
40+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
41+
let fd = cqe.result?;
4242
let fd = SharedFd::new(fd as i32);
4343
let socket = Socket { fd };
4444
let (_, addr) = unsafe {

src/driver/close.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::driver::Op;
22

3-
use crate::driver::op::Completable;
3+
use crate::driver::op::{self, Completable};
44
use std::io;
55
use std::os::unix::io::RawFd;
66

@@ -21,8 +21,8 @@ impl Op<Close> {
2121
impl Completable for Close {
2222
type Output = io::Result<()>;
2323

24-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
25-
let _ = result?;
24+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
25+
let _ = cqe.result?;
2626

2727
Ok(())
2828
}

src/driver/connect.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::driver::op::Completable;
1+
use crate::driver::op::{self, Completable};
22
use crate::driver::{Op, SharedFd};
33
use socket2::SockAddr;
44
use std::io;
@@ -36,7 +36,7 @@ impl Op<Connect> {
3636
impl Completable for Connect {
3737
type Output = io::Result<()>;
3838

39-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
40-
result.map(|_| ())
39+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
40+
cqe.result.map(|_| ())
4141
}
4242
}

src/driver/fsync.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::driver::{Op, SharedFd};
22

33
use std::io;
44

5-
use crate::driver::op::Completable;
5+
use crate::driver::op::{self, Completable};
66
use io_uring::{opcode, types};
77

88
pub(crate) struct Fsync {
@@ -28,7 +28,7 @@ impl Op<Fsync> {
2828
impl Completable for Fsync {
2929
type Output = io::Result<()>;
3030

31-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
32-
result.map(|_| ())
31+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
32+
cqe.result.map(|_| ())
3333
}
3434
}

src/driver/mod.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ mod write;
3939

4040
mod writev;
4141

42-
use io_uring::{cqueue, IoUring};
42+
use io_uring::IoUring;
4343
use scoped_tls::scoped_thread_local;
4444
use slab::Slab;
4545
use std::cell::RefCell;
@@ -61,9 +61,11 @@ pub(crate) struct Inner {
6161
pub(crate) uring: IoUring,
6262
}
6363

64-
// When dropping the driver, all in-flight operations must have completed. This
65-
// type wraps the slab and ensures that, on drop, the slab is empty.
66-
struct Ops(Slab<op::Lifecycle>);
64+
struct Ops {
65+
// When dropping the driver, all in-flight operations must have completed. This
66+
// type wraps the slab and ensures that, on drop, the slab is empty.
67+
lifecycle: Slab<op::Lifecycle>,
68+
}
6769

6870
scoped_thread_local!(pub(crate) static CURRENT: Rc<RefCell<Inner>>);
6971

@@ -98,7 +100,7 @@ impl Driver {
98100

99101
fn num_operations(&self) -> usize {
100102
let inner = self.inner.borrow();
101-
inner.ops.0.len()
103+
inner.ops.lifecycle.len()
102104
}
103105
}
104106

@@ -117,7 +119,7 @@ impl Inner {
117119

118120
let index = cqe.user_data() as _;
119121

120-
self.ops.complete(index, resultify(&cqe), cqe.flags());
122+
self.ops.complete(index, cqe.into());
121123
}
122124
}
123125

@@ -158,42 +160,34 @@ impl Drop for Driver {
158160

159161
impl Ops {
160162
fn new() -> Ops {
161-
Ops(Slab::with_capacity(64))
163+
Ops {
164+
lifecycle: Slab::with_capacity(64),
165+
}
162166
}
163167

164168
fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> {
165-
self.0.get_mut(index)
169+
self.lifecycle.get_mut(index)
166170
}
167171

168172
// Insert a new operation
169173
fn insert(&mut self) -> usize {
170-
self.0.insert(op::Lifecycle::Submitted)
174+
self.lifecycle.insert(op::Lifecycle::Submitted)
171175
}
172176

173177
// Remove an operation
174178
fn remove(&mut self, index: usize) {
175-
self.0.remove(index);
179+
self.lifecycle.remove(index);
176180
}
177181

178-
fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
179-
if self.0[index].complete(result, flags) {
180-
self.0.remove(index);
182+
fn complete(&mut self, index: usize, cqe: op::CqeResult) {
183+
if self.lifecycle[index].complete(cqe) {
184+
self.lifecycle.remove(index);
181185
}
182186
}
183187
}
184188

185189
impl Drop for Ops {
186190
fn drop(&mut self) {
187-
assert!(self.0.is_empty());
188-
}
189-
}
190-
191-
fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
192-
let res = cqe.result();
193-
194-
if res >= 0 {
195-
Ok(res as u32)
196-
} else {
197-
Err(io::Error::from_raw_os_error(-res))
191+
assert!(self.lifecycle.is_empty());
198192
}
199193
}

src/driver/noop.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::driver::{op::Completable, Op};
1+
use crate::driver::{
2+
op::{self, Completable},
3+
Op,
4+
};
25
use std::io;
36

47
/// No operation. Just posts a completion event, nothing else.
@@ -17,8 +20,8 @@ impl Op<NoOp> {
1720
impl Completable for NoOp {
1821
type Output = io::Result<()>;
1922

20-
fn complete(self, _result: io::Result<u32>, _flags: u32) -> Self::Output {
21-
Ok(())
23+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
24+
cqe.result.map(|_| ())
2225
}
2326
}
2427

src/driver/op.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::pin::Pin;
55
use std::rc::Rc;
66
use std::task::{Context, Poll, Waker};
77

8-
use io_uring::squeue;
8+
use io_uring::{cqueue, squeue};
99

1010
use crate::driver;
1111

@@ -23,7 +23,7 @@ pub(crate) struct Op<T: 'static> {
2323

2424
pub(crate) trait Completable {
2525
type Output;
26-
fn complete(self, result: io::Result<u32>, flags: u32) -> Self::Output;
26+
fn complete(self, cqe: CqeResult) -> Self::Output;
2727
}
2828

2929
pub(crate) enum Lifecycle {
@@ -37,16 +37,36 @@ pub(crate) enum Lifecycle {
3737
/// must be passed to the driver and held until the operation completes.
3838
Ignored(Box<dyn std::any::Any>),
3939

40-
/// The operation has completed.
41-
Completed(io::Result<u32>, u32),
40+
/// The operation has completed with a single cqe result
41+
Completed(CqeResult),
42+
}
43+
44+
/// A single CQE entry
45+
pub(crate) struct CqeResult {
46+
pub(crate) result: io::Result<u32>,
47+
#[allow(dead_code)]
48+
pub(crate) flags: u32,
49+
}
50+
51+
impl From<cqueue::Entry> for CqeResult {
52+
fn from(cqe: cqueue::Entry) -> Self {
53+
let res = cqe.result();
54+
let flags = cqe.flags();
55+
let result = if res >= 0 {
56+
Ok(res as u32)
57+
} else {
58+
Err(io::Error::from_raw_os_error(-res))
59+
};
60+
CqeResult { result, flags }
61+
}
4262
}
4363

4464
impl<T> Op<T>
4565
where
4666
T: Completable,
4767
{
4868
/// Create a new operation
49-
fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc<RefCell<driver::Inner>>) -> Op<T> {
69+
fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc<RefCell<driver::Inner>>) -> Self {
5070
Op {
5171
driver: inner_rc.clone(),
5272
index: inner.ops.insert(),
@@ -58,7 +78,7 @@ where
5878
///
5979
/// `state` is stored during the operation tracking any state submitted to
6080
/// the kernel.
61-
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
81+
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Self>
6282
where
6383
F: FnOnce(&mut T) -> squeue::Entry,
6484
{
@@ -83,7 +103,7 @@ where
83103
}
84104

85105
/// Try submitting an operation to uring
86-
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
106+
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Self>
87107
where
88108
F: FnOnce(&mut T) -> squeue::Entry,
89109
{
@@ -122,11 +142,10 @@ where
122142
Poll::Pending
123143
}
124144
Lifecycle::Ignored(..) => unreachable!(),
125-
Lifecycle::Completed(result, flags) => {
145+
Lifecycle::Completed(cqe) => {
126146
inner.ops.remove(me.index);
127147
me.index = usize::MAX;
128-
129-
Poll::Ready(me.data.take().unwrap().complete(result, flags))
148+
Poll::Ready(me.data.take().unwrap().complete(cqe))
130149
}
131150
}
132151
}
@@ -153,16 +172,16 @@ impl<T> Drop for Op<T> {
153172
}
154173

155174
impl Lifecycle {
156-
pub(super) fn complete(&mut self, result: io::Result<u32>, flags: u32) -> bool {
175+
pub(super) fn complete(&mut self, cqe: CqeResult) -> bool {
157176
use std::mem;
158177

159178
match mem::replace(self, Lifecycle::Submitted) {
160179
Lifecycle::Submitted => {
161-
*self = Lifecycle::Completed(result, flags);
180+
*self = Lifecycle::Completed(cqe);
162181
false
163182
}
164183
Lifecycle::Waiting(waker) => {
165-
*self = Lifecycle::Completed(result, flags);
184+
*self = Lifecycle::Completed(cqe);
166185
waker.wake();
167186
false
168187
}
@@ -190,10 +209,10 @@ mod test {
190209
impl Completable for Rc<()> {
191210
type Output = Completion;
192211

193-
fn complete(self, result: io::Result<u32>, flags: u32) -> Self::Output {
212+
fn complete(self, cqe: CqeResult) -> Self::Output {
194213
Completion {
195-
result,
196-
flags,
214+
result: cqe.result,
215+
flags: cqe.flags,
197216
data: self.clone(),
198217
}
199218
}
@@ -304,7 +323,11 @@ mod test {
304323
assert_eq!(2, Rc::strong_count(&data));
305324

306325
assert_eq!(1, driver.num_operations());
307-
driver.inner.borrow_mut().ops.complete(index, Ok(1), 0);
326+
let cqe = CqeResult {
327+
result: Ok(1),
328+
flags: 0,
329+
};
330+
driver.inner.borrow_mut().ops.complete(index, cqe);
308331
assert_eq!(1, Rc::strong_count(&data));
309332
assert_eq!(0, driver.num_operations());
310333
release(driver);
@@ -326,11 +349,12 @@ mod test {
326349
}
327350

328351
fn complete(op: &Op<Rc<()>>, result: io::Result<u32>) {
329-
op.driver.borrow_mut().ops.complete(op.index, result, 0);
352+
let cqe = CqeResult { result, flags: 0 };
353+
op.driver.borrow_mut().ops.complete(op.index, cqe);
330354
}
331355

332356
fn release(driver: crate::driver::Driver) {
333357
// Clear ops, we aren't really doing any I/O
334-
driver.inner.borrow_mut().ops.0.clear();
358+
driver.inner.borrow_mut().ops.lifecycle.clear();
335359
}
336360
}

src/driver/open.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::driver::{self, Op, SharedFd};
22
use crate::fs::{File, OpenOptions};
33

4-
use crate::driver::op::Completable;
4+
use crate::driver::op::{self, Completable};
55
use std::ffi::CString;
66
use std::io;
77
use std::path::Path;
@@ -40,7 +40,7 @@ impl Op<Open> {
4040
impl Completable for Open {
4141
type Output = io::Result<File>;
4242

43-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
44-
Ok(File::from_shared_fd(SharedFd::new(result? as _)))
43+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
44+
Ok(File::from_shared_fd(SharedFd::new(cqe.result? as _)))
4545
}
4646
}

src/driver/read.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::buf::IoBufMut;
22
use crate::driver::{Op, SharedFd};
33
use crate::BufResult;
44

5-
use crate::driver::op::Completable;
5+
use crate::driver::op::{self, Completable};
66
use std::io;
77

88
pub(crate) struct Read<T> {
@@ -42,9 +42,9 @@ where
4242
{
4343
type Output = BufResult<usize, T>;
4444

45-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
45+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
4646
// Convert the operation result to `usize`
47-
let res = result.map(|v| v as usize);
47+
let res = cqe.result.map(|v| v as usize);
4848
// Recover the buffer
4949
let mut buf = self.buf;
5050

src/driver/readv.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::buf::IoBufMut;
22
use crate::driver::{Op, SharedFd};
33
use crate::BufResult;
44

5-
use crate::driver::op::Completable;
5+
use crate::driver::op::{self, Completable};
66
use libc::iovec;
77
use std::io;
88

@@ -61,9 +61,9 @@ where
6161
{
6262
type Output = BufResult<usize, Vec<T>>;
6363

64-
fn complete(self, result: io::Result<u32>, _flags: u32) -> Self::Output {
64+
fn complete(self, cqe: op::CqeResult) -> Self::Output {
6565
// Convert the operation result to `usize`
66-
let res = result.map(|v| v as usize);
66+
let res = cqe.result.map(|v| v as usize);
6767
// Recover the buffer
6868
let mut bufs = self.bufs;
6969

0 commit comments

Comments
 (0)