Skip to content

Commit 411a609

Browse files
committed
feat: add flags related methods
1 parent d75b76e commit 411a609

File tree

5 files changed

+171
-5
lines changed

5 files changed

+171
-5
lines changed

compio-driver/src/iour/mod.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,37 @@ impl Driver {
238238
}
239239
}
240240

241+
pub fn push_flags<T: crate::sys::OpCode + 'static>(
242+
&mut self,
243+
op: &mut Key<T>,
244+
) -> Poll<(io::Result<usize>, u32)> {
245+
instrument!(compio_log::Level::TRACE, "push_flags", ?op);
246+
let user_data = op.user_data();
247+
let op_pin = op.as_op_pin();
248+
trace!("push RawOp");
249+
match op_pin.create_entry() {
250+
OpEntry::Submission(entry) => {
251+
#[allow(clippy::useless_conversion)]
252+
if let Err(err) = self.push_raw(entry.user_data(user_data as _).into()) {
253+
return Poll::Ready((Err(err), 0));
254+
}
255+
Poll::Pending
256+
}
257+
#[cfg(feature = "io-uring-sqe128")]
258+
OpEntry::Submission128(entry) => {
259+
if let Err(err) = self.push_raw(entry.user_data(user_data as _)) {
260+
return Poll::Ready((Err(err), 0));
261+
}
262+
Poll::Pending
263+
}
264+
OpEntry::Blocking => match self.push_blocking(user_data) {
265+
Err(err) => Poll::Ready((Err(err), 0)),
266+
Ok(true) => Poll::Pending,
267+
Ok(false) => Poll::Ready((Err(io::Error::from_raw_os_error(libc::EBUSY)), 0)),
268+
},
269+
}
270+
}
271+
241272
fn push_blocking(&mut self, user_data: usize) -> io::Result<bool> {
242273
let handle = self.handle()?;
243274
let completed = self.pool_completed.clone();
@@ -247,7 +278,7 @@ impl Driver {
247278
let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
248279
let op_pin = op.as_op_pin();
249280
let res = op_pin.call_blocking();
250-
completed.push(Entry::new(user_data, res));
281+
completed.push(Entry::new(user_data, res, todo!("how to get flags?")));
251282
handle.notify().ok();
252283
})
253284
.is_ok();
@@ -294,7 +325,7 @@ fn create_entry(entry: CEntry) -> Entry {
294325
} else {
295326
Ok(result as _)
296327
};
297-
Entry::new(entry.user_data() as _, result)
328+
Entry::new(entry.user_data() as _, result, entry.flags())
298329
}
299330

300331
fn timespec(duration: std::time::Duration) -> Timespec {

compio-driver/src/key.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub(crate) struct RawOp<T: ?Sized> {
2121
// The metadata in `*mut RawOp<dyn OpCode>`
2222
metadata: usize,
2323
result: PushEntry<Option<Waker>, io::Result<usize>>,
24+
flags: u32,
2425
op: T,
2526
}
2627

@@ -84,6 +85,7 @@ impl<T: OpCode + 'static> Key<T> {
8485
cancelled: false,
8586
metadata: opcode_metadata::<T>(),
8687
result: PushEntry::Pending(None),
88+
flags: 0,
8789
op,
8890
});
8991
unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) }
@@ -154,6 +156,10 @@ impl<T: ?Sized> Key<T> {
154156
this.cancelled
155157
}
156158

159+
pub(crate) fn set_flags(&mut self, flags: u32) {
160+
self.as_opaque_mut().flags = flags;
161+
}
162+
157163
/// Whether the op is completed.
158164
pub(crate) fn has_result(&self) -> bool {
159165
self.as_opaque().result.is_ready()
@@ -189,6 +195,19 @@ impl<T> Key<T> {
189195
let op = unsafe { Box::from_raw(self.user_data as *mut RawOp<T>) };
190196
BufResult(op.result.take_ready().unwrap_unchecked(), op.op)
191197
}
198+
199+
/// Get the inner result and flags if it is completed.
200+
///
201+
/// # Safety
202+
///
203+
/// Call it only when the op is completed, otherwise it is UB.
204+
pub(crate) unsafe fn into_inner_flags(self) -> (BufResult<usize, T>, u32) {
205+
let op = unsafe { Box::from_raw(self.user_data as *mut RawOp<T>) };
206+
(
207+
BufResult(op.result.take_ready().unwrap_unchecked(), op.op),
208+
op.flags,
209+
)
210+
}
192211
}
193212

194213
impl<T: OpCode + ?Sized> Key<T> {

compio-driver/src/lib.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,24 @@ impl Proactor {
272272
}
273273
}
274274

275+
/// Push an operation into the driver, and return the unique key, called
276+
/// user-defined data, associated with it.
277+
pub fn push_flags<T: OpCode + 'static>(
278+
&mut self,
279+
op: T,
280+
) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
281+
let mut op = self.driver.create_op(op);
282+
match self.driver.push_flags(&mut op) {
283+
Poll::Pending => PushEntry::Pending(op),
284+
Poll::Ready((res, flags)) => {
285+
op.set_result(res);
286+
op.set_flags(flags);
287+
// SAFETY: just completed.
288+
PushEntry::Ready(unsafe { op.into_inner_flags() })
289+
}
290+
}
291+
}
292+
275293
/// Poll the driver and get completed entries.
276294
/// You need to call [`Proactor::pop`] to get the pushed operations.
277295
pub fn poll(
@@ -300,6 +318,21 @@ impl Proactor {
300318
}
301319
}
302320

321+
/// Get the pushed operations from the completion entries.
322+
///
323+
/// # Panics
324+
/// This function will panic if the requested operation has not been
325+
/// completed.
326+
pub fn pop_flags<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
327+
instrument!(compio_log::Level::DEBUG, "pop_flags", ?op);
328+
if op.has_result() {
329+
// SAFETY: completed.
330+
PushEntry::Ready(unsafe { op.into_inner_flags() })
331+
} else {
332+
PushEntry::Pending(op)
333+
}
334+
}
335+
303336
/// Update the waker of the specified op.
304337
pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
305338
op.set_waker(waker);
@@ -322,18 +355,27 @@ impl AsRawFd for Proactor {
322355
pub(crate) struct Entry {
323356
user_data: usize,
324357
result: io::Result<usize>,
358+
flags: u32,
325359
}
326360

327361
impl Entry {
328-
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
329-
Self { user_data, result }
362+
pub(crate) fn new(user_data: usize, result: io::Result<usize>, flags: u32) -> Self {
363+
Self {
364+
user_data,
365+
result,
366+
flags,
367+
}
330368
}
331369

332370
/// The user-defined data returned by [`Proactor::push`].
333371
pub fn user_data(&self) -> usize {
334372
self.user_data
335373
}
336374

375+
pub fn flags(&self) -> u32 {
376+
self.flags
377+
}
378+
337379
/// The result of the operation.
338380
pub fn into_result(self) -> io::Result<usize> {
339381
self.result
@@ -357,6 +399,7 @@ impl<E: Extend<usize>> Extend<Entry> for OutEntries<'_, E> {
357399
self.entries.extend(iter.into_iter().filter_map(|e| {
358400
let user_data = e.user_data();
359401
let mut op = unsafe { Key::<()>::new_unchecked(user_data) };
402+
op.set_flags(e.flags());
360403
if op.set_result(e.into_result()) {
361404
// SAFETY: completed and cancelled.
362405
let _ = unsafe { op.into_box() };

compio-runtime/src/runtime/mod.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use send_wrapper::SendWrapper;
3131

3232
#[cfg(feature = "time")]
3333
use crate::runtime::time::{TimerFuture, TimerRuntime};
34-
use crate::{runtime::op::OpFuture, BufResult};
34+
use crate::{
35+
runtime::op::{OpFlagsFuture, OpFuture},
36+
BufResult,
37+
};
3538

3639
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
3740

@@ -231,6 +234,13 @@ impl Runtime {
231234
self.driver.borrow_mut().push(op)
232235
}
233236

237+
fn submit_flags_raw<T: OpCode + 'static>(
238+
&self,
239+
op: T,
240+
) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
241+
self.driver.borrow_mut().push_flags(op)
242+
}
243+
234244
/// Submit an operation to the runtime.
235245
///
236246
/// You only need this when authoring your own [`OpCode`].
@@ -241,6 +251,22 @@ impl Runtime {
241251
}
242252
}
243253

254+
/// Submit an operation to the runtime.
255+
///
256+
/// The difference between [`Runtime::submit`] is this method will return
257+
/// the flags
258+
///
259+
/// You only need this when authoring your own [`OpCode`].
260+
pub fn submit_flags<T: OpCode + 'static>(
261+
&self,
262+
op: T,
263+
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
264+
match self.submit_flags_raw(op) {
265+
PushEntry::Pending(user_data) => Either::Left(OpFlagsFuture::new(user_data)),
266+
PushEntry::Ready(res) => Either::Right(ready(res)),
267+
}
268+
}
269+
244270
#[cfg(feature = "time")]
245271
pub(crate) fn create_timer(&self, delay: std::time::Duration) -> impl Future<Output = ()> {
246272
let mut timer_runtime = self.timer_runtime.borrow_mut();
@@ -273,6 +299,19 @@ impl Runtime {
273299
})
274300
}
275301

302+
pub(crate) fn poll_task_flags<T: OpCode>(
303+
&self,
304+
cx: &mut Context,
305+
op: Key<T>,
306+
) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
307+
instrument!(compio_log::Level::DEBUG, "poll_task_flags", ?op);
308+
let mut driver = self.driver.borrow_mut();
309+
driver.pop_flags(op).map_pending(|mut k| {
310+
driver.update_waker(&mut k, cx.waker().clone());
311+
k
312+
})
313+
}
314+
276315
#[cfg(feature = "time")]
277316
pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> {
278317
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);

compio-runtime/src/runtime/op.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,37 @@ impl<T: OpCode> Drop for OpFuture<T> {
4242
}
4343
}
4444
}
45+
46+
#[derive(Debug)]
47+
pub struct OpFlagsFuture<T: OpCode> {
48+
key: Option<Key<T>>,
49+
}
50+
51+
impl<T: OpCode> OpFlagsFuture<T> {
52+
pub fn new(key: Key<T>) -> Self {
53+
Self { key: Some(key) }
54+
}
55+
}
56+
57+
impl<T: OpCode> Future for OpFlagsFuture<T> {
58+
type Output = (BufResult<usize, T>, u32);
59+
60+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
61+
let res = Runtime::with_current(|r| r.poll_task_flags(cx, self.key.take().unwrap()));
62+
match res {
63+
PushEntry::Pending(key) => {
64+
self.key = Some(key);
65+
Poll::Pending
66+
}
67+
PushEntry::Ready(res) => Poll::Ready(res),
68+
}
69+
}
70+
}
71+
72+
impl<T: OpCode> Drop for OpFlagsFuture<T> {
73+
fn drop(&mut self) {
74+
if let Some(key) = self.key.take() {
75+
Runtime::with_current(|r| r.cancel_op(key))
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)