Skip to content

Commit f26ecd9

Browse files
authored
clean expired result (#383)
2 parents fc431db + d667fed commit f26ecd9

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

core/src/co_pool/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::common::{get_timeout_time, now, CondvarBlocker};
77
use crate::coroutine::suspender::Suspender;
88
use crate::scheduler::{SchedulableCoroutine, Scheduler};
99
use crate::{error, impl_current_for, impl_display_by_debug, impl_for_named, trace};
10-
use dashmap::DashMap;
10+
use dashmap::{DashMap, DashSet};
1111
use std::cell::Cell;
1212
use std::ffi::c_longlong;
1313
use std::io::{Error, ErrorKind};
@@ -52,6 +52,7 @@ pub struct CoroutinePool<'p> {
5252
waits: DashMap<&'p str, Arc<(Mutex<bool>, Condvar)>>,
5353
//任务执行结果
5454
results: DashMap<String, Result<Option<usize>, &'p str>>,
55+
no_waits: DashSet<&'p str>,
5556
}
5657

5758
impl Drop for CoroutinePool<'_> {
@@ -136,6 +137,7 @@ impl<'p> CoroutinePool<'p> {
136137
blocker: Arc::default(),
137138
results: DashMap::new(),
138139
waits: DashMap::default(),
140+
no_waits: DashSet::default(),
139141
}
140142
}
141143

@@ -240,6 +242,14 @@ impl<'p> CoroutinePool<'p> {
240242
self.results.remove(task_name).map(|(_, r)| r)
241243
}
242244

245+
/// clean the task result data.
246+
pub fn clean_task_result(&self, task_name: &str) {
247+
if self.try_take_task_result(task_name).is_some() {
248+
return;
249+
}
250+
_ = self.no_waits.insert(Box::leak(Box::from(task_name)));
251+
}
252+
243253
/// Use the given `task_name` to obtain task results, and if no results are found,
244254
/// block the current thread for `wait_time`.
245255
///
@@ -375,6 +385,11 @@ impl<'p> CoroutinePool<'p> {
375385
fn try_run(&self) -> Option<()> {
376386
self.task_queue.pop().map(|task| {
377387
let (task_name, result) = task.run();
388+
let n = task_name.clone().leak();
389+
if self.no_waits.contains(n) {
390+
_ = self.no_waits.remove(n);
391+
return;
392+
}
378393
assert!(
379394
self.results.insert(task_name.clone(), result).is_none(),
380395
"The previous result was not retrieved in a timely manner"

core/src/net/join.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ pub struct JoinHandle(&'static Arc<EventLoop<'static>>, *const c_char);
1212
impl Drop for JoinHandle {
1313
fn drop(&mut self) {
1414
if let Ok(name) = self.get_name() {
15-
// clean data
16-
_ = self.0.try_take_task_result(name);
15+
self.0.clean_task_result(name);
1716
}
1817
}
1918
}

0 commit comments

Comments
 (0)