Skip to content

Commit 048f02e

Browse files
committed
stamp: make termination routine more reliably
(cherry picked from commit f415e6a3950c5298f5b718d9b783226a78c418ea) (cherry picked from commit 9679d18)
1 parent b734575 commit 048f02e

File tree

2 files changed

+61
-48
lines changed

2 files changed

+61
-48
lines changed

crates/base/src/deno_runtime.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use deno_tls::RootCertStoreProvider;
1818
use futures_util::future::poll_fn;
1919
use log::{error, trace};
2020
use sb_core::conn_sync::ConnSync;
21+
use sb_core::util::sync::AtomicFlag;
2122
use serde::de::DeserializeOwned;
2223
use std::collections::HashMap;
2324
use std::fmt;
@@ -80,8 +81,11 @@ fn get_error_class_name(e: &AnyError) -> &'static str {
8081
pub struct DenoRuntime {
8182
pub js_runtime: JsRuntime,
8283
pub env_vars: HashMap<String, String>, // TODO: does this need to be pub?
83-
main_module_id: ModuleId,
8484
pub conf: WorkerRuntimeOpts,
85+
pub is_termination_requested: Arc<AtomicFlag>,
86+
pub is_terminated: Arc<AtomicFlag>,
87+
88+
main_module_id: ModuleId,
8589
}
8690

8791
impl DenoRuntime {
@@ -363,6 +367,8 @@ impl DenoRuntime {
363367
main_module_id,
364368
env_vars,
365369
conf,
370+
is_termination_requested: Arc::default(),
371+
is_terminated: Arc::default(),
366372
})
367373
}
368374

@@ -398,6 +404,7 @@ impl DenoRuntime {
398404
js_runtime.mod_evaluate(self.main_module_id)
399405
};
400406

407+
let is_termination_requested = self.is_termination_requested.clone();
401408
let is_user_worker = self.conf.is_user_worker();
402409

403410
#[cfg(debug_assertions)]
@@ -469,6 +476,10 @@ impl DenoRuntime {
469476
);
470477
}
471478

479+
if poll_result.is_pending() && is_termination_requested.is_raised() {
480+
return Poll::Ready(Ok(()));
481+
}
482+
472483
poll_result
473484
})
474485
.await
@@ -483,6 +494,8 @@ impl DenoRuntime {
483494
},
484495
};
485496

497+
self.is_terminated.raise();
498+
486499
(result, accumulated_cpu_time_ns)
487500
}
488501

crates/base/src/rt_worker/worker.rs

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::deno_runtime::DenoRuntime;
2+
use crate::rt_worker::supervisor;
23
use crate::rt_worker::utils::{get_event_metadata, parse_worker_conf};
34
use crate::rt_worker::worker_ctx::create_supervisor;
45
use crate::utils::send_event_if_event_worker_available;
@@ -7,7 +8,7 @@ use event_worker::events::{
78
EventMetadata, ShutdownEvent, ShutdownReason, UncaughtExceptionEvent, WorkerEventWithMetadata,
89
WorkerEvents, WorkerMemoryUsed,
910
};
10-
use futures_util::{pin_mut, FutureExt};
11+
use futures_util::FutureExt;
1112
use log::{debug, error};
1213
use sb_core::conn_sync::ConnSync;
1314
use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts};
@@ -22,10 +23,10 @@ use tokio::sync::{oneshot, watch, Notify};
2223
use tokio::time::Instant;
2324
use uuid::Uuid;
2425

26+
use super::rt;
2527
use super::supervisor::CPUUsageMetrics;
2628
use super::worker_ctx::TerminationToken;
2729
use super::worker_pool::SupervisorPolicy;
28-
use super::{rt, supervisor};
2930

3031
#[derive(Clone)]
3132
pub struct Worker {
@@ -139,7 +140,11 @@ impl Worker {
139140

140141
_cpu_timer = maybe_timer;
141142
pending().boxed()
142-
} else if let Some(token) = termination_token.as_ref() {
143+
} else if let Some(token) = termination_token.clone() {
144+
let is_terminated = new_runtime.is_terminated.clone();
145+
let is_termination_requested =
146+
new_runtime.is_termination_requested.clone();
147+
143148
let (waker, thread_safe_handle) = {
144149
let js_runtime = &mut new_runtime.js_runtime;
145150
(
@@ -148,32 +153,38 @@ impl Worker {
148153
)
149154
};
150155

151-
async move {
152-
token.inbound.cancelled().await;
153-
thread_safe_handle.request_interrupt(
154-
supervisor::handle_interrupt,
155-
Box::into_raw(Box::new(supervisor::IsolateInterruptData {
156-
should_terminate: true,
157-
isolate_memory_usage_tx: None,
158-
}))
159-
as *mut std::ffi::c_void,
160-
);
161-
162-
waker.wake();
163-
164-
let _ = termination_event_tx.send(WorkerEvents::Shutdown(
165-
ShutdownEvent {
166-
reason: ShutdownReason::TerminationRequested,
167-
cpu_time_used: 0,
168-
memory_used: WorkerMemoryUsed {
169-
total: 0,
170-
heap: 0,
171-
external: 0,
156+
rt::SUPERVISOR_RT
157+
.spawn(async move {
158+
token.inbound.cancelled().await;
159+
160+
is_termination_requested.raise();
161+
thread_safe_handle.request_interrupt(
162+
supervisor::handle_interrupt,
163+
Box::into_raw(Box::new(supervisor::IsolateInterruptData {
164+
should_terminate: true,
165+
isolate_memory_usage_tx: None,
166+
}))
167+
as *mut std::ffi::c_void,
168+
);
169+
170+
while !is_terminated.is_raised() {
171+
waker.wake();
172+
tokio::task::yield_now().await;
173+
}
174+
175+
let _ = termination_event_tx.send(WorkerEvents::Shutdown(
176+
ShutdownEvent {
177+
reason: ShutdownReason::TerminationRequested,
178+
cpu_time_used: 0,
179+
memory_used: WorkerMemoryUsed {
180+
total: 0,
181+
heap: 0,
182+
external: 0,
183+
},
172184
},
173-
},
174-
));
175-
}
176-
.boxed()
185+
));
186+
})
187+
.boxed()
177188
} else {
178189
pending().boxed()
179190
};
@@ -186,28 +197,17 @@ impl Worker {
186197
Some(worker_name),
187198
);
188199

189-
let mut termination_fut_resolved = false;
190-
191-
pin_mut!(termination_fut);
192-
pin_mut!(data);
200+
let result = data.await;
193201

194-
loop {
195-
tokio::select! {
196-
_ = &mut termination_fut, if !termination_fut_resolved => {
197-
termination_fut_resolved = true;
198-
}
199-
200-
result = &mut data => {
201-
if let Some(token) = termination_token.as_ref() {
202-
if is_user_worker || termination_fut_resolved {
203-
token.outbound.cancel();
204-
}
205-
}
206-
207-
break result
208-
}
202+
if let Some(token) = termination_token.as_ref() {
203+
if !is_user_worker {
204+
let _ = termination_fut.await;
209205
}
206+
207+
token.outbound.cancel();
210208
}
209+
210+
result
211211
}
212212

213213
Err(err) => {

0 commit comments

Comments
 (0)