Skip to content

Commit ce9108f

Browse files
committed
fix: exposing primary worker pool size as an env var and restricts the value as its acceptable bounds
1 parent 8f62afa commit ce9108f

File tree

1 file changed

+36
-5
lines changed
  • crates/base/src/rt_worker

1 file changed

+36
-5
lines changed

crates/base/src/rt_worker/rt.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
use std::num::NonZeroUsize;
2+
13
use once_cell::sync::Lazy;
24

5+
pub const DEFAULT_PRIMARY_WORKER_POOL_SIZE: usize = 2;
6+
pub const DEFAULT_USER_WORKER_POOL_SIZE: usize = 1;
7+
38
pub static SUPERVISOR_RT: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
49
tokio::runtime::Builder::new_multi_thread()
510
.enable_all()
@@ -11,17 +16,43 @@ pub static SUPERVISOR_RT: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
1116
// NOTE: This pool is for the main and event workers. The reason why they should
1217
// separate from the user worker pool is they can starve them if user workers
1318
// are saturated.
14-
pub static PRIMARY_WORKER_RT: Lazy<tokio_util::task::LocalPoolHandle> =
15-
Lazy::new(|| tokio_util::task::LocalPoolHandle::new(2));
19+
pub static PRIMARY_WORKER_RT: Lazy<tokio_util::task::LocalPoolHandle> = Lazy::new(|| {
20+
let maybe_pool_size = std::env::var("EDGE_RUNTIME_PRIMARY_WORKER_POOL_SIZE")
21+
.ok()
22+
.and_then(|it| it.parse::<usize>().ok())
23+
.map(|it| {
24+
if it < DEFAULT_PRIMARY_WORKER_POOL_SIZE {
25+
DEFAULT_PRIMARY_WORKER_POOL_SIZE
26+
} else {
27+
it
28+
}
29+
});
30+
31+
tokio_util::task::LocalPoolHandle::new(
32+
maybe_pool_size.unwrap_or(DEFAULT_PRIMARY_WORKER_POOL_SIZE),
33+
)
34+
});
1635

1736
pub static USER_WORKER_RT: Lazy<tokio_util::task::LocalPoolHandle> = Lazy::new(|| {
1837
let maybe_pool_size = std::env::var("EDGE_RUNTIME_WORKER_POOL_SIZE")
1938
.ok()
20-
.and_then(|it| it.parse::<usize>().ok());
39+
.and_then(|it| it.parse::<usize>().ok())
40+
.map(|it| {
41+
if it < DEFAULT_USER_WORKER_POOL_SIZE {
42+
DEFAULT_USER_WORKER_POOL_SIZE
43+
} else {
44+
it
45+
}
46+
});
2147

2248
tokio_util::task::LocalPoolHandle::new(if cfg!(debug_assertions) {
23-
maybe_pool_size.unwrap_or(1)
49+
maybe_pool_size.unwrap_or(DEFAULT_USER_WORKER_POOL_SIZE)
2450
} else {
25-
maybe_pool_size.unwrap_or(std::thread::available_parallelism().unwrap().get())
51+
maybe_pool_size.unwrap_or(
52+
std::thread::available_parallelism()
53+
.ok()
54+
.map(NonZeroUsize::get)
55+
.unwrap_or(DEFAULT_USER_WORKER_POOL_SIZE),
56+
)
2657
})
2758
});

0 commit comments

Comments
 (0)