Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/engine/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{path::PathBuf, sync::Arc};

use anyhow::*;
use anyhow::Result;
use clap::Parser;
use once_cell::sync::Lazy;
use rivet_engine::{SubCommand, run_config};
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ pub(crate) async fn allocate_pending_actors(
}

let Some(entry) = stream.try_next().await? else {
break;
break 'queue_loop;
};

let (old_runner_alloc_key, old_runner_alloc_key_data) =
Expand All @@ -722,15 +722,15 @@ pub(crate) async fn allocate_pending_actors(
// We have passed all of the runners with the highest version. This is reachable if
// the ping of the highest version workers makes them ineligible
if old_runner_alloc_key.version < highest_version {
break;
break 'queue_loop;
}
} else {
highest_version = Some(old_runner_alloc_key.version);
}

// An empty runner means we have reached the end of the runners with the highest version
if old_runner_alloc_key.remaining_millislots == 0 {
break;
break 'queue_loop;
}

// Scan by last ping
Expand Down
10 changes: 8 additions & 2 deletions engine/packages/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,14 @@ fn build_tokio_runtime_builder() -> tokio::runtime::Builder {
rt_builder.thread_stack_size(8 * 1024 * 1024);
}

if let Ok(worker_threads) = env::var("TOKIO_WORKER_THREADS") {
rt_builder.worker_threads(worker_threads.parse().unwrap());
// Sets the thread count to available_parallelism - N
if let Ok(subtract_worker_threads) = env::var("TOKIO_WORKER_THREADS_SUBTRACT") {
rt_builder.worker_threads(
std::thread::available_parallelism()
.map_or(1, std::num::NonZeroUsize::get)
.saturating_sub(subtract_worker_threads.parse().unwrap())
.max(1),
);
}

if let Ok(max_blocking_threads) = env::var("TOKIO_MAX_BLOCKING_THREADS") {
Expand Down
Loading