Skip to content

Commit 473aa8b

Browse files
authored
Merge pull request #246 from nyannyacha/fix-pool-sep
fix: make major worker tasks managed separately from the user worker
2 parents 3d218b5 + ce9108f commit 473aa8b

File tree

3 files changed

+59
-5
lines changed

3 files changed

+59
-5
lines changed

crates/base/src/deno_runtime.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,7 @@ mod test {
522522
MainWorkerRuntimeOpts, UserWorkerMsgs, UserWorkerRuntimeOpts, WorkerContextInitOpts,
523523
WorkerRuntimeOpts,
524524
};
525+
use serial_test::serial;
525526
use std::collections::HashMap;
526527
use std::fs;
527528
use std::fs::File;
@@ -532,6 +533,7 @@ mod test {
532533
use tokio::sync::{mpsc, watch};
533534

534535
#[tokio::test]
536+
#[serial]
535537
async fn test_module_code_no_eszip() {
536538
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();
537539
let mut rt = DenoRuntime::new(WorkerContextInitOpts {
@@ -559,6 +561,7 @@ mod test {
559561
}
560562

561563
#[tokio::test]
564+
#[serial]
562565
#[allow(clippy::arc_with_non_send_sync)]
563566
async fn test_eszip_with_source_file() {
564567
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();
@@ -616,6 +619,7 @@ mod test {
616619
}
617620

618621
#[tokio::test]
622+
#[serial]
619623
#[allow(clippy::arc_with_non_send_sync)]
620624
async fn test_create_eszip_from_graph() {
621625
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();
@@ -707,6 +711,7 @@ mod test {
707711

708712
// Main Runtime should have access to `EdgeRuntime`
709713
#[tokio::test]
714+
#[serial]
710715
async fn test_main_runtime_creation() {
711716
let mut runtime = create_runtime(None, None, None).await;
712717

@@ -726,6 +731,7 @@ mod test {
726731

727732
// User Runtime Should not have access to EdgeRuntime
728733
#[tokio::test]
734+
#[serial]
729735
async fn test_user_runtime_creation() {
730736
let mut runtime = create_runtime(
731737
None,
@@ -749,6 +755,7 @@ mod test {
749755
}
750756

751757
#[tokio::test]
758+
#[serial]
752759
async fn test_main_rt_fs() {
753760
let mut main_rt = create_runtime(None, Some(std::env::vars().collect()), None).await;
754761

@@ -798,6 +805,7 @@ mod test {
798805
// }
799806

800807
#[tokio::test]
808+
#[serial]
801809
async fn test_os_ops() {
802810
let mut user_rt = create_runtime(
803811
None,
@@ -920,6 +928,7 @@ mod test {
920928
}
921929

922930
#[tokio::test]
931+
#[serial]
923932
async fn test_os_env_vars() {
924933
std::env::set_var("Supa_Test", "Supa_Value");
925934
let mut main_rt = create_runtime(None, Some(std::env::vars().collect()), None).await;
@@ -1016,6 +1025,7 @@ mod test {
10161025
}
10171026

10181027
#[tokio::test]
1028+
#[serial]
10191029
async fn test_read_file_user_rt() {
10201030
let mut user_rt = create_basic_user_runtime("./test_cases/readFile", 20, 1000).await;
10211031
let (_tx, unix_stream_rx) =
@@ -1033,6 +1043,7 @@ mod test {
10331043
}
10341044

10351045
#[tokio::test]
1046+
#[serial]
10361047
async fn test_array_buffer_allocation_below_limit() {
10371048
let mut user_rt = create_basic_user_runtime("./test_cases/array_buffers", 20, 1000).await;
10381049
let (_tx, unix_stream_rx) =
@@ -1042,6 +1053,7 @@ mod test {
10421053
}
10431054

10441055
#[tokio::test]
1056+
#[serial]
10451057
async fn test_array_buffer_allocation_above_limit() {
10461058
let mut user_rt = create_basic_user_runtime("./test_cases/array_buffers", 15, 1000).await;
10471059
let (_tx, unix_stream_rx) =

crates/base/src/rt_worker/rt.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
use std::num::NonZeroUsize;
2+
13
use once_cell::sync::Lazy;
24

5+
pub const DEFAULT_PRIMARY_WORKER_POOL_SIZE: usize = 2;
6+
pub const DEFAULT_USER_WORKER_POOL_SIZE: usize = 1;
7+
38
pub static SUPERVISOR_RT: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
49
tokio::runtime::Builder::new_multi_thread()
510
.enable_all()
@@ -8,14 +13,46 @@ pub static SUPERVISOR_RT: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
813
.unwrap()
914
});
1015

11-
pub static WORKER_RT: Lazy<tokio_util::task::LocalPoolHandle> = Lazy::new(|| {
16+
// NOTE: This pool is for the main and event workers. The reason why they should
17+
// separate from the user worker pool is they can starve them if user workers
18+
// are saturated.
19+
pub static PRIMARY_WORKER_RT: Lazy<tokio_util::task::LocalPoolHandle> = Lazy::new(|| {
20+
let maybe_pool_size = std::env::var("EDGE_RUNTIME_PRIMARY_WORKER_POOL_SIZE")
21+
.ok()
22+
.and_then(|it| it.parse::<usize>().ok())
23+
.map(|it| {
24+
if it < DEFAULT_PRIMARY_WORKER_POOL_SIZE {
25+
DEFAULT_PRIMARY_WORKER_POOL_SIZE
26+
} else {
27+
it
28+
}
29+
});
30+
31+
tokio_util::task::LocalPoolHandle::new(
32+
maybe_pool_size.unwrap_or(DEFAULT_PRIMARY_WORKER_POOL_SIZE),
33+
)
34+
});
35+
36+
pub static USER_WORKER_RT: Lazy<tokio_util::task::LocalPoolHandle> = Lazy::new(|| {
1237
let maybe_pool_size = std::env::var("EDGE_RUNTIME_WORKER_POOL_SIZE")
1338
.ok()
14-
.and_then(|it| it.parse::<usize>().ok());
39+
.and_then(|it| it.parse::<usize>().ok())
40+
.map(|it| {
41+
if it < DEFAULT_USER_WORKER_POOL_SIZE {
42+
DEFAULT_USER_WORKER_POOL_SIZE
43+
} else {
44+
it
45+
}
46+
});
1547

1648
tokio_util::task::LocalPoolHandle::new(if cfg!(debug_assertions) {
17-
maybe_pool_size.unwrap_or(1)
49+
maybe_pool_size.unwrap_or(DEFAULT_USER_WORKER_POOL_SIZE)
1850
} else {
19-
maybe_pool_size.unwrap_or(std::thread::available_parallelism().unwrap().get())
51+
maybe_pool_size.unwrap_or(
52+
std::thread::available_parallelism()
53+
.ok()
54+
.map(NonZeroUsize::get)
55+
.unwrap_or(DEFAULT_USER_WORKER_POOL_SIZE),
56+
)
2057
})
2158
});

crates/base/src/rt_worker/worker.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,13 @@ impl Worker {
9898
let is_user_worker = opts.conf.is_user_worker();
9999

100100
let cancel = self.cancel.clone();
101+
let rt = if is_user_worker {
102+
&rt::USER_WORKER_RT
103+
} else {
104+
&rt::PRIMARY_WORKER_RT
105+
};
101106

102-
let _worker_handle = rt::WORKER_RT.spawn_pinned(move || {
107+
let _worker_handle = rt.spawn_pinned(move || {
103108
tokio::task::spawn_local(async move {
104109
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = is_user_worker
105110
.then(unbounded_channel::<CPUUsageMetrics>)

0 commit comments

Comments
 (0)