Skip to content

Commit 2bf4c89

Browse files
committed
Merge remote-tracking branch 'upstream/master' into calculate-chance
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
2 parents a31ff96 + d23564c commit 2bf4c89

File tree

9 files changed

+114
-29
lines changed

9 files changed

+114
-29
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ harness = false
4545

4646
[profile.bench]
4747
codegen-units = 1
48+
debug = true

benches/chained_spawn.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ mod yatp_future {
3939
use yatp::task::future::TaskCell;
4040
use yatp::Remote;
4141

42-
pub fn chained_spawn(b: &mut Bencher<'_>, iter_count: usize) {
43-
let pool = yatp::Builder::new("chained_spawn").build_future_pool();
44-
42+
fn chained_spawn(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, iter_count: usize) {
4543
fn iter(remote: Remote<TaskCell>, done_tx: mpsc::SyncSender<()>, n: usize) {
4644
if n == 0 {
4745
done_tx.send(()).unwrap();
@@ -65,6 +63,16 @@ mod yatp_future {
6563
done_rx.recv().unwrap();
6664
});
6765
}
66+
67+
pub fn chained_spawn_single_level(b: &mut Bencher<'_>, iter_count: usize) {
68+
let pool = yatp::Builder::new("chained_spawn").build_future_pool();
69+
chained_spawn(b, pool, iter_count)
70+
}
71+
72+
pub fn chained_spawn_multilevel(b: &mut Bencher<'_>, iter_count: usize) {
73+
let pool = yatp::Builder::new("chained_spawn").build_multilevel_future_pool();
74+
chained_spawn(b, pool, iter_count)
75+
}
6876
}
6977

7078
mod tokio {
@@ -134,13 +142,18 @@ mod async_std {
134142

135143
pub fn chained_spawn(b: &mut Criterion) {
136144
let mut group = b.benchmark_group("chained_spawn");
137-
for i in &[100, 400, 700, 1000] {
145+
for i in &[256, 512, 1024] {
138146
group.bench_with_input(BenchmarkId::new("yatp::future", i), i, |b, i| {
139-
yatp_future::chained_spawn(b, *i)
147+
yatp_future::chained_spawn_single_level(b, *i)
140148
});
141149
group.bench_with_input(BenchmarkId::new("yatp::callback", i), i, |b, i| {
142150
yatp_callback::chained_spawn(b, *i)
143151
});
152+
group.bench_with_input(
153+
BenchmarkId::new("yatp::future::multilevel", i),
154+
i,
155+
|b, i| yatp_future::chained_spawn_multilevel(b, *i),
156+
);
144157
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
145158
tokio::chained_spawn(b, *i)
146159
});

benches/ping_pong.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,9 @@ mod yatp_future {
5353
use std::sync::atomic::*;
5454
use std::sync::*;
5555
use tokio::sync::oneshot;
56+
use yatp::task::future::TaskCell;
5657

57-
pub fn ping_pong(b: &mut Bencher<'_>, ping_count: usize) {
58-
let pool = yatp::Builder::new("ping_pong").build_future_pool();
59-
58+
fn ping_pong(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, ping_count: usize) {
6059
let (done_tx, done_rx) = mpsc::sync_channel(1000);
6160
let rem = Arc::new(AtomicUsize::new(0));
6261

@@ -96,6 +95,16 @@ mod yatp_future {
9695
done_rx.recv().unwrap();
9796
});
9897
}
98+
99+
pub fn ping_pong_single_level(b: &mut Bencher<'_>, ping_count: usize) {
100+
let pool = yatp::Builder::new("ping_pong").build_future_pool();
101+
ping_pong(b, pool, ping_count)
102+
}
103+
104+
pub fn ping_pong_multilevel(b: &mut Bencher<'_>, ping_count: usize) {
105+
let pool = yatp::Builder::new("ping_pong").build_multilevel_future_pool();
106+
ping_pong(b, pool, ping_count)
107+
}
99108
}
100109

101110
mod tokio {
@@ -199,13 +208,18 @@ mod async_std {
199208

200209
pub fn ping_pong(b: &mut Criterion) {
201210
let mut group = b.benchmark_group("ping_pong");
202-
for i in &[100, 400, 700, 1000] {
211+
for i in &[256, 512, 1024] {
203212
group.bench_with_input(BenchmarkId::new("yatp::future", i), i, |b, i| {
204-
yatp_future::ping_pong(b, *i)
213+
yatp_future::ping_pong_single_level(b, *i)
205214
});
206215
group.bench_with_input(BenchmarkId::new("yatp::callback", i), i, |b, i| {
207216
yatp_callback::ping_pong(b, *i)
208217
});
218+
group.bench_with_input(
219+
BenchmarkId::new("yatp::future::multilevel", i),
220+
i,
221+
|b, i| yatp_future::ping_pong_multilevel(b, *i),
222+
);
209223
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
210224
tokio::ping_pong(b, *i)
211225
});

benches/spawn_many.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ mod yatp_future {
3535
use criterion::*;
3636
use std::sync::atomic::*;
3737
use std::sync::*;
38+
use yatp::task::future::TaskCell;
3839

39-
pub fn spawn_many(b: &mut Bencher<'_>, spawn_count: usize) {
40+
fn spawn_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, spawn_count: usize) {
4041
let (tx, rx) = mpsc::sync_channel(1000);
4142
let rem = Arc::new(AtomicUsize::new(0));
42-
let pool = yatp::Builder::new("spawn_many").build_future_pool();
4343

4444
b.iter(|| {
4545
rem.store(spawn_count, Ordering::Relaxed);
@@ -58,6 +58,16 @@ mod yatp_future {
5858
let _ = rx.recv().unwrap();
5959
});
6060
}
61+
62+
pub fn spawn_many_single_level(b: &mut Bencher<'_>, spawn_count: usize) {
63+
let pool = yatp::Builder::new("spawn_many").build_future_pool();
64+
spawn_many(b, pool, spawn_count)
65+
}
66+
67+
pub fn spawn_many_multilevel(b: &mut Bencher<'_>, spawn_count: usize) {
68+
let pool = yatp::Builder::new("spawn_many").build_multilevel_future_pool();
69+
spawn_many(b, pool, spawn_count)
70+
}
6171
}
6272

6373
mod threadpool {
@@ -153,13 +163,18 @@ mod async_std {
153163

154164
pub fn spawn_many(b: &mut Criterion) {
155165
let mut group = b.benchmark_group("spawn_many");
156-
for i in &[1000, 4000, 7000, 10000] {
166+
for i in &[1024, 4096, 8192, 16384] {
157167
group.bench_with_input(BenchmarkId::new("yatp::future", i), i, |b, i| {
158-
yatp_future::spawn_many(b, *i)
168+
yatp_future::spawn_many_single_level(b, *i)
159169
});
160170
group.bench_with_input(BenchmarkId::new("yatp::callback", i), i, |b, i| {
161171
yatp_callback::spawn_many(b, *i)
162172
});
173+
group.bench_with_input(
174+
BenchmarkId::new("yatp::future::multilevel", i),
175+
i,
176+
|b, i| yatp_future::spawn_many_multilevel(b, *i),
177+
);
163178
group.bench_with_input(BenchmarkId::new("threadpool", i), i, |b, i| {
164179
threadpool::spawn_many(b, *i)
165180
});

benches/yield_many.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ mod yatp_callback {
6262
mod yatp_future {
6363
use criterion::*;
6464
use std::sync::mpsc;
65+
use yatp::task::future::TaskCell;
6566

66-
pub fn yield_many(b: &mut Bencher<'_>, yield_count: usize) {
67+
fn yield_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, yield_count: usize) {
6768
let tasks = super::TASKS_PER_CPU * num_cpus::get();
6869
let (tx, rx) = mpsc::sync_channel(tasks);
69-
let pool = yatp::Builder::new("yield_many").build_future_pool();
7070

7171
b.iter(move || {
7272
for _ in 0..tasks {
@@ -84,6 +84,16 @@ mod yatp_future {
8484
}
8585
});
8686
}
87+
88+
pub fn yield_many_single_level(b: &mut Bencher<'_>, yield_count: usize) {
89+
let pool = yatp::Builder::new("yield_many").build_future_pool();
90+
yield_many(b, pool, yield_count)
91+
}
92+
93+
pub fn yield_many_multilevel(b: &mut Bencher<'_>, yield_count: usize) {
94+
let pool = yatp::Builder::new("yield_many").build_multilevel_future_pool();
95+
yield_many(b, pool, yield_count)
96+
}
8797
}
8898

8999
mod tokio {
@@ -146,13 +156,18 @@ mod async_std {
146156

147157
pub fn yield_many(b: &mut Criterion) {
148158
let mut group = b.benchmark_group("yield_many");
149-
for i in &[100, 400, 700, 1000] {
159+
for i in &[256, 512, 1024] {
150160
group.bench_with_input(BenchmarkId::new("yatp::future", i), i, |b, i| {
151-
yatp_future::yield_many(b, *i)
161+
yatp_future::yield_many_single_level(b, *i)
152162
});
153163
group.bench_with_input(BenchmarkId::new("yatp::callback", i), i, |b, i| {
154164
yatp_callback::yield_many(b, *i)
155165
});
166+
group.bench_with_input(
167+
BenchmarkId::new("yatp::future::multilevel", i),
168+
i,
169+
|b, i| yatp_future::yield_many_multilevel(b, *i),
170+
);
156171
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
157172
tokio::yield_many(b, *i)
158173
});

src/pool.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use self::spawn::{build_spawn, Local, Remote};
1717
use crate::queue::{TaskCell, WithExtras};
1818
use std::mem;
1919
use std::sync::Mutex;
20-
use std::thread::JoinHandle;
20+
use std::thread::{self, JoinHandle};
2121

2222
/// A generic thread pool.
2323
pub struct ThreadPool<T: TaskCell + Send> {
@@ -39,8 +39,11 @@ impl<T: TaskCell + Send> ThreadPool<T> {
3939
pub fn shutdown(&self) {
4040
self.remote.stop();
4141
let mut threads = mem::replace(&mut *self.threads.lock().unwrap(), Vec::new());
42+
let curr_id = thread::current().id();
4243
for j in threads.drain(..) {
43-
j.join().unwrap();
44+
if curr_id != j.thread().id() {
45+
j.join().unwrap();
46+
}
4447
}
4548
}
4649

src/pool/builder.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::pool::spawn::QueueCore;
44
use crate::pool::worker::WorkerThread;
55
use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool};
6-
use crate::queue::{self, LocalQueue, QueueType, TaskCell};
6+
use crate::queue::{self, multilevel, LocalQueue, QueueType, TaskCell};
77
use crate::task::{callback, future};
88
use std::sync::{Arc, Mutex};
99
use std::thread;
@@ -233,6 +233,16 @@ impl Builder {
233233
self.build_with_queue_and_runner(QueueType::SingleLevel, fb)
234234
}
235235

236+
/// Spawns a multilevel future pool.
237+
///
238+
/// It setups the pool with multi level queue.
239+
pub fn build_multilevel_future_pool(&self) -> ThreadPool<future::TaskCell> {
240+
let fb = CloneRunnerBuilder(future::Runner::default());
241+
let queue_builder = multilevel::Builder::new(multilevel::Config::default());
242+
let runner_builder = queue_builder.runner_builder(fb);
243+
self.build_with_queue_and_runner(QueueType::Multilevel(queue_builder), runner_builder)
244+
}
245+
236246
/// Spawns the thread pool immediately.
237247
///
238248
/// `queue_builder` is a closure that creates a task queue. It accepts the

src/pool/tests.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,17 @@ fn test_remote() {
9696
let res = rx.recv_timeout(Duration::from_millis(500));
9797
assert_eq!(res, Err(mpsc::RecvTimeoutError::Timeout));
9898
}
99+
100+
#[test]
101+
fn test_shutdown_in_pool() {
102+
let pool = Builder::new("test_shutdown_in_pool")
103+
.max_thread_count(4)
104+
.build_callback_pool();
105+
let remote = pool.remote().clone();
106+
let (tx, rx) = mpsc::channel();
107+
remote.spawn(move |_: &mut Handle<'_>| {
108+
pool.shutdown();
109+
tx.send(()).unwrap();
110+
});
111+
rx.recv_timeout(Duration::from_secs(1)).unwrap();
112+
}

src/queue/multilevel.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use prometheus::local::LocalIntCounter;
1717
use prometheus::{Gauge, IntCounter};
1818
use rand::prelude::*;
1919
use std::cell::Cell;
20-
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
20+
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering::*};
2121
use std::sync::{Arc, Mutex};
2222
use std::time::{Duration, Instant};
2323
use std::{f64, fmt, iter};
@@ -291,12 +291,12 @@ impl LevelManager {
291291
{
292292
let extras = task_cell.mut_extras();
293293
let task_id = extras.task_id;
294-
let running_time = extras
295-
.running_time
296-
.get_or_insert_with(|| self.task_elapsed_map.get_elapsed(task_id));
297294
let current_level = match extras.fixed_level {
298295
Some(level) => level,
299296
None => {
297+
let running_time = extras
298+
.running_time
299+
.get_or_insert_with(|| self.task_elapsed_map.get_elapsed(task_id));
300300
let running_time = running_time.as_duration();
301301
self.level_time_threshold
302302
.iter()
@@ -363,15 +363,15 @@ fn calculate_level0_chance(
363363
}
364364
}
365365

366-
pub(crate) struct ElapsedTime(IntCounter);
366+
pub(crate) struct ElapsedTime(AtomicU64);
367367

368368
impl ElapsedTime {
369369
fn as_duration(&self) -> Duration {
370-
Duration::from_micros(self.0.get() as u64)
370+
Duration::from_micros(self.0.load(Relaxed) as u64)
371371
}
372372

373373
fn inc_by(&self, t: Duration) {
374-
self.0.inc_by(t.as_micros() as i64);
374+
self.0.fetch_add(t.as_micros() as u64, Relaxed);
375375
}
376376

377377
#[cfg(test)]
@@ -384,7 +384,7 @@ impl ElapsedTime {
384384

385385
impl Default for ElapsedTime {
386386
fn default() -> ElapsedTime {
387-
ElapsedTime(IntCounter::new("_", "_").unwrap())
387+
ElapsedTime(AtomicU64::new(0))
388388
}
389389
}
390390

0 commit comments

Comments
 (0)