Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,32 @@ impl<'p> CoroutinePool<'p> {
match self.state() {
PoolState::Running => {
assert_eq!(PoolState::Running, self.stopping()?);
_ = self.try_timed_schedule_task(dur)?;
assert_eq!(PoolState::Stopping, self.stopped()?);
self.do_stop(dur)?;
}
PoolState::Stopping => {
_ = self.try_timed_schedule_task(dur)?;
assert_eq!(PoolState::Stopping, self.stopped()?);
}
PoolState::Stopped => {}
PoolState::Stopping => self.do_stop(dur)?,
PoolState::Stopped => self.do_clean(),
}
Ok(())
}

fn do_stop(&mut self, dur: Duration) -> std::io::Result<()> {
_ = self.try_timed_schedule_task(dur)?;
assert_eq!(PoolState::Stopping, self.stopped()?);
self.do_clean();
Ok(())
}

fn do_clean(&mut self) {
// clean up remaining wait tasks
for r in &self.waits {
let task_name = *r.key();
_ = self
.results
.insert(task_name.to_string(), Err("The coroutine pool has stopped"));
self.notify(task_name);
}
}

/// Submit a new task to this pool.
///
/// Allow multiple threads to concurrently submit task to the pool,
Expand Down Expand Up @@ -271,7 +285,6 @@ impl<'p> CoroutinePool<'p> {
let key = Box::leak(Box::from(task_name));
if let Some(r) = self.try_take_task_result(key) {
self.notify(key);
drop(self.waits.remove(key));
return Ok(r);
}
if SchedulableCoroutine::current().is_some() {
Expand Down Expand Up @@ -304,7 +317,6 @@ impl<'p> CoroutinePool<'p> {
);
if let Some(r) = self.try_take_task_result(key) {
self.notify(key);
assert!(self.waits.remove(key).is_some());
return Ok(r);
}
Err(Error::new(ErrorKind::TimedOut, "wait timeout"))
Expand Down Expand Up @@ -415,8 +427,8 @@ impl<'p> CoroutinePool<'p> {
}

fn notify(&self, task_name: &str) {
if let Some(arc) = self.waits.get(task_name) {
let (lock, cvar) = &**arc;
if let Some((_, arc)) = self.waits.remove(task_name) {
let (lock, cvar) = &*arc;
let mut pending = lock.lock().expect("notify task failed");
*pending = false;
cvar.notify_one();
Expand Down
2 changes: 2 additions & 0 deletions core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ pub enum SyscallName {
WaitOnAddress,
#[cfg(windows)]
WSAPoll,
/// panic!
panicking,
}

impl SyscallName {
Expand Down
2 changes: 1 addition & 1 deletion core/src/coroutine/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ mod tests {
let local = CoroutineLocal::default();
assert!(local.put("1", 1).is_none());
assert_eq!(Some(1), local.put("1", 2));
assert_eq!(2, *local.get("1").unwrap());
assert_eq!(2, *local.get::<i32>("1").unwrap());
*local.get_mut("1").unwrap() = 3;
assert_eq!(Some(3), local.remove("1"));
}
Expand Down
29 changes: 29 additions & 0 deletions core/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,35 @@ impl Monitor {
match self.state.get() {
MonitorState::Created => {
self.state.set(MonitorState::Running);
// install panic hook
std::panic::set_hook(Box::new(|panic_hook_info| {
let syscall = crate::common::constants::SyscallName::panicking;
if let Some(co) = crate::scheduler::SchedulableCoroutine::current() {
let new_state = crate::common::constants::SyscallState::Executing;
if co.syscall((), syscall, new_state).is_err() {
error!(
"{} change to syscall {} {} failed !",
co.name(),
syscall,
new_state
);
}
}
eprintln!(
"panic hooked in open-coroutine, thread '{}' {}",
std::thread::current().name().unwrap_or("unknown"),
panic_hook_info
);
eprintln!(
"stack backtrace:\n{}",
std::backtrace::Backtrace::force_capture()
);
if let Some(co) = crate::scheduler::SchedulableCoroutine::current() {
if co.running().is_err() {
error!("{} change to running state failed !", co.name());
}
}
}));
// install SIGURG signal handler
let mut set = SigSet::empty();
set.add(Signal::SIGURG);
Expand Down
8 changes: 7 additions & 1 deletion core/src/net/operator/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ impl<'o> Operator<'o> {
unsafe {
let ret = CreateIoCompletionPort(handle, self.iocp, self.cpu, 0);
if ret.is_null()
&& ERROR_INVALID_PARAMETER == WSAGetLastError().try_into().expect("overflow")
&& ERROR_INVALID_PARAMETER
== TryInto::<u32>::try_into(WSAGetLastError()).expect("overflow")
{
// duplicate bind
return Ok(());
Expand Down Expand Up @@ -192,6 +193,11 @@ impl<'o> Operator<'o> {
Ok((cq.len(), cq, timeout.map(|t| t.saturating_sub(cost))))
}

#[allow(warnings)]
pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> {
todo!("CancelIoEx")
}

pub(crate) fn accept(
&self,
user_data: usize,
Expand Down
Loading