Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions core/docs/en/coroutine.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,38 @@ author: loongs-zhang

# Coroutine Overview

## Usage

```rust
use open_coroutine_core::common::constants::CoroutineState;
use open_coroutine_core::coroutine::Coroutine;

fn main() -> std::io::Result<()> {
let mut co = Coroutine::new(
// optional coroutine name
None,
|suspender, input| {
assert_eq!(1, input);
assert_eq!(3, suspender.suspend_with(2));
4
},
// optional stack size
None,
// optional coroutine priority
None,
)?;
// Macro `co!` is equivalent to the code above
// let mut co = open_coroutine_core::co!(|suspender, input| {
// assert_eq!(1, input);
// assert_eq!(3, suspender.suspend_with(2));
// 4
// })?;
assert_eq!(CoroutineState::Suspend(2, 0), co.resume_with(1)?);
assert_eq!(CoroutineState::Complete(4), co.resume_with(3)?);
Ok(())
}
```

## What is coroutine?

A [coroutine](https://en.wikipedia.org/wiki/Coroutine) is a function that can be paused and resumed, yielding values to
Expand All @@ -15,6 +47,26 @@ resumed.

The above is excerpted from [corosensei](https://github.com/Amanieu/corosensei).

## Coroutine VS Thread

| | coroutine | thread |
|-------------------|-----------|---------|
| switch efficiency | ✅ Higher | ❌ High |
| memory efficiency | KB/MB | KB/MB |
| scheduled by OS | ❌ | ✅ |
| stack grow | ✅ | ❌ |

## Stackfull VS Stackless

| | stackfull | stackless |
|-------------------|-----------|-----------|
| switch efficiency | ❌ High | ✅ Higher |
| memory efficiency | ❌ KB/MB | ✅ Bytes |
| limitations | ✅ Few | ❌ Many |

In general, if the requirements for resource utilization and switching performance are not very strict, using a
stackfull approach would be more convenient and the code would be easier to maintain.

## State in open-coroutine

```text
Expand Down
31 changes: 31 additions & 0 deletions core/docs/en/monitor.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,37 @@ The `preemptive` feature currently supports the following targets:

✅ Tested and stable; ⚠️ Tested but unstable; ❌ Not supported.

## Usage

```rust
use open_coroutine_core::co;
use open_coroutine_core::common::constants::CoroutineState;
use open_coroutine_core::coroutine::Coroutine;

fn main() -> std::io::Result<()> {
// Simulate the most extreme dead loop, if the preemptive feature
// is not enabled, it will remain stuck in a dead loop after resume.
let mut coroutine: Coroutine<(), (), ()> = co!(|_, ()| { loop {} })?;
assert_eq!(CoroutineState::Suspend((), 0), coroutine.resume()?);
assert_eq!(CoroutineState::Suspend((), 0), coroutine.state());
Ok(())
}
```

## What is monitor?

The `monitor` mod implements the `preemptive` feature for open-coroutine, which allows the coroutine to be preempted
when it is running for a long time.

## Why preempt

After a `Coroutine::resume_with`, a coroutine may occupy the scheduling thread for a long time, thereby slowing down
other coroutines scheduled by that scheduling thread. To solve this problem, we introduce preemptive scheduling, which
automatically suspends coroutines that are stuck in long-term execution and allows other coroutines to execute.

The coroutine occupies scheduling threads for a long time in two scenarios: getting stuck in heavy computing or syscall.
The following only solves the problem of getting stuck in heavy computing.

## How it works

```mermaid
Expand Down
9 changes: 6 additions & 3 deletions core/src/coroutine/korosensei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,20 +381,23 @@ where
///# Errors
/// if stack allocate failed.
pub fn new<F>(
name: String,
name: Option<String>,
f: F,
stack_size: usize,
stack_size: Option<usize>,
priority: Option<c_longlong>,
) -> std::io::Result<Self>
where
F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
{
let stack_size = stack_size.max(crate::common::page_size());
let stack_size = stack_size
.unwrap_or(crate::common::constants::DEFAULT_STACK_SIZE)
.max(crate::common::page_size());
let stack = DefaultStack::new(stack_size)?;
let stack_infos = UnsafeCell::new(VecDeque::from([StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
}]));
let name = name.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let co_name = name.clone().leak();
let inner = corosensei::Coroutine::with_stack(stack, move |y, p| {
catch!(
Expand Down
23 changes: 4 additions & 19 deletions core/src/coroutine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,19 @@ macro_rules! co {
$crate::coroutine::Coroutine::new($name, $f, $size, $priority)
};
($f:expr, $size:literal, $priority:literal $(,)?) => {
$crate::coroutine::Coroutine::new(
uuid::Uuid::new_v4().to_string(),
$f,
$size,
Some($priority),
)
$crate::coroutine::Coroutine::new(None, $f, $size, Some($priority))
};
($name:expr, $f:expr, $size:expr $(,)?) => {
$crate::coroutine::Coroutine::new($name, $f, $size, None)
};
($f:expr, $size:literal $(,)?) => {
$crate::coroutine::Coroutine::new(uuid::Uuid::new_v4().to_string(), $f, $size, None)
$crate::coroutine::Coroutine::new(None, $f, $size, None)
};
($name:expr, $f:expr $(,)?) => {
$crate::coroutine::Coroutine::new(
$name,
$f,
$crate::common::constants::DEFAULT_STACK_SIZE,
None,
)
$crate::coroutine::Coroutine::new($name, $f, None, None)
};
($f:expr $(,)?) => {
$crate::coroutine::Coroutine::new(
uuid::Uuid::new_v4().to_string(),
$f,
$crate::common::constants::DEFAULT_STACK_SIZE,
None,
)
$crate::coroutine::Coroutine::new(None, $f, None, None)
};
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use std::time::Duration;

cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t};
use dashmap::DashMap;
use std::ffi::c_longlong;
use libc::{epoll_event, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t};
use std::ffi::{c_longlong, c_uint};
}
}

Expand Down Expand Up @@ -441,6 +441,10 @@ impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_
impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
impl_io_uring!(fsync(fd: c_int) -> c_int);
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);

#[cfg(all(test, not(all(unix, feature = "preemptive"))))]
mod tests {
Expand Down
8 changes: 6 additions & 2 deletions core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::time::Duration;

cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t};
use std::ffi::c_void;
use libc::{epoll_event, iovec, mode_t, msghdr, off_t, size_t, sockaddr, socklen_t};
use std::ffi::{c_char, c_uint, c_void};
}
}

Expand Down Expand Up @@ -276,3 +276,7 @@ impl_io_uring!(pwrite(fd: c_int, buf: *const c_void, count: size_t, offset: off_
impl_io_uring!(writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t);
impl_io_uring!(pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t);
impl_io_uring!(sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t);
impl_io_uring!(fsync(fd: c_int) -> c_int);
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
4 changes: 2 additions & 2 deletions core/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ impl<'s> Scheduler<'s> {
priority: Option<c_longlong>,
) -> std::io::Result<()> {
self.submit_raw_co(co!(
format!("{}@{}", self.name(), uuid::Uuid::new_v4()),
Some(format!("{}@{}", self.name(), uuid::Uuid::new_v4())),
f,
stack_size.unwrap_or(self.stack_size()),
Some(stack_size.unwrap_or(self.stack_size())),
priority
)?)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/syscall/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
macro_rules! syscall_mod {
($($mod_name: ident);*) => {
($($mod_name: ident);*$(;)?) => {
$(
pub use $mod_name::$mod_name;
mod $mod_name;
Expand Down
32 changes: 32 additions & 0 deletions core/src/syscall/unix/fsync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use once_cell::sync::Lazy;
use std::ffi::c_int;

#[must_use]
pub extern "C" fn fsync(
fn_ptr: Option<&extern "C" fn(c_int) -> c_int>,
fd: c_int,
) -> c_int {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
static CHAIN: Lazy<FsyncSyscallFacade<IoUringFsyncSyscall<RawFsyncSyscall>>> =
Lazy::new(Default::default);
} else {
static CHAIN: Lazy<FsyncSyscallFacade<RawFsyncSyscall>> = Lazy::new(Default::default);
}
}
CHAIN.fsync(fn_ptr, fd)
}

trait FsyncSyscall {
extern "C" fn fsync(
&self,
fn_ptr: Option<&extern "C" fn(c_int) -> c_int>,
fd: c_int,
) -> c_int;
}

impl_facade!(FsyncSyscallFacade, FsyncSyscall, fsync(fd: c_int) -> c_int);

impl_io_uring!(IoUringFsyncSyscall, FsyncSyscall, fsync(fd: c_int) -> c_int);

impl_raw!(RawFsyncSyscall, FsyncSyscall, fsync(fd: c_int) -> c_int);
43 changes: 43 additions & 0 deletions core/src/syscall/unix/mkdirat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use libc::mode_t;
use once_cell::sync::Lazy;
use std::ffi::{c_char, c_int};

#[must_use]
pub extern "C" fn mkdirat(
fn_ptr: Option<&extern "C" fn(c_int, *const c_char, mode_t) -> c_int>,
dirfd: c_int,
pathname: *const c_char,
mode: mode_t,
) -> c_int {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
static CHAIN: Lazy<MkdiratSyscallFacade<IoUringMkdiratSyscall<RawMkdiratSyscall>>> =
Lazy::new(Default::default);
} else {
static CHAIN: Lazy<MkdiratSyscallFacade<RawMkdiratSyscall>> = Lazy::new(Default::default);
}
}
CHAIN.mkdirat(fn_ptr, dirfd, pathname, mode)
}

trait MkdiratSyscall {
extern "C" fn mkdirat(
&self,
fn_ptr: Option<&extern "C" fn(c_int, *const c_char, mode_t) -> c_int>,
dirfd: c_int,
pathname: *const c_char,
mode: mode_t,
) -> c_int;
}

impl_facade!(MkdiratSyscallFacade, MkdiratSyscall,
mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int
);

impl_io_uring!(IoUringMkdiratSyscall, MkdiratSyscall,
mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int
);

impl_raw!(RawMkdiratSyscall, MkdiratSyscall,
mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int
);
Loading
Loading