From a4ae2f3788b3e811c9b228bdf41e00b56e3c1424 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 25 Mar 2025 07:45:07 +0800 Subject: [PATCH 1/3] clean up remaining wait tasks --- core/src/co_pool/mod.rs | 34 +++++++++++++++++++--------- core/src/coroutine/local.rs | 2 +- core/src/net/operator/windows/mod.rs | 4 ++++ 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs index 4a962783..d86d9c14 100644 --- a/core/src/co_pool/mod.rs +++ b/core/src/co_pool/mod.rs @@ -201,18 +201,32 @@ impl<'p> CoroutinePool<'p> { match self.state() { PoolState::Running => { assert_eq!(PoolState::Running, self.stopping()?); - _ = self.try_timed_schedule_task(dur)?; - assert_eq!(PoolState::Stopping, self.stopped()?); + self.do_stop(dur)?; } - PoolState::Stopping => { - _ = self.try_timed_schedule_task(dur)?; - assert_eq!(PoolState::Stopping, self.stopped()?); - } - PoolState::Stopped => {} + PoolState::Stopping => self.do_stop(dur)?, + PoolState::Stopped => self.do_clean(), } Ok(()) } + fn do_stop(&mut self, dur: Duration) -> std::io::Result<()> { + _ = self.try_timed_schedule_task(dur)?; + assert_eq!(PoolState::Stopping, self.stopped()?); + self.do_clean(); + Ok(()) + } + + fn do_clean(&mut self) { + // clean up remaining wait tasks + for r in &self.waits { + let task_name = *r.key(); + _ = self + .results + .insert(task_name.to_string(), Err("The coroutine pool has stopped")); + self.notify(task_name); + } + } + /// Submit a new task to this pool. /// /// Allow multiple threads to concurrently submit task to the pool, @@ -271,7 +285,6 @@ impl<'p> CoroutinePool<'p> { let key = Box::leak(Box::from(task_name)); if let Some(r) = self.try_take_task_result(key) { self.notify(key); - drop(self.waits.remove(key)); return Ok(r); } if SchedulableCoroutine::current().is_some() { @@ -304,7 +317,6 @@ impl<'p> CoroutinePool<'p> { ); if let Some(r) = self.try_take_task_result(key) { self.notify(key); - assert!(self.waits.remove(key).is_some()); return Ok(r); } Err(Error::new(ErrorKind::TimedOut, "wait timeout")) @@ -415,8 +427,8 @@ impl<'p> CoroutinePool<'p> { } fn notify(&self, task_name: &str) { - if let Some(arc) = self.waits.get(task_name) { - let (lock, cvar) = &**arc; + if let Some((_, arc)) = self.waits.remove(task_name) { + let (lock, cvar) = &*arc; let mut pending = lock.lock().expect("notify task failed"); *pending = false; cvar.notify_one(); diff --git a/core/src/coroutine/local.rs b/core/src/coroutine/local.rs index f9611eb7..40da92b8 100644 --- a/core/src/coroutine/local.rs +++ b/core/src/coroutine/local.rs @@ -52,7 +52,7 @@ mod tests { let local = CoroutineLocal::default(); assert!(local.put("1", 1).is_none()); assert_eq!(Some(1), local.put("1", 2)); - assert_eq!(2, *local.get("1").unwrap()); + assert_eq!(2, *local.get::("1").unwrap()); *local.get_mut("1").unwrap() = 3; assert_eq!(Some(3), local.remove("1")); } diff --git a/core/src/net/operator/windows/mod.rs b/core/src/net/operator/windows/mod.rs index b68f3f6c..eb0bffc0 100644 --- a/core/src/net/operator/windows/mod.rs +++ b/core/src/net/operator/windows/mod.rs @@ -192,6 +192,10 @@ impl<'o> Operator<'o> { Ok((cq.len(), cq, timeout.map(|t| t.saturating_sub(cost)))) } + pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> { + todo!("CancelIoEx") + } + pub(crate) fn accept( &self, user_data: usize, From c3c7cea88ea8ec3811adff62d5e3daf3ef9414f1 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 25 Mar 2025 08:57:16 +0800 Subject: [PATCH 2/3] install panic hook --- core/src/common/constants.rs | 2 ++ core/src/monitor.rs | 29 ++++++++++++++++++++++++++++ core/src/net/operator/windows/mod.rs | 1 + 3 files changed, 32 insertions(+) diff --git a/core/src/common/constants.rs b/core/src/common/constants.rs index bb8a4868..f7392211 100644 --- a/core/src/common/constants.rs +++ b/core/src/common/constants.rs @@ -129,6 +129,8 @@ pub enum SyscallName { WaitOnAddress, #[cfg(windows)] WSAPoll, + /// panic! + panicking, } impl SyscallName { diff --git a/core/src/monitor.rs b/core/src/monitor.rs index d1f35e47..c655468e 100644 --- a/core/src/monitor.rs +++ b/core/src/monitor.rs @@ -80,6 +80,35 @@ impl Monitor { match self.state.get() { MonitorState::Created => { self.state.set(MonitorState::Running); + // install panic hook + std::panic::set_hook(Box::new(|panic_hook_info| { + let syscall = crate::common::constants::SyscallName::panicking; + if let Some(co) = crate::scheduler::SchedulableCoroutine::current() { + let new_state = crate::common::constants::SyscallState::Executing; + if co.syscall((), syscall, new_state).is_err() { + error!( + "{} change to syscall {} {} failed !", + co.name(), + syscall, + new_state + ); + } + } + eprintln!( + "panic hooked in open-coroutine, thread '{}' {}", + std::thread::current().name().unwrap_or("unknown"), + panic_hook_info + ); + eprintln!( + "stack backtrace:\n{}", + std::backtrace::Backtrace::force_capture() + ); + if let Some(co) = crate::scheduler::SchedulableCoroutine::current() { + if co.running().is_err() { + error!("{} change to running state failed !", co.name()); + } + } + })); // install SIGURG signal handler let mut set = SigSet::empty(); set.add(Signal::SIGURG); diff --git a/core/src/net/operator/windows/mod.rs b/core/src/net/operator/windows/mod.rs index eb0bffc0..c382f2c3 100644 --- a/core/src/net/operator/windows/mod.rs +++ b/core/src/net/operator/windows/mod.rs @@ -192,6 +192,7 @@ impl<'o> Operator<'o> { Ok((cq.len(), cq, timeout.map(|t| t.saturating_sub(cost)))) } + #[allow(warnings)] pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> { todo!("CancelIoEx") } From 907e285a54838baacf0d948bd8a3ce659e8fb96a Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 25 Mar 2025 09:18:26 +0800 Subject: [PATCH 3/3] fix clippy --- core/src/net/operator/windows/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/net/operator/windows/mod.rs b/core/src/net/operator/windows/mod.rs index c382f2c3..531fc9c7 100644 --- a/core/src/net/operator/windows/mod.rs +++ b/core/src/net/operator/windows/mod.rs @@ -85,7 +85,8 @@ impl<'o> Operator<'o> { unsafe { let ret = CreateIoCompletionPort(handle, self.iocp, self.cpu, 0); if ret.is_null() - && ERROR_INVALID_PARAMETER == WSAGetLastError().try_into().expect("overflow") + && ERROR_INVALID_PARAMETER + == TryInto::::try_into(WSAGetLastError()).expect("overflow") { // duplicate bind return Ok(());