Skip to content

Commit 16377b1

Browse files
committed
fix: sync drop timing of unix stream sender to timing of an isolate drops
1 parent c7032c2 commit 16377b1

File tree

4 files changed

+39
-38
lines changed

4 files changed

+39
-38
lines changed

crates/base/src/deno_runtime.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::rt_worker::supervisor::{CPUUsage, CPUUsageMetrics};
2+
use crate::rt_worker::worker::UnixStreamEntry;
23
use crate::utils::units::mib_to_bytes;
34

45
use anyhow::{anyhow, bail, Context, Error};
@@ -25,7 +26,6 @@ use std::fmt;
2526
use std::os::fd::RawFd;
2627
use std::sync::Arc;
2728
use std::task::Poll;
28-
use tokio::net::UnixStream;
2929
use tokio::sync::{mpsc, watch};
3030

3131
use crate::snapshot;
@@ -372,17 +372,14 @@ impl DenoRuntime {
372372

373373
pub async fn run(
374374
&mut self,
375-
unix_stream_rx: mpsc::UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>,
375+
unix_stream_rx: mpsc::UnboundedReceiver<UnixStreamEntry>,
376376
maybe_cpu_usage_metrics_tx: Option<mpsc::UnboundedSender<CPUUsageMetrics>>,
377377
name: Option<String>,
378378
) -> (Result<(), Error>, i64) {
379379
{
380380
let op_state_rc = self.js_runtime.op_state();
381381
let mut op_state = op_state_rc.borrow_mut();
382-
op_state
383-
.put::<mpsc::UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>>(
384-
unix_stream_rx,
385-
);
382+
op_state.put::<mpsc::UnboundedReceiver<UnixStreamEntry>>(unix_stream_rx);
386383

387384
if self.conf.is_main_worker() {
388385
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(
@@ -447,7 +444,7 @@ impl DenoRuntime {
447444
cx,
448445
PollEventLoopOptions {
449446
wait_for_inspector: false,
450-
pump_v8_message_loop: true,
447+
pump_v8_message_loop: !is_termination_requested.is_raised(),
451448
},
452449
);
453450

@@ -533,8 +530,8 @@ fn set_v8_flags() {
533530
#[cfg(test)]
534531
mod test {
535532
use crate::deno_runtime::DenoRuntime;
533+
use crate::rt_worker::worker::UnixStreamEntry;
536534
use deno_core::{FastString, ModuleCode, PollEventLoopOptions};
537-
use sb_core::conn_sync::ConnSync;
538535
use sb_graph::emitter::EmitterFactory;
539536
use sb_graph::{generate_binary_eszip, EszipPayloadKind};
540537
use sb_workers::context::{
@@ -548,8 +545,7 @@ mod test {
548545
use std::io::Write;
549546
use std::path::PathBuf;
550547
use std::sync::Arc;
551-
use tokio::net::UnixStream;
552-
use tokio::sync::{mpsc, watch};
548+
use tokio::sync::mpsc;
553549

554550
#[tokio::test]
555551
#[serial]
@@ -1059,8 +1055,7 @@ mod test {
10591055
#[serial]
10601056
async fn test_read_file_user_rt() {
10611057
let mut user_rt = create_basic_user_runtime("./test_cases/readFile", 20, 1000).await;
1062-
let (_tx, unix_stream_rx) =
1063-
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
1058+
let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
10641059

10651060
let (result, _) = user_rt.run(unix_stream_rx, None, None).await;
10661061
match result {
@@ -1077,8 +1072,7 @@ mod test {
10771072
#[serial]
10781073
async fn test_array_buffer_allocation_below_limit() {
10791074
let mut user_rt = create_basic_user_runtime("./test_cases/array_buffers", 20, 1000).await;
1080-
let (_tx, unix_stream_rx) =
1081-
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
1075+
let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
10821076
let (result, _) = user_rt.run(unix_stream_rx, None, None).await;
10831077
assert!(result.is_ok(), "expected no errors");
10841078
}
@@ -1087,8 +1081,7 @@ mod test {
10871081
#[serial]
10881082
async fn test_array_buffer_allocation_above_limit() {
10891083
let mut user_rt = create_basic_user_runtime("./test_cases/array_buffers", 15, 1000).await;
1090-
let (_tx, unix_stream_rx) =
1091-
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
1084+
let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
10921085
let (result, _) = user_rt.run(unix_stream_rx, None, None).await;
10931086
match result {
10941087
Err(err) => {

crates/base/src/rt_worker/implementation/default_handler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use crate::deno_runtime::DenoRuntime;
22
use crate::rt_worker::supervisor::CPUUsageMetrics;
3-
use crate::rt_worker::worker::{HandleCreationType, Worker, WorkerHandler};
3+
use crate::rt_worker::worker::{HandleCreationType, UnixStreamEntry, Worker, WorkerHandler};
44
use anyhow::Error;
55
use event_worker::events::{BootFailureEvent, PseudoEvent, UncaughtExceptionEvent, WorkerEvents};
66
use log::error;
7-
use sb_core::conn_sync::ConnSync;
87
use std::any::Any;
9-
use tokio::net::UnixStream;
108
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
119
use tokio::sync::oneshot::Receiver;
12-
use tokio::sync::watch;
1310

1411
impl WorkerHandler for Worker {
1512
fn handle_error(&self, error: Error) -> Result<WorkerEvents, Error> {
@@ -22,7 +19,7 @@ impl WorkerHandler for Worker {
2219
fn handle_creation(
2320
&self,
2421
mut created_rt: DenoRuntime,
25-
unix_stream_rx: UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>,
22+
unix_stream_rx: UnboundedReceiver<UnixStreamEntry>,
2623
termination_event_rx: Receiver<WorkerEvents>,
2724
maybe_cpu_usage_metrics_tx: Option<UnboundedSender<CPUUsageMetrics>>,
2825
name: Option<String>,

crates/base/src/rt_worker/worker.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ pub struct Worker {
4141
}
4242

4343
pub type HandleCreationType = Pin<Box<dyn Future<Output = Result<WorkerEvents, Error>>>>;
44+
pub type UnixStreamEntry = (UnixStream, Option<watch::Receiver<ConnSync>>);
4445

4546
pub trait WorkerHandler: Send {
4647
fn handle_error(&self, error: Error) -> Result<WorkerEvents, Error>;
4748
fn handle_creation(
4849
&self,
4950
created_rt: DenoRuntime,
50-
unix_stream_rx: UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>,
51+
unix_stream_rx: UnboundedReceiver<UnixStreamEntry>,
5152
termination_event_rx: Receiver<WorkerEvents>,
5253
maybe_cpu_metrics_tx: Option<UnboundedSender<CPUUsageMetrics>>,
5354
name: Option<String>,
@@ -57,10 +58,10 @@ pub trait WorkerHandler: Send {
5758

5859
impl Worker {
5960
pub fn new(init_opts: &WorkerContextInitOpts) -> Result<Self, Error> {
60-
let (worker_key, pool_msg_tx, events_msg_tx, cancel, thread_name) =
61+
let (worker_key, pool_msg_tx, events_msg_tx, cancel, worker_name) =
6162
parse_worker_conf(&init_opts.conf);
62-
let event_metadata = get_event_metadata(&init_opts.conf);
6363

64+
let event_metadata = get_event_metadata(&init_opts.conf);
6465
let worker_boot_start_time = Instant::now();
6566

6667
Ok(Self {
@@ -71,7 +72,7 @@ impl Worker {
7172
cancel,
7273
event_metadata,
7374
worker_key,
74-
worker_name: thread_name,
75+
worker_name,
7576
})
7677
}
7778

@@ -82,7 +83,10 @@ impl Worker {
8283
pub fn start(
8384
&self,
8485
mut opts: WorkerContextInitOpts,
85-
unix_channel_rx: UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>,
86+
unix_stream_pair: (
87+
UnboundedSender<UnixStreamEntry>,
88+
UnboundedReceiver<UnixStreamEntry>,
89+
),
8690
booter_signal: Sender<Result<(), Error>>,
8791
termination_token: Option<TerminationToken>,
8892
) {
@@ -91,6 +95,7 @@ impl Worker {
9195
let event_metadata = self.event_metadata.clone();
9296
let supervisor_policy = self.supervisor_policy.unwrap_or_default();
9397

98+
let (unix_stream_tx, unix_stream_rx) = unix_stream_pair;
9499
let events_msg_tx = self.events_msg_tx.clone();
95100
let pool_msg_tx = self.pool_msg_tx.clone();
96101

@@ -191,7 +196,7 @@ impl Worker {
191196

192197
let data = method_cloner.handle_creation(
193198
new_runtime,
194-
unix_channel_rx,
199+
unix_stream_rx,
195200
termination_event_rx,
196201
maybe_cpu_usage_metrics_tx,
197202
Some(worker_name),
@@ -216,6 +221,8 @@ impl Worker {
216221
}
217222
};
218223

224+
drop(unix_stream_tx);
225+
219226
match result {
220227
Ok(event) => {
221228
match event {

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use uuid::Uuid;
2929

3030
use super::rt;
3131
use super::supervisor::{self, CPUTimerParam, CPUUsageMetrics};
32+
use super::worker::UnixStreamEntry;
3233
use super::worker_pool::{SupervisorPolicy, WorkerPoolPolicy};
3334

3435
#[derive(Clone)]
@@ -75,7 +76,7 @@ impl TerminationToken {
7576
}
7677

7778
async fn handle_request(
78-
unix_stream_tx: mpsc::UnboundedSender<(UnixStream, Option<watch::Receiver<ConnSync>>)>,
79+
unix_stream_tx: mpsc::UnboundedSender<UnixStreamEntry>,
7980
msg: WorkerRequestMsg,
8081
) -> Result<(), Error> {
8182
// create a unix socket pair
@@ -310,8 +311,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
310311
init_opts: Opt,
311312
) -> Result<mpsc::UnboundedSender<WorkerRequestMsg>, Error> {
312313
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel::<Result<(), Error>>();
313-
let (unix_stream_tx, unix_stream_rx) =
314-
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
314+
let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
315315

316316
let CreateWorkerArgs(init_opts, maybe_supervisor_policy, maybe_termination_token) =
317317
init_opts.into();
@@ -331,27 +331,31 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
331331
if let Some(worker_struct_ref) = downcast_reference {
332332
worker_struct_ref.start(
333333
init_opts,
334-
unix_stream_rx,
334+
(unix_stream_tx.clone(), unix_stream_rx),
335335
worker_boot_result_tx,
336336
maybe_termination_token.clone(),
337337
);
338338

339339
// create an async task waiting for requests for worker
340340
let (worker_req_tx, mut worker_req_rx) = mpsc::unbounded_channel::<WorkerRequestMsg>();
341341

342-
let worker_req_handle: tokio::task::JoinHandle<Result<(), Error>> =
343-
tokio::task::spawn(async move {
342+
let worker_req_handle: tokio::task::JoinHandle<Result<(), Error>> = tokio::task::spawn({
343+
let stream_tx = unix_stream_tx;
344+
async move {
344345
while let Some(msg) = worker_req_rx.recv().await {
345-
let unix_stream_tx_clone = unix_stream_tx.clone();
346-
tokio::task::spawn(async move {
347-
if let Err(err) = handle_request(unix_stream_tx_clone, msg).await {
348-
error!("worker failed to handle request: {:?}", err);
346+
tokio::task::spawn({
347+
let stream_tx_inner = stream_tx.clone();
348+
async move {
349+
if let Err(err) = handle_request(stream_tx_inner, msg).await {
350+
error!("worker failed to handle request: {:?}", err);
351+
}
349352
}
350353
});
351354
}
352355

353356
Ok(())
354-
});
357+
}
358+
});
355359

356360
// wait for worker to be successfully booted
357361
let worker_boot_result = worker_boot_result_rx.await?;

0 commit comments

Comments
 (0)