Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions turbopack/crates/turbopack-node/src/pool_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ impl NodeJsPoolStats {
}

pub fn finished_booting_worker(&mut self) {
self.booting_workers -= 1;
self.booting_workers = self.booting_workers.saturating_sub(1);
}

pub fn remove_worker(&mut self) {
self.workers -= 1;
self.workers = self.workers.saturating_sub(1);
}

#[allow(unused)]
Expand All @@ -60,14 +60,14 @@ impl NodeJsPoolStats {
pub fn add_cold_process_time(&mut self, time: Duration) {
self.total_cold_process_time += time;
self.cold_process_count += 1;
self.queued_tasks -= 1;
self.queued_tasks = self.queued_tasks.saturating_sub(1);
}

#[allow(unused)]
pub fn add_warm_process_time(&mut self, time: Duration) {
self.total_warm_process_time += time;
self.warm_process_count += 1;
self.queued_tasks -= 1;
self.queued_tasks = self.queued_tasks.saturating_sub(1);
}

pub fn estimated_bootup_time(&self) -> Duration {
Expand Down
26 changes: 9 additions & 17 deletions turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ static WORKER_TERMINATOR: OnceCell<
ThreadsafeFunction<NapiWorkerTermination, ErrorStrategy::Fatal>,
> = OnceCell::new();

static PENDING_CREATIONS: OnceCell<Mutex<VecDeque<(Arc<WorkerOptions>, oneshot::Sender<u32>)>>> =
OnceCell::new();
static PENDING_CREATIONS: OnceCell<Mutex<VecDeque<oneshot::Sender<u32>>>> = OnceCell::new();

#[napi]
#[allow(unused)]
Expand Down Expand Up @@ -58,6 +57,8 @@ pub fn register_worker_scheduler(
pub async fn create_worker(options: Arc<WorkerOptions>) -> anyhow::Result<u32> {
let (tx, rx) = oneshot::channel();

let napi_options = (&options).into();

{
let pending = PENDING_CREATIONS.get_or_init(|| Mutex::new(VecDeque::new()));
// ensure pool entry exists for these options so scale ops can observe it
Expand All @@ -66,13 +67,13 @@ pub async fn create_worker(options: Arc<WorkerOptions>) -> anyhow::Result<u32> {
.lock()
.entry(options.clone())
.or_default();
pending.lock().push_back((options.clone(), tx));
pending.lock().push_back(tx);
}

if let Some(creator) = WORKER_CREATOR.get() {
creator.call(
NapiWorkerCreation {
options: options.into(),
options: napi_options,
},
ThreadsafeFunctionCallMode::NonBlocking,
);
Expand All @@ -87,19 +88,10 @@ pub async fn create_worker(options: Arc<WorkerOptions>) -> anyhow::Result<u32> {
#[napi]
#[allow(unused)]
pub fn worker_created(worker_id: u32) {
if let Some(pending) = PENDING_CREATIONS.get() {
if let Some((options, tx)) = pending.lock().pop_front() {
// record into global pool
WORKER_POOL_OPERATION
.pools
.lock()
.entry(options.clone())
.or_default()
.idle_workers
.lock()
.push(worker_id);
let _ = tx.send(worker_id);
}
if let Some(pending) = PENDING_CREATIONS.get()
&& let Some(tx) = pending.lock().pop_front()
{
let _ = tx.send(worker_id);
}
}

Expand Down
Loading