Skip to content

Commit ef3c0a1

Browse files
authored
fix: narrow the lifetime of the connection token (#379)
* chore: add `base_rt` crate * chore: update `Cargo.lock` * refactor(base): separate runtime into `base_rt` * stamp: narrow the lifetime of the connection token * stamp: nit * chore(sb_core): add dependency * chore: update `Cargo.lock` * stamp(sb_core): add debug info
1 parent 107f27d commit ef3c0a1

File tree

11 files changed

+77
-16
lines changed

11 files changed

+77
-16
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/base/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
base_rt = { version = "0.1.0", path = "../base_rt" }
910
base_mem_check = { version = "0.1.0", path = "../base_mem_check" }
1011
http_utils = { version = "0.1.0", path = "../http_utils" }
1112
async-trait.workspace = true

crates/base/src/deno_runtime.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::inspector_server::Inspector;
2-
use crate::rt_worker::rt;
32
use crate::rt_worker::supervisor::{CPUUsage, CPUUsageMetrics};
43
use crate::rt_worker::worker::DuplexStreamEntry;
54
use crate::utils::units::{bytes_to_display, mib_to_bytes};
@@ -25,6 +24,7 @@ use futures_util::future::poll_fn;
2524
use futures_util::task::AtomicWaker;
2625
use log::{error, trace};
2726
use once_cell::sync::{Lazy, OnceCell};
27+
use sb_core::conn_sync::DenoRuntimeDropToken;
2828
use sb_core::http::sb_core_http;
2929
use sb_core::http_start::sb_core_http_start;
3030
use sb_core::util::sync::AtomicFlag;
@@ -184,6 +184,7 @@ impl GetRuntimeContext for () {
184184
}
185185

186186
pub struct DenoRuntime<RuntimeContext = ()> {
187+
pub drop_token: CancellationToken,
187188
pub js_runtime: JsRuntime,
188189
pub env_vars: HashMap<String, String>, // TODO: does this need to be pub?
189190
pub conf: WorkerRuntimeOpts,
@@ -203,6 +204,8 @@ pub struct DenoRuntime<RuntimeContext = ()> {
203204

204205
impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
205206
fn drop(&mut self) {
207+
self.drop_token.cancel();
208+
206209
if self.conf.is_user_worker() {
207210
self.js_runtime.v8_isolate().remove_gc_prologue_callback(
208211
mem_check_gc_prologue_callback_fn,
@@ -238,13 +241,16 @@ where
238241
..
239242
} = opts;
240243

244+
let drop_token = CancellationToken::default();
245+
241246
let base_dir_path = std::env::current_dir().map(|p| p.join(&service_path))?;
242247
let base_url = Url::from_directory_path(&base_dir_path).unwrap();
243248

244249
let is_user_worker = conf.is_user_worker();
245250

246251
let potential_exts = vec!["ts", "tsx", "js", "jsx"];
247252
let mut main_module_url = base_url.join("index.ts")?;
253+
248254
for potential_ext in potential_exts {
249255
main_module_url = base_url.join(format!("index.{}", potential_ext).as_str())?;
250256
if main_module_url.to_file_path().unwrap().exists() {
@@ -587,6 +593,7 @@ where
587593
}
588594

589595
op_state.put::<sb_env::EnvVars>(env_vars);
596+
op_state.put(DenoRuntimeDropToken(drop_token.clone()))
590597
}
591598

592599
let main_module_id = {
@@ -600,7 +607,7 @@ where
600607
};
601608

602609
if is_user_worker {
603-
drop(rt::SUPERVISOR_RT.spawn({
610+
drop(base_rt::SUPERVISOR_RT.spawn({
604611
let drop_token = mem_check.drop_token.clone();
605612
let waker = mem_check.waker.clone();
606613

@@ -624,6 +631,7 @@ where
624631
}
625632

626633
Ok(Self {
634+
drop_token,
627635
js_runtime,
628636
env_vars,
629637
conf,
@@ -866,7 +874,7 @@ where
866874
let drop_token = self.mem_check.drop_token.clone();
867875
let state = self.mem_check_state();
868876

869-
drop(rt::SUPERVISOR_RT.spawn(async move {
877+
drop(base_rt::SUPERVISOR_RT.spawn(async move {
870878
loop {
871879
tokio::select! {
872880
_ = notify.notified() => {

crates/base/src/rt_worker/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
pub mod implementation;
2-
pub mod rt;
32
pub mod supervisor;
43
pub mod utils;
54
pub mod worker;

crates/base/src/rt_worker/worker.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use tokio::time::Instant;
2424
use tokio_util::sync::CancellationToken;
2525
use uuid::Uuid;
2626

27-
use super::rt;
2827
use super::supervisor::CPUUsageMetrics;
2928
use super::worker_ctx::TerminationToken;
3029
use super::worker_pool::SupervisorPolicy;
@@ -115,9 +114,9 @@ impl Worker {
115114

116115
let cancel = self.cancel.clone();
117116
let rt = if worker_kind.is_user_worker() {
118-
&rt::USER_WORKER_RT
117+
&base_rt::USER_WORKER_RT
119118
} else {
120-
&rt::PRIMARY_WORKER_RT
119+
&base_rt::PRIMARY_WORKER_RT
121120
};
122121

123122
let _worker_handle = rt.spawn_pinned(move || {
@@ -194,7 +193,7 @@ impl Worker {
194193
)
195194
};
196195

197-
rt::SUPERVISOR_RT
196+
base_rt::SUPERVISOR_RT
198197
.spawn(async move {
199198
token.inbound.cancelled().await;
200199
is_termination_requested.raise();

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use tokio_rustls::server::TlsStream;
4242
use tokio_util::sync::CancellationToken;
4343
use uuid::Uuid;
4444

45-
use super::rt;
4645
use super::supervisor::{self, CPUTimerParam, CPUUsageMetrics};
4746
use super::worker::DuplexStreamEntry;
4847
use super::worker_pool::{SupervisorPolicy, WorkerPoolPolicy};
@@ -333,7 +332,7 @@ pub fn create_supervisor(
333332
cpu_timer_param.get_cpu_timer(supervisor_policy).unzip();
334333

335334
drop({
336-
let _rt_guard = rt::SUPERVISOR_RT.enter();
335+
let _rt_guard = base_rt::SUPERVISOR_RT.enter();
337336
let maybe_cpu_timer_inner = maybe_cpu_timer.clone();
338337
let supervise_cancel_token_inner = supervise_cancel_token.clone();
339338

@@ -378,7 +377,7 @@ pub fn create_supervisor(
378377
use deno_core::futures::channel::mpsc;
379378
use deno_core::serde_json::Value;
380379

381-
rt::SUPERVISOR_RT
380+
base_rt::SUPERVISOR_RT
382381
.spawn_blocking(move || {
383382
let wait_inspector_disconnect_fut = async move {
384383
let ls = tokio::task::LocalSet::new();
@@ -449,7 +448,7 @@ pub fn create_supervisor(
449448
.await;
450449
};
451450

452-
rt::SUPERVISOR_RT.block_on(wait_inspector_disconnect_fut);
451+
base_rt::SUPERVISOR_RT.block_on(wait_inspector_disconnect_fut);
453452
})
454453
.await
455454
.unwrap();

crates/base_rt/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "base_rt"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
tokio.workspace = true
8+
tokio-util.workspace = true
9+
once_cell.workspace = true
File renamed without changes.

crates/sb_core/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ serde.workspace = true
2626
bytes.workspace = true
2727
deno_tls.workspace = true
2828
thiserror.workspace = true
29+
base_rt = { version = "0.1.0", path = "../base_rt" }
2930
base_mem_check = { version = "0.1.0", path = "../base_mem_check" }
3031
sb_node = { version = "0.1.0", path = "../node" }
3132
deno_crypto.workspace = true
@@ -54,4 +55,5 @@ enum-as-inner.workspace = true
5455
httparse.workspace = true
5556
http.workspace = true
5657
memmem = "0.1"
57-
faster-hex.workspace=true
58+
faster-hex.workspace = true
59+
tracing.workspace = true

crates/sb_core/conn_sync.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,6 @@ impl ConnWatcher {
1414
self.0.clone()
1515
}
1616
}
17+
18+
#[derive(Clone)]
19+
pub struct DenoRuntimeDropToken(pub CancellationToken);

0 commit comments

Comments
 (0)