Skip to content

Commit a4ae2f3

Browse files
committed
clean up remaining wait tasks
1 parent 6134c34 commit a4ae2f3

File tree

3 files changed

+28
-12
lines changed

3 files changed

+28
-12
lines changed

core/src/co_pool/mod.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -201,18 +201,32 @@ impl<'p> CoroutinePool<'p> {
201201
match self.state() {
202202
PoolState::Running => {
203203
assert_eq!(PoolState::Running, self.stopping()?);
204-
_ = self.try_timed_schedule_task(dur)?;
205-
assert_eq!(PoolState::Stopping, self.stopped()?);
204+
self.do_stop(dur)?;
206205
}
207-
PoolState::Stopping => {
208-
_ = self.try_timed_schedule_task(dur)?;
209-
assert_eq!(PoolState::Stopping, self.stopped()?);
210-
}
211-
PoolState::Stopped => {}
206+
PoolState::Stopping => self.do_stop(dur)?,
207+
PoolState::Stopped => self.do_clean(),
212208
}
213209
Ok(())
214210
}
215211

212+
fn do_stop(&mut self, dur: Duration) -> std::io::Result<()> {
213+
_ = self.try_timed_schedule_task(dur)?;
214+
assert_eq!(PoolState::Stopping, self.stopped()?);
215+
self.do_clean();
216+
Ok(())
217+
}
218+
219+
fn do_clean(&mut self) {
220+
// clean up remaining wait tasks
221+
for r in &self.waits {
222+
let task_name = *r.key();
223+
_ = self
224+
.results
225+
.insert(task_name.to_string(), Err("The coroutine pool has stopped"));
226+
self.notify(task_name);
227+
}
228+
}
229+
216230
/// Submit a new task to this pool.
217231
///
218232
/// Allow multiple threads to concurrently submit task to the pool,
@@ -271,7 +285,6 @@ impl<'p> CoroutinePool<'p> {
271285
let key = Box::leak(Box::from(task_name));
272286
if let Some(r) = self.try_take_task_result(key) {
273287
self.notify(key);
274-
drop(self.waits.remove(key));
275288
return Ok(r);
276289
}
277290
if SchedulableCoroutine::current().is_some() {
@@ -304,7 +317,6 @@ impl<'p> CoroutinePool<'p> {
304317
);
305318
if let Some(r) = self.try_take_task_result(key) {
306319
self.notify(key);
307-
assert!(self.waits.remove(key).is_some());
308320
return Ok(r);
309321
}
310322
Err(Error::new(ErrorKind::TimedOut, "wait timeout"))
@@ -415,8 +427,8 @@ impl<'p> CoroutinePool<'p> {
415427
}
416428

417429
fn notify(&self, task_name: &str) {
418-
if let Some(arc) = self.waits.get(task_name) {
419-
let (lock, cvar) = &**arc;
430+
if let Some((_, arc)) = self.waits.remove(task_name) {
431+
let (lock, cvar) = &*arc;
420432
let mut pending = lock.lock().expect("notify task failed");
421433
*pending = false;
422434
cvar.notify_one();

core/src/coroutine/local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ mod tests {
5252
let local = CoroutineLocal::default();
5353
assert!(local.put("1", 1).is_none());
5454
assert_eq!(Some(1), local.put("1", 2));
55-
assert_eq!(2, *local.get("1").unwrap());
55+
assert_eq!(2, *local.get::<i32>("1").unwrap());
5656
*local.get_mut("1").unwrap() = 3;
5757
assert_eq!(Some(3), local.remove("1"));
5858
}

core/src/net/operator/windows/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ impl<'o> Operator<'o> {
192192
Ok((cq.len(), cq, timeout.map(|t| t.saturating_sub(cost))))
193193
}
194194

195+
pub(crate) fn async_cancel(&self, user_data: usize) -> std::io::Result<()> {
196+
todo!("CancelIoEx")
197+
}
198+
195199
pub(crate) fn accept(
196200
&self,
197201
user_data: usize,

0 commit comments

Comments
 (0)