Skip to content

Commit c0a79ea

Browse files
committed
refactor io_uring impls
1 parent 5a39680 commit c0a79ea

File tree

20 files changed

+217
-34
lines changed

20 files changed

+217
-34
lines changed

core/src/coroutine/korosensei.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -333,10 +333,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
333333
return Ok(callback());
334334
}
335335
return stack_pool.allocate(stack_size).map(|stack| {
336-
co.stack_infos_mut().push_back(StackInfo {
337-
stack_top: stack.base().get(),
338-
stack_bottom: stack.limit().get(),
339-
});
336+
co.stack_infos_mut().push_back(StackInfo::from(&stack));
340337
let r = corosensei::on_stack(stack, callback);
341338
_ = co.stack_infos_mut().pop_back();
342339
r
@@ -357,10 +354,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
357354
}
358355
stack_pool.allocate(stack_size).map(|stack| {
359356
STACK_INFOS.with(|s| {
360-
s.borrow_mut().push_back(StackInfo {
361-
stack_top: stack.base().get(),
362-
stack_bottom: stack.limit().get(),
363-
});
357+
s.borrow_mut().push_back(StackInfo::from(&stack));
364358
});
365359
let r = corosensei::on_stack(stack, callback);
366360
_ = STACK_INFOS.with(|s| s.borrow_mut().pop_back());
@@ -469,3 +463,12 @@ where
469463
}
470464
}
471465
}
466+
467+
impl<S: Stack> From<&S> for StackInfo {
468+
fn from(stack: &S) -> Self {
469+
Self {
470+
stack_top: stack.base().get(),
471+
stack_bottom: stack.limit().get(),
472+
}
473+
}
474+
}

core/src/coroutine/stack_pool.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use crate::common::now;
22
use crate::config::Config;
3+
use crate::coroutine::StackInfo;
34
use corosensei::stack::{DefaultStack, Stack, StackPointer};
45
use once_cell::sync::OnceCell;
56
use std::cell::UnsafeCell;
67
use std::cmp::Ordering;
78
use std::collections::BinaryHeap;
9+
use std::fmt::Debug;
810
use std::ops::{Deref, DerefMut};
911
use std::rc::Rc;
1012
use std::sync::atomic::{AtomicU64, AtomicUsize};
@@ -15,6 +17,16 @@ pub(crate) struct PooledStack {
1517
create_time: u64,
1618
}
1719

20+
impl Debug for PooledStack {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
f.debug_struct("PooledStack")
23+
.field("stack_size", &self.stack_size)
24+
.field("stack", &StackInfo::from(self))
25+
.field("create_time", &self.create_time)
26+
.finish()
27+
}
28+
}
29+
1830
impl Deref for PooledStack {
1931
type Target = DefaultStack;
2032

@@ -53,7 +65,10 @@ impl Clone for PooledStack {
5365

5466
impl PartialEq<Self> for PooledStack {
5567
fn eq(&self, other: &Self) -> bool {
56-
Rc::strong_count(&other.stack).eq(&Rc::strong_count(&self.stack))
68+
other.stack_size.eq(&self.stack_size)
69+
&& other.base().eq(&self.base())
70+
&& other.limit().eq(&self.limit())
71+
&& other.create_time.eq(&self.create_time)
5772
}
5873
}
5974

core/src/scheduler.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,17 +176,12 @@ impl<'s> Scheduler<'s> {
176176
stack_size: Option<usize>,
177177
priority: Option<c_longlong>,
178178
) -> std::io::Result<()> {
179-
let mut co = co!(
179+
self.submit_raw_co(co!(
180180
format!("{}@{}", self.name(), uuid::Uuid::new_v4()),
181181
f,
182182
stack_size.unwrap_or(self.stack_size()),
183183
priority
184-
)?;
185-
for listener in self.listeners.clone() {
186-
co.add_raw_listener(listener);
187-
}
188-
// let co_name = Box::leak(Box::from(coroutine.get_name()));
189-
self.submit_raw_co(co)
184+
)?)
190185
}
191186

192187
/// Add a listener to this scheduler.
@@ -198,8 +193,11 @@ impl<'s> Scheduler<'s> {
198193
///
199194
/// Allow multiple threads to concurrently submit coroutine to the scheduler,
200195
/// but only allow one thread to execute scheduling.
201-
pub fn submit_raw_co(&self, coroutine: SchedulableCoroutine<'s>) -> std::io::Result<()> {
202-
self.ready.push(coroutine);
196+
pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<()> {
197+
for listener in self.listeners.clone() {
198+
co.add_raw_listener(listener);
199+
}
200+
self.ready.push(co);
203201
Ok(())
204202
}
205203

core/src/syscall/unix/accept.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl_facade!(AcceptSyscallFacade, AcceptSyscall,
3636
accept(fd: c_int, address: *mut sockaddr, address_len: *mut socklen_t) -> c_int
3737
);
3838

39-
impl_io_uring!(IoUringAcceptSyscall, AcceptSyscall,
39+
impl_io_uring_read!(IoUringAcceptSyscall, AcceptSyscall,
4040
accept(fd: c_int, address: *mut sockaddr, address_len: *mut socklen_t) -> c_int
4141
);
4242

core/src/syscall/unix/connect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl_facade!(ConnectSyscallFacade, ConnectSyscall,
4040
connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int
4141
);
4242

43-
impl_io_uring!(IoUringConnectSyscall, ConnectSyscall,
43+
impl_io_uring_write!(IoUringConnectSyscall, ConnectSyscall,
4444
connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int
4545
);
4646

core/src/syscall/unix/mod.rs

Lines changed: 167 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ macro_rules! impl_io_uring {
7373
}
7474
if let Some(suspender) = SchedulableSuspender::current() {
7575
suspender.suspend();
76-
//回来的时候,系统调用已经执行完了
76+
//回来的时候,系统调用已经执行完毕
7777
}
7878
if let Some(co) = SchedulableCoroutine::current() {
7979
if let CoroutineState::Syscall((), syscall, SyscallState::Callback) = co.state()
@@ -110,6 +110,172 @@ macro_rules! impl_io_uring {
110110
}
111111
}
112112

113+
macro_rules! impl_io_uring_read {
114+
( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
115+
#[repr(C)]
116+
#[derive(Debug, Default)]
117+
#[cfg(all(target_os = "linux", feature = "io_uring"))]
118+
struct $struct_name<I: $trait_name> {
119+
inner: I,
120+
}
121+
122+
#[cfg(all(target_os = "linux", feature = "io_uring"))]
123+
impl<I: $trait_name> $trait_name for $struct_name<I> {
124+
extern "C" fn $syscall(
125+
&self,
126+
fn_ptr: Option<&extern "C" fn($fd_type, $($arg_type),*) -> $result>,
127+
$fd: $fd_type,
128+
$($arg: $arg_type),*
129+
) -> $result {
130+
if let Ok(arc) = $crate::net::EventLoops::$syscall($($arg, )*) {
131+
use $crate::common::constants::{CoroutineState, SyscallState};
132+
use $crate::scheduler::{SchedulableCoroutine, SchedulableSuspender};
133+
134+
if let Some(co) = SchedulableCoroutine::current() {
135+
if let CoroutineState::Syscall((), syscall, SyscallState::Executing) = co.state()
136+
{
137+
let new_state = SyscallState::Suspend(
138+
$crate::common::now()
139+
.saturating_add($crate::syscall::recv_time_limit($fd))
140+
);
141+
if co.syscall((), syscall, new_state).is_err() {
142+
$crate::error!(
143+
"{} change to syscall {} {} failed !",
144+
co.name(), syscall, new_state
145+
);
146+
}
147+
}
148+
}
149+
if let Some(suspender) = SchedulableSuspender::current() {
150+
suspender.suspend();
151+
//回来的时候,系统调用已经执行完毕或者超时
152+
}
153+
if let Some(co) = SchedulableCoroutine::current() {
154+
if let CoroutineState::Syscall((), syscall, syscall_state) = co.state() {
155+
match syscall_state {
156+
SyscallState::Timeout => {
157+
$crate::syscall::set_errno(libc::ETIMEDOUT);
158+
return -1;
159+
},
160+
SyscallState::Callback => {
161+
let new_state = SyscallState::Executing;
162+
if co.syscall((), syscall, new_state).is_err() {
163+
$crate::error!(
164+
"{} change to syscall {} {} failed !",
165+
co.name(), syscall, new_state
166+
);
167+
}
168+
},
169+
_ => {}
170+
}
171+
}
172+
}
173+
let (lock, cvar) = &*arc;
174+
let mut syscall_result: $result = cvar
175+
.wait_while(lock.lock().expect("lock failed"),
176+
|&mut result| result.is_none()
177+
)
178+
.expect("lock failed")
179+
.expect("no syscall result")
180+
.try_into()
181+
.expect("io_uring syscall result overflow");
182+
if syscall_result < 0 {
183+
let errno: std::ffi::c_int = (-syscall_result).try_into()
184+
.expect("io_uring errno overflow");
185+
$crate::syscall::set_errno(errno);
186+
syscall_result = -1;
187+
}
188+
return syscall_result;
189+
}
190+
self.inner.$syscall(fn_ptr, $($arg, )*)
191+
}
192+
}
193+
}
194+
}
195+
196+
macro_rules! impl_io_uring_write {
197+
( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
198+
#[repr(C)]
199+
#[derive(Debug, Default)]
200+
#[cfg(all(target_os = "linux", feature = "io_uring"))]
201+
struct $struct_name<I: $trait_name> {
202+
inner: I,
203+
}
204+
205+
#[cfg(all(target_os = "linux", feature = "io_uring"))]
206+
impl<I: $trait_name> $trait_name for $struct_name<I> {
207+
extern "C" fn $syscall(
208+
&self,
209+
fn_ptr: Option<&extern "C" fn($fd_type, $($arg_type),*) -> $result>,
210+
$fd: $fd_type,
211+
$($arg: $arg_type),*
212+
) -> $result {
213+
if let Ok(arc) = $crate::net::EventLoops::$syscall($($arg, )*) {
214+
use $crate::common::constants::{CoroutineState, SyscallState};
215+
use $crate::scheduler::{SchedulableCoroutine, SchedulableSuspender};
216+
217+
if let Some(co) = SchedulableCoroutine::current() {
218+
if let CoroutineState::Syscall((), syscall, SyscallState::Executing) = co.state()
219+
{
220+
let new_state = SyscallState::Suspend(
221+
$crate::common::now()
222+
.saturating_add($crate::syscall::send_time_limit($fd))
223+
);
224+
if co.syscall((), syscall, new_state).is_err() {
225+
$crate::error!(
226+
"{} change to syscall {} {} failed !",
227+
co.name(), syscall, new_state
228+
);
229+
}
230+
}
231+
}
232+
if let Some(suspender) = SchedulableSuspender::current() {
233+
suspender.suspend();
234+
//回来的时候,系统调用已经执行完毕或者超时
235+
}
236+
if let Some(co) = SchedulableCoroutine::current() {
237+
if let CoroutineState::Syscall((), syscall, syscall_state) = co.state() {
238+
match syscall_state {
239+
SyscallState::Timeout => {
240+
$crate::syscall::set_errno(libc::ETIMEDOUT);
241+
return -1;
242+
},
243+
SyscallState::Callback => {
244+
let new_state = SyscallState::Executing;
245+
if co.syscall((), syscall, new_state).is_err() {
246+
$crate::error!(
247+
"{} change to syscall {} {} failed !",
248+
co.name(), syscall, new_state
249+
);
250+
}
251+
},
252+
_ => {}
253+
}
254+
}
255+
}
256+
let (lock, cvar) = &*arc;
257+
let mut syscall_result: $result = cvar
258+
.wait_while(lock.lock().expect("lock failed"),
259+
|&mut result| result.is_none()
260+
)
261+
.expect("lock failed")
262+
.expect("no syscall result")
263+
.try_into()
264+
.expect("io_uring syscall result overflow");
265+
if syscall_result < 0 {
266+
let errno: std::ffi::c_int = (-syscall_result).try_into()
267+
.expect("io_uring errno overflow");
268+
$crate::syscall::set_errno(errno);
269+
syscall_result = -1;
270+
}
271+
return syscall_result;
272+
}
273+
self.inner.$syscall(fn_ptr, $($arg, )*)
274+
}
275+
}
276+
}
277+
}
278+
113279
macro_rules! impl_nio_read {
114280
( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
115281
#[repr(C)]

core/src/syscall/unix/pread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl_facade!(PreadSyscallFacade, PreadSyscall,
3838
pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t
3939
);
4040

41-
impl_io_uring!(IoUringPreadSyscall, PreadSyscall,
41+
impl_io_uring_read!(IoUringPreadSyscall, PreadSyscall,
4242
pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t
4343
);
4444

core/src/syscall/unix/preadv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl_facade!(PreadvSyscallFacade, PreadvSyscall,
3838
preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
3939
);
4040

41-
impl_io_uring!(IoUringPreadvSyscall, PreadvSyscall,
41+
impl_io_uring_read!(IoUringPreadvSyscall, PreadvSyscall,
4242
preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
4343
);
4444

core/src/syscall/unix/pwrite.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl_facade!(PwriteSyscallFacade, PwriteSyscall,
3838
pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t
3939
);
4040

41-
impl_io_uring!(IoUringPwriteSyscall, PwriteSyscall,
41+
impl_io_uring_write!(IoUringPwriteSyscall, PwriteSyscall,
4242
pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t
4343
);
4444

core/src/syscall/unix/pwritev.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl_facade!(PwritevSyscallFacade, PwritevSyscall,
3838
pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
3939
);
4040

41-
impl_io_uring!(IoUringPwritevSyscall, PwritevSyscall,
41+
impl_io_uring_write!(IoUringPwritevSyscall, PwritevSyscall,
4242
pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
4343
);
4444

0 commit comments

Comments
 (0)