Skip to content

Commit a6bd18a

Browse files
authored
Merge pull request #241 from nyannyacha/feat-policy
feat: introduce various supervisor policy
2 parents 6b2c205 + 5f99749 commit a6bd18a

40 files changed

+1834
-455
lines changed

Cargo.lock

Lines changed: 40 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ ring = "=0.16.20"
6464
urlencoding = { version = "2.1.2" }
6565
import_map = { version = "0.15.0" }
6666
base64 = { version = "=0.13.1" }
67+
futures = { version = "0.3.28" }
68+
futures-util = { version = "0.3.28" }
6769

6870
[profile.release]
6971
lto = true

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# syntax=docker/dockerfile:1.4
2-
FROM rust:1.73.0-bookworm as builder
2+
FROM rust:1.74.1-bookworm as builder
33
ARG TARGETPLATFORM
44
ARG GIT_V_VERSION
55
RUN apt-get update && apt-get install -y llvm-dev libclang-dev clang cmake

crates/base/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ log = { workspace = true }
3838
reqwest.workspace = true
3939
serde = { version = "1.0.149", features = ["derive"] }
4040
tokio = { workspace = true }
41+
tokio-util = { workspace = true }
42+
futures-util = { workspace = true }
4143
url = { version = "2.3.1" }
4244
event_worker ={ version = "0.1.0", path = "../event_worker" }
4345
sb_workers = { version = "0.1.0", path = "../sb_workers" }
@@ -52,10 +54,11 @@ deno_broadcast_channel.workspace = true
5254
sb_node = { version = "0.1.0", path = "../node" }
5355
eszip.workspace = true
5456
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
57+
flume = { version = "0.11.0" }
58+
strum = { version = "0.25.0", features = ["derive"] }
5559
urlencoding.workspace = true
5660

5761
[dev-dependencies]
58-
futures-util = { version = "0.3.28" }
5962
flaky_test = { version = "0.1.0", path = "../flaky_test" }
6063

6164
[build-dependencies]

crates/base/src/commands.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::server::{Server, ServerCodes, WorkerEntrypoints};
1+
use crate::{
2+
rt_worker::worker_pool::WorkerPoolPolicy,
3+
server::{Server, ServerCodes, WorkerEntrypoints},
4+
};
25
use anyhow::Error;
36
use deno_core::JsRuntime;
47
use log::error;
@@ -10,8 +13,10 @@ pub async fn start_server(
1013
port: u16,
1114
main_service_path: String,
1215
event_worker_path: Option<String>,
16+
user_worker_policy: Option<WorkerPoolPolicy>,
1317
import_map_path: Option<String>,
1418
no_module_cache: bool,
19+
no_signal_handler: bool,
1520
callback_tx: Option<Sender<ServerCodes>>,
1621
entrypoints: WorkerEntrypoints,
1722
) -> Result<(), Error> {
@@ -27,8 +32,10 @@ pub async fn start_server(
2732
port,
2833
main_service_path,
2934
event_worker_path,
35+
user_worker_policy,
3036
import_map_path,
3137
no_module_cache,
38+
no_signal_handler,
3239
callback_tx,
3340
entrypoints,
3441
)

crates/base/src/deno_runtime.rs

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ use deno_tls::rustls::RootCertStore;
1010
use deno_tls::rustls_native_certs::load_native_certs;
1111
use deno_tls::RootCertStoreProvider;
1212
use log::error;
13+
use sb_core::conn_sync::ConnSync;
1314
use serde::de::DeserializeOwned;
1415
use std::collections::HashMap;
1516
use std::fmt;
17+
use std::os::fd::RawFd;
1618
use std::sync::Arc;
17-
use std::time::Duration;
1819
use tokio::net::UnixStream;
19-
use tokio::sync::mpsc;
20+
use tokio::sync::{mpsc, watch};
2021

2122
use crate::snapshot;
2223
use event_worker::events::{EventMetadata, WorkerEventWithMetadata};
@@ -79,6 +80,7 @@ impl DenoRuntime {
7980
maybe_eszip,
8081
maybe_entrypoint,
8182
maybe_module_code,
83+
..
8284
} = opts;
8385

8486
let user_agent = "supabase-edge-runtime".to_string();
@@ -302,6 +304,10 @@ impl DenoRuntime {
302304
.put::<mpsc::UnboundedReceiver<WorkerEventWithMetadata>>(events_rx.unwrap());
303305
}
304306

307+
if conf.is_main_worker() || conf.is_user_worker() {
308+
op_state.put::<HashMap<RawFd, watch::Receiver<ConnSync>>>(HashMap::new());
309+
}
310+
305311
if conf.is_user_worker() {
306312
let conf = conf.as_user_worker().unwrap();
307313

@@ -336,13 +342,16 @@ impl DenoRuntime {
336342
}
337343

338344
pub async fn run(
339-
mut self,
340-
unix_stream_rx: mpsc::UnboundedReceiver<UnixStream>,
345+
&mut self,
346+
unix_stream_rx: mpsc::UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>,
341347
) -> Result<(), Error> {
342348
{
343349
let op_state_rc = self.js_runtime.op_state();
344350
let mut op_state = op_state_rc.borrow_mut();
345-
op_state.put::<mpsc::UnboundedReceiver<UnixStream>>(unix_stream_rx);
351+
op_state
352+
.put::<mpsc::UnboundedReceiver<(UnixStream, Option<watch::Receiver<ConnSync>>)>>(
353+
unix_stream_rx,
354+
);
346355

347356
if self.conf.is_main_worker() {
348357
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(
@@ -351,36 +360,19 @@ impl DenoRuntime {
351360
}
352361
}
353362

354-
let mut js_runtime = self.js_runtime;
363+
let js_runtime = &mut self.js_runtime;
364+
let mod_result_rx = js_runtime.mod_evaluate(self.main_module_id);
355365

356-
let future = async move {
357-
let mod_result_rx = js_runtime.mod_evaluate(self.main_module_id);
358-
match js_runtime.run_event_loop(false).await {
359-
Err(err) => {
360-
// usually this happens because isolate is terminated
361-
error!("event loop error: {}", err);
362-
Err(anyhow!("event loop error: {}", err))
366+
match js_runtime.run_event_loop(false).await {
367+
Err(err) => Err(anyhow!("event loop error: {}", err)),
368+
Ok(_) => match mod_result_rx.await {
369+
Err(_) => Err(anyhow!("mod result sender dropped")),
370+
Ok(Err(err)) => {
371+
error!("{}", err.to_string());
372+
Err(err)
363373
}
364-
Ok(_) => match mod_result_rx.await {
365-
Err(_) => Err(anyhow!("mod result sender dropped")),
366-
Ok(Err(err)) => {
367-
error!("{}", err.to_string());
368-
Err(err)
369-
}
370-
Ok(Ok(_)) => Ok(()),
371-
},
372-
}
373-
};
374-
375-
// need to set an explicit timeout here in case the event loop idle
376-
let mut duration = Duration::MAX;
377-
if self.conf.is_user_worker() {
378-
let worker_timeout_ms = self.conf.as_user_worker().unwrap().worker_timeout_ms;
379-
duration = Duration::from_millis(worker_timeout_ms);
380-
}
381-
match tokio::time::timeout(duration, future).await {
382-
Err(_) => Err(anyhow!("wall clock duration reached")),
383-
Ok(res) => res,
374+
Ok(Ok(_)) => Ok(()),
375+
},
384376
}
385377
}
386378

@@ -404,6 +396,7 @@ impl DenoRuntime {
404396
mod test {
405397
use crate::deno_runtime::DenoRuntime;
406398
use deno_core::{FastString, ModuleCode};
399+
use sb_core::conn_sync::ConnSync;
407400
use sb_graph::emitter::EmitterFactory;
408401
use sb_graph::{generate_binary_eszip, EszipPayloadKind};
409402
use sb_workers::context::{
@@ -417,7 +410,7 @@ mod test {
417410
use std::path::PathBuf;
418411
use std::sync::Arc;
419412
use tokio::net::UnixStream;
420-
use tokio::sync::mpsc;
413+
use tokio::sync::{mpsc, watch};
421414

422415
#[tokio::test]
423416
async fn test_module_code_no_eszip() {
@@ -428,6 +421,7 @@ mod test {
428421
import_map_path: None,
429422
env_vars: Default::default(),
430423
events_rx: None,
424+
timing: None,
431425
maybe_eszip: None,
432426
maybe_entrypoint: None,
433427
maybe_module_code: Some(FastString::from(String::from(
@@ -461,6 +455,7 @@ mod test {
461455
import_map_path: None,
462456
env_vars: Default::default(),
463457
events_rx: None,
458+
timing: None,
464459
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
465460
maybe_entrypoint: None,
466461
maybe_module_code: None,
@@ -509,6 +504,7 @@ mod test {
509504
import_map_path: None,
510505
env_vars: Default::default(),
511506
events_rx: None,
507+
timing: None,
512508
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
513509
maybe_entrypoint: None,
514510
maybe_module_code: None,
@@ -551,6 +547,7 @@ mod test {
551547
import_map_path: None,
552548
env_vars: env_vars.unwrap_or_default(),
553549
events_rx: None,
550+
timing: None,
554551
maybe_eszip: None,
555552
maybe_entrypoint: None,
556553
maybe_module_code: None,
@@ -869,6 +866,7 @@ mod test {
869866
key: None,
870867
pool_msg_tx: None,
871868
events_msg_tx: None,
869+
cancel: None,
872870
service_path: None,
873871
})),
874872
)
@@ -877,8 +875,10 @@ mod test {
877875

878876
#[tokio::test]
879877
async fn test_read_file_user_rt() {
880-
let user_rt = create_basic_user_runtime("./test_cases/readFile", 20, 1000).await;
881-
let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStream>();
878+
let mut user_rt = create_basic_user_runtime("./test_cases/readFile", 20, 1000).await;
879+
let (_tx, unix_stream_rx) =
880+
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
881+
882882
let result = user_rt.run(unix_stream_rx).await;
883883
match result {
884884
Err(err) => {
@@ -892,16 +892,18 @@ mod test {
892892

893893
#[tokio::test]
894894
async fn test_array_buffer_allocation_below_limit() {
895-
let user_rt = create_basic_user_runtime("./test_cases/array_buffers", 20, 1000).await;
896-
let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStream>();
895+
let mut user_rt = create_basic_user_runtime("./test_cases/array_buffers", 20, 1000).await;
896+
let (_tx, unix_stream_rx) =
897+
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
897898
let result = user_rt.run(unix_stream_rx).await;
898899
assert!(result.is_ok(), "expected no errors");
899900
}
900901

901902
#[tokio::test]
902903
async fn test_array_buffer_allocation_above_limit() {
903-
let user_rt = create_basic_user_runtime("./test_cases/array_buffers", 15, 1000).await;
904-
let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStream>();
904+
let mut user_rt = create_basic_user_runtime("./test_cases/array_buffers", 15, 1000).await;
905+
let (_tx, unix_stream_rx) =
906+
mpsc::unbounded_channel::<(UnixStream, Option<watch::Receiver<ConnSync>>)>();
905907
let result = user_rt.run(unix_stream_rx).await;
906908
match result {
907909
Err(err) => {

crates/base/src/macros/test_macros.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ macro_rules! integration_test {
2525
String::from($main_file),
2626
None,
2727
None,
28+
None,
2829
false,
30+
true,
2931
Some(tx.clone()),
3032
$crate::server::WorkerEntrypoints {
3133
main: None,

0 commit comments

Comments
 (0)