Skip to content

Commit 776ee84

Browse files
Threadpool fix (#813)
* Remove bounded flag * Pedantic * Fix toml change * Change log * Fix error version --------- Co-authored-by: James Bell <jamesbell@microsoft.com>
1 parent 1079b6c commit 776ee84

File tree

7 files changed

+63
-68
lines changed

7 files changed

+63
-68
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ ntex-util = { path = "ntex-util" }
5656
ntex = "3.6.2"
5757
ntex-bytes = "1.5.2"
5858
ntex-codec = "1.1.0"
59-
ntex-error = "1.6.0"
59+
ntex-error = "1.6.1"
6060
ntex-h2 = "3.8.2"
6161
ntex-http = "1.1.0"
6262
ntex-dispatcher = "3.1.0"
6363
ntex-io = "3.9.1"
6464
ntex-macros = "3.2.0"
6565
ntex-net = "3.8.2"
6666
ntex-router = "1.0.0"
67-
ntex-rt = "3.11.0"
67+
ntex-rt = "3.12.0"
6868
ntex-server = "3.9.0"
6969
ntex-service = "4.6.0"
7070
ntex-tls = "3.4.0"

ntex-error/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-error"
3-
version = "1.6.0"
3+
version = "1.6.1"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "ntex error management"
66
keywords = ["network", "framework"]

ntex-rt/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.12.0] - 2026-03-21
4+
5+
* Changes `ThreadPool::execute` behavior to not err on an exhausted threadpool
6+
37
## [3.11.0] - 2026-03-21
48

59
* Expose `ThreadPool` type as part of the public api

ntex-rt/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-rt"
3-
version = "3.11.0"
3+
version = "3.12.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "ntex runtime"
66
keywords = ["network", "framework", "async", "futures"]

ntex-rt/src/builder.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ pub struct Builder {
2020
ping_interval: usize,
2121
/// Thread pool config
2222
pool_limit: usize,
23-
/// Thread pool boundness
24-
pool_bounded: bool,
2523
pool_recv_timeout: time::Duration,
2624
/// testing flag
2725
testing: bool,
@@ -36,7 +34,6 @@ impl Builder {
3634
ping_interval: 1000,
3735
testing: false,
3836
pool_limit: 256,
39-
pool_bounded: true,
4037
pool_recv_timeout: time::Duration::from_secs(60),
4138
}
4239
}
@@ -85,13 +82,6 @@ impl Builder {
8582
self
8683
}
8784

88-
#[must_use]
89-
/// Configures the thread pool to be unbounded.
90-
pub fn thread_pool_unbounded(mut self) -> Self {
91-
self.pool_bounded = false;
92-
self
93-
}
94-
9585
#[must_use]
9686
/// Mark system as testing
9787
pub fn testing(mut self) -> Self {
@@ -121,7 +111,6 @@ impl Builder {
121111
stop_on_panic: self.stop_on_panic,
122112
ping_interval: self.ping_interval,
123113
pool_limit: self.pool_limit,
124-
pool_bounded: self.pool_bounded,
125114
pool_recv_timeout: self.pool_recv_timeout,
126115
runner: Arc::new(runner),
127116
};

ntex-rt/src/pool.rs

Lines changed: 53 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
33
use std::task::{Context, Poll};
44
use std::{any::Any, fmt, future::Future, panic, pin::Pin, thread, time::Duration};
55

6-
use crossbeam_channel::{Receiver, Sender, TrySendError, bounded, unbounded};
6+
use crossbeam_channel::{Receiver, Select, Sender, TrySendError, bounded, unbounded};
77

88
/// An error that may be emitted when all worker threads are busy.
99
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -19,7 +19,7 @@ impl fmt::Display for BlockingError {
1919

2020
#[derive(Debug)]
2121
pub struct BlockingResult<T> {
22-
rx: Option<oneshot::AsyncReceiver<Result<T, Box<dyn Any + Send>>>>,
22+
rx: oneshot::AsyncReceiver<Result<T, Box<dyn Any + Send>>>,
2323
}
2424

2525
type BoxedDispatchable = Box<dyn Dispatchable + Send>;
@@ -46,15 +46,31 @@ impl Drop for CounterGuard {
4646
}
4747

4848
fn worker(
49-
receiver: Receiver<BoxedDispatchable>,
49+
receiver_high_prio: Receiver<BoxedDispatchable>,
50+
receiver_low_prio: Receiver<BoxedDispatchable>,
5051
counter: Arc<AtomicUsize>,
5152
timeout: Duration,
5253
) -> impl FnOnce() {
5354
move || {
5455
counter.fetch_add(1, Ordering::AcqRel);
5556
let _guard = CounterGuard(counter);
56-
while let Ok(f) = receiver.recv_timeout(timeout) {
57-
f.run();
57+
let mut sel = Select::new_biased();
58+
sel.recv(&receiver_high_prio);
59+
sel.recv(&receiver_low_prio);
60+
while let Ok(op) = sel.select_timeout(timeout) {
61+
match op {
62+
op if op.index() == 0 => {
63+
if let Ok(f) = op.recv(&receiver_high_prio) {
64+
f.run();
65+
}
66+
}
67+
op if op.index() == 1 => {
68+
if let Ok(f) = op.recv(&receiver_low_prio) {
69+
f.run();
70+
}
71+
}
72+
_ => unreachable!(),
73+
}
5874
}
5975
}
6076
}
@@ -74,8 +90,10 @@ fn worker(
7490
#[derive(Debug, Clone)]
7591
pub struct ThreadPool {
7692
name: String,
77-
sender: Sender<BoxedDispatchable>,
78-
receiver: Receiver<BoxedDispatchable>,
93+
sender_low_prio: Sender<BoxedDispatchable>,
94+
receiver_low_prio: Receiver<BoxedDispatchable>,
95+
sender_high_prio: Sender<BoxedDispatchable>,
96+
receiver_high_prio: Receiver<BoxedDispatchable>,
7997
counter: Arc<AtomicUsize>,
8098
thread_limit: usize,
8199
recv_timeout: Duration,
@@ -84,16 +102,14 @@ pub struct ThreadPool {
84102
impl ThreadPool {
85103
/// Creates a [`ThreadPool`] with a maximum number of worker threads
86104
/// and a timeout for receiving tasks from the task channel.
87-
pub fn new(
88-
name: &str,
89-
thread_limit: usize,
90-
recv_timeout: Duration,
91-
bound: bool,
92-
) -> Self {
93-
let (sender, receiver) = if bound { bounded(0) } else { unbounded() };
105+
pub fn new(name: &str, thread_limit: usize, recv_timeout: Duration) -> Self {
106+
let (sender_low_prio, receiver_low_prio) = bounded(0);
107+
let (sender_high_prio, receiver_high_prio) = unbounded();
94108
Self {
95-
sender,
96-
receiver,
109+
sender_low_prio,
110+
receiver_low_prio,
111+
sender_high_prio,
112+
receiver_high_prio,
97113
thread_limit,
98114
recv_timeout,
99115
name: format!("{name}:pool-wrk"),
@@ -106,43 +122,45 @@ impl ThreadPool {
106122
///
107123
/// The task will be executed by an available worker thread.
108124
/// If no threads are available and the pool has reached its maximum size,
109-
/// the behavior depends on the `boundedness` configuration:
110-
///
111-
/// - For a bounded pool, the function returns an error.
112-
/// - For an unbounded pool, the task is queued and executed when a worker
113-
/// becomes available.
125+
/// the work will be queued until a worker thread becomes available.
114126
pub fn execute<F, R>(&self, f: F) -> BlockingResult<R>
115127
where
116128
F: FnOnce() -> R + Send + 'static,
117129
R: Send + 'static,
118130
{
119131
let (tx, rx) = oneshot::async_channel();
120132
let f = Box::new(move || {
121-
// do not execute operation if recevier is dropped
133+
// do not execute operation if receiver is dropped
122134
if !tx.is_closed() {
123135
let result = panic::catch_unwind(panic::AssertUnwindSafe(f));
124136
let _ = tx.send(result);
125137
}
126138
});
127139

128-
match self.sender.try_send(f) {
129-
Ok(()) => BlockingResult { rx: Some(rx) },
140+
match self.sender_low_prio.try_send(f) {
141+
Ok(()) => BlockingResult { rx },
130142
Err(e) => match e {
131143
TrySendError::Full(f) => {
132144
let cnt = self.counter.load(Ordering::Acquire);
133145
if cnt >= self.thread_limit {
134-
BlockingResult { rx: None }
146+
self.sender_high_prio
147+
.send(f)
148+
.expect("the channel should not be full");
149+
BlockingResult { rx }
135150
} else {
136151
thread::Builder::new()
137152
.name(format!("{}:{}", self.name, cnt))
138153
.spawn(worker(
139-
self.receiver.clone(),
154+
self.receiver_high_prio.clone(),
155+
self.receiver_low_prio.clone(),
140156
self.counter.clone(),
141157
self.recv_timeout,
142158
))
143159
.expect("Cannot construct new thread");
144-
self.sender.send(f).expect("the channel should not be full");
145-
BlockingResult { rx: Some(rx) }
160+
self.sender_low_prio
161+
.send(f)
162+
.expect("the channel should not be full");
163+
BlockingResult { rx }
146164
}
147165
}
148166
TrySendError::Disconnected(_) => {
@@ -159,24 +177,13 @@ impl<R> Future for BlockingResult<R> {
159177
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160178
let this = self.get_mut();
161179

162-
if this.rx.is_none() {
163-
return Poll::Ready(Err(BlockingError));
164-
}
165-
166-
if let Some(mut rx) = this.rx.take() {
167-
match Pin::new(&mut rx).poll(cx) {
168-
Poll::Pending => {
169-
this.rx = Some(rx);
170-
Poll::Pending
171-
}
172-
Poll::Ready(result) => Poll::Ready(
173-
result
174-
.map_err(|_| BlockingError)
175-
.and_then(|res| res.map_err(|_| BlockingError)),
176-
),
177-
}
178-
} else {
179-
unreachable!()
180+
match Pin::new(&mut this.rx).poll(cx) {
181+
Poll::Pending => Poll::Pending,
182+
Poll::Ready(result) => Poll::Ready(
183+
result
184+
.map_err(|_| BlockingError)
185+
.and_then(|res| res.map_err(|_| BlockingError)),
186+
),
180187
}
181188
}
182189
}

ntex-rt/src/system.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ pub struct SystemConfig {
4545
pub(super) stop_on_panic: bool,
4646
pub(super) ping_interval: usize,
4747
pub(super) pool_limit: usize,
48-
pub(super) pool_bounded: bool,
4948
pub(super) pool_recv_timeout: Duration,
5049
pub(super) testing: bool,
5150
pub(super) runner: Arc<dyn Runner>,
@@ -75,12 +74,8 @@ impl System {
7574
let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
7675
let (sender, receiver) = unbounded();
7776

78-
let pool = ThreadPool::new(
79-
&config.name,
80-
config.pool_limit,
81-
config.pool_recv_timeout,
82-
config.pool_bounded,
83-
);
77+
let pool =
78+
ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
8479

8580
System {
8681
id,

0 commit comments

Comments
 (0)