Skip to content

Commit 98f63c8

Browse files
authored
feat(runtime): submit_multi (#743)
* feat(runtime): submit_multi * fix: unused import * feat: apply suggestion * fix: doctest
1 parent 0446642 commit 98f63c8

File tree

4 files changed

+204
-26
lines changed

4 files changed

+204
-26
lines changed

compio-io/src/framed/codec/bytes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
//! codec.encode(data.clone(), &mut buffer).unwrap();
1818
//!
1919
//! // Decoding
20-
//! let decoded = codec.decode(&buffer.as_slice()).unwrap();
20+
//! let decoded = codec.decode(&buffer.slice(..)).unwrap();
2121
//! assert_eq!(decoded, data);
2222
//! ```
2323
use std::io::{self, Write};

compio-runtime/src/runtime/future.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,31 @@ impl ContextExt for Context<'_> {
4242
Some(extra)
4343
}
4444
}
45+
pin_project_lite::pin_project! {
46+
/// Returned [`Future`] for [`Runtime::submit`].
47+
///
48+
/// When this is dropped and the operation hasn't finished yet, it will try to
49+
/// cancel the operation.
50+
///
51+
/// By default, this implements `Future<Output = BufResult<usize, T>>`. If
52+
/// [`Extra`] is needed, call [`.with_extra()`] to get a `Submit<T, Extra>`
53+
/// which implements `Future<Output = (BufResult<usize, T>, Extra)>`.
54+
///
55+
/// [`.with_extra()`]: Submit::with_extra
56+
pub struct Submit<T: OpCode, E = ()> {
57+
runtime: Runtime,
58+
state: Option<State<T, E>>,
59+
}
60+
61+
impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
62+
fn drop(this: Pin<&mut Self>) {
63+
let this = this.project();
64+
if let Some(State::Submitted { key, .. }) = this.state.take() {
65+
this.runtime.cancel(key);
66+
}
67+
}
68+
}
4569

46-
/// Return type for `Runtime::submit`
47-
///
48-
/// By default, this implements `Future<Output = BufResult<usize, T>>`. If
49-
/// [`Extra`] is needed, call [`.with_extra()`] to get a `Submit<T, Extra>`
50-
/// which implements `Future<Output = (BufResult<usize, T>, Extra)>`.
51-
///
52-
/// [`.with_extra()`]: Submit::with_extra
53-
pub struct Submit<T: OpCode, E = ()> {
54-
runtime: Runtime,
55-
state: Option<State<T, E>>,
5670
}
5771

5872
enum State<T: OpCode, E> {
@@ -107,12 +121,13 @@ impl<T: OpCode + 'static> Future for Submit<T, ()> {
107121
type Output = BufResult<usize, T>;
108122

109123
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110-
let this = unsafe { self.get_unchecked_mut() };
124+
let this = self.project();
125+
111126
loop {
112127
match this.state.take().expect("Cannot poll after ready") {
113128
State::Submitted { key, .. } => match this.runtime.poll_task(cx.waker(), key) {
114129
PushEntry::Pending(key) => {
115-
this.state = Some(State::submitted(key));
130+
*this.state = Some(State::submitted(key));
116131
return Poll::Pending;
117132
}
118133
PushEntry::Ready(res) => return Poll::Ready(res),
@@ -127,7 +142,7 @@ impl<T: OpCode + 'static> Future for Submit<T, ()> {
127142
cancel.register(&key);
128143
};
129144

130-
this.state = Some(State::submitted(key))
145+
*this.state = Some(State::submitted(key))
131146
}
132147
PushEntry::Ready(res) => {
133148
return Poll::Ready(res);
@@ -143,13 +158,14 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
143158
type Output = (BufResult<usize, T>, Extra);
144159

145160
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146-
let this = unsafe { self.get_unchecked_mut() };
161+
let this = self.project();
162+
147163
loop {
148164
match this.state.take().expect("Cannot poll after ready") {
149165
State::Submitted { key, .. } => {
150166
match this.runtime.poll_task_with_extra(cx.waker(), key) {
151167
PushEntry::Pending(key) => {
152-
this.state = Some(State::submitted(key));
168+
*this.state = Some(State::submitted(key));
153169
return Poll::Pending;
154170
}
155171
PushEntry::Ready(res) => return Poll::Ready(res),
@@ -163,7 +179,7 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
163179
cancel.register(&key);
164180
}
165181

166-
this.state = Some(State::submitted(key))
182+
*this.state = Some(State::submitted(key))
167183
}
168184
PushEntry::Ready(res) => {
169185
return Poll::Ready((res, this.runtime.default_extra()));
@@ -183,11 +199,3 @@ where
183199
self.state.is_none()
184200
}
185201
}
186-
187-
impl<T: OpCode, E> Drop for Submit<T, E> {
188-
fn drop(&mut self) {
189-
if let Some(State::Submitted { key, .. }) = self.state.take() {
190-
self.runtime.cancel(key);
191-
}
192-
}
193-
}

compio-runtime/src/runtime/mod.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ use futures_util::FutureExt;
2525
mod future;
2626
pub use future::*;
2727

28+
mod stream;
29+
pub use stream::*;
30+
2831
#[cfg(feature = "time")]
2932
pub(crate) mod time;
3033

@@ -284,6 +287,13 @@ impl Runtime {
284287
Submit::new(self.clone(), op)
285288
}
286289

290+
/// Submit a multishot operation to the runtime.
291+
///
292+
/// You only need this when authoring your own [`OpCode`].
293+
pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
294+
SubmitMulti::new(self.clone(), op)
295+
}
296+
287297
pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
288298
self.driver.borrow_mut().cancel(key);
289299
}
@@ -327,6 +337,20 @@ impl Runtime {
327337
})
328338
}
329339

340+
pub(crate) fn poll_multishot<T: OpCode>(
341+
&self,
342+
waker: &Waker,
343+
key: &Key<T>,
344+
) -> Option<BufResult<usize, Extra>> {
345+
instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
346+
let mut driver = self.driver.borrow_mut();
347+
if let Some(res) = driver.pop_multishot(key) {
348+
return Some(res);
349+
}
350+
driver.update_waker(key, waker);
351+
None
352+
}
353+
330354
#[cfg(feature = "time")]
331355
pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> {
332356
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
@@ -595,12 +619,24 @@ pub fn spawn_blocking<T: Send + 'static>(
595619
/// ## Panics
596620
///
597621
/// This method doesn't create runtime and will panic if it's not within a
598-
/// runtime. It tries to obtain the current runtime by
622+
/// runtime. It tries to obtain the current runtime with
599623
/// [`Runtime::with_current`].
600624
pub fn submit<T: OpCode + 'static>(op: T) -> Submit<T> {
601625
Runtime::with_current(|r| r.submit(op))
602626
}
603627

628+
/// Submit a multishot operation to the current runtime, and return a stream for
629+
/// it.
630+
///
631+
/// ## Panics
632+
///
633+
/// This method doesn't create runtime and will panic if it's not within a
634+
/// runtime. It tries to obtain the current runtime with
635+
/// [`Runtime::with_current`].
636+
pub fn submit_multi<T: OpCode + 'static>(op: T) -> SubmitMulti<T> {
637+
Runtime::with_current(|r| r.submit_multi(op))
638+
}
639+
604640
/// Register file descriptors for fixed-file operations with the current
605641
/// runtime's io_uring instance.
606642
///
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use std::{
2+
pin::Pin,
3+
task::{Context, Poll},
4+
};
5+
6+
use compio_buf::BufResult;
7+
use compio_driver::{Extra, Key, OpCode, PushEntry};
8+
use futures_util::{Stream, stream::FusedStream};
9+
10+
use crate::{ContextExt, Runtime};
11+
12+
pin_project_lite::pin_project! {
13+
/// Returned [`Stream`] for [`Runtime::submit_multi`].
14+
///
15+
/// When this is dropped and the operation hasn't finished yet, it will try to
16+
/// cancel the operation.
17+
pub struct SubmitMulti<T: OpCode> {
18+
runtime: Runtime,
19+
state: Option<State<T>>,
20+
}
21+
22+
impl<T: OpCode> PinnedDrop for SubmitMulti<T> {
23+
fn drop(this: Pin<&mut Self>) {
24+
let this = this.project();
25+
if let Some(State::Submitted { key }) = this.state.take() {
26+
this.runtime.cancel(key);
27+
}
28+
}
29+
}
30+
}
31+
32+
enum State<T: OpCode> {
33+
Idle { op: T },
34+
Submitted { key: Key<T> },
35+
Finished { op: T },
36+
}
37+
38+
impl<T: OpCode> State<T> {
39+
fn submitted(key: Key<T>) -> Self {
40+
State::Submitted { key }
41+
}
42+
}
43+
44+
impl<T: OpCode> SubmitMulti<T> {
45+
pub(crate) fn new(runtime: Runtime, op: T) -> Self {
46+
SubmitMulti {
47+
runtime,
48+
state: Some(State::Idle { op }),
49+
}
50+
}
51+
52+
/// Try to take the inner op from the stream.
53+
///
54+
/// Returns `Ok(T)` if the stream:
55+
///
56+
/// - has not been polled yet, or
57+
/// - is finished and the op is returned by the driver
58+
///
59+
/// Returns `Err(Self)` if it's still running.
60+
pub fn try_take(mut self) -> Result<T, Self> {
61+
match self.state.take() {
62+
Some(State::Finished { op }) | Some(State::Idle { op }) => Ok(op),
63+
state => {
64+
debug_assert!(state.is_some());
65+
self.state = state;
66+
Err(self)
67+
}
68+
}
69+
}
70+
}
71+
72+
impl<T: OpCode + 'static> Stream for SubmitMulti<T> {
73+
type Item = BufResult<usize, Extra>;
74+
75+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76+
let this = self.project();
77+
78+
loop {
79+
match this.state.take().expect("State error, this is a bug") {
80+
State::Idle { op } => {
81+
let extra = cx.as_extra(|| this.runtime.default_extra());
82+
match this.runtime.submit_raw(op, extra) {
83+
PushEntry::Pending(key) => {
84+
if let Some(cancel) = cx.as_cancel() {
85+
cancel.register(&key);
86+
}
87+
88+
*this.state = Some(State::submitted(key))
89+
}
90+
PushEntry::Ready(BufResult(res, op)) => {
91+
*this.state = Some(State::Finished { op });
92+
let extra = this.runtime.default_extra();
93+
94+
return Poll::Ready(Some(BufResult(res, extra)));
95+
}
96+
}
97+
}
98+
99+
State::Submitted { key, .. } => {
100+
if let Some(res) = this.runtime.poll_multishot(cx.waker(), &key) {
101+
*this.state = Some(State::submitted(key));
102+
103+
return Poll::Ready(Some(res));
104+
};
105+
106+
match this.runtime.poll_task_with_extra(cx.waker(), key) {
107+
PushEntry::Pending(key) => {
108+
*this.state = Some(State::submitted(key));
109+
110+
return Poll::Pending;
111+
}
112+
PushEntry::Ready((BufResult(res, op), extra)) => {
113+
*this.state = Some(State::Finished { op });
114+
115+
return Poll::Ready(Some(BufResult(res, extra)));
116+
}
117+
}
118+
}
119+
120+
State::Finished { op } => {
121+
*this.state = Some(State::Finished { op });
122+
123+
return Poll::Ready(None);
124+
}
125+
}
126+
}
127+
}
128+
}
129+
130+
impl<T: OpCode + 'static> FusedStream for SubmitMulti<T> {
131+
fn is_terminated(&self) -> bool {
132+
matches!(self.state, None | Some(State::Finished { .. }))
133+
}
134+
}

0 commit comments

Comments
 (0)