Skip to content

Commit 1a1256b

Browse files
authored
feat: expose a function that tries to clean up idle workers immediately (#584)
* feat: expose a function that tries to clean up idle workers immediately * stamp: polishing * stamp: clippy
1 parent 5f260be commit 1a1256b

File tree

8 files changed

+148
-1
lines changed

8 files changed

+148
-1
lines changed

crates/base/src/worker/pool.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@ use ext_workers::context::UserWorkerProfile;
2626
use ext_workers::context::WorkerContextInitOpts;
2727
use ext_workers::context::WorkerRuntimeOpts;
2828
use ext_workers::errors::WorkerError;
29+
use futures_util::future::join_all;
2930
use http_v02::Request;
3031
use hyper_v014::Body;
3132
use log::error;
3233
use tokio::sync::mpsc;
3334
use tokio::sync::mpsc::UnboundedSender;
35+
use tokio::sync::oneshot;
3436
use tokio::sync::oneshot::Sender;
3537
use tokio::sync::Notify;
3638
use tokio::sync::OwnedSemaphorePermit;
3739
use tokio::sync::Semaphore;
3840
use tokio::sync::TryAcquireError;
41+
use tokio::time::timeout;
3942
use tokio_util::sync::CancellationToken;
4043
use uuid::Uuid;
4144

@@ -442,6 +445,7 @@ impl WorkerPool {
442445
is_retired: Arc::new(AtomicFlag::default()),
443446
};
444447

448+
let (early_drop_tx, early_drop_rx) = mpsc::unbounded_channel();
445449
let (req_end_timing_tx, req_end_timing_rx) =
446450
mpsc::unbounded_channel::<()>();
447451

@@ -453,6 +457,7 @@ impl WorkerPool {
453457
user_worker_rt_opts.cancel = Some(cancel.clone());
454458

455459
worker_options.timing = Some(Timing {
460+
early_drop_rx,
456461
status: status.clone(),
457462
req: (req_start_timing_rx, req_end_timing_rx),
458463
});
@@ -472,6 +477,7 @@ impl WorkerPool {
472477
Ok(surface) => {
473478
let profile = UserWorkerProfile {
474479
worker_request_msg_tx: surface.msg_tx,
480+
early_drop_tx,
475481
timing_tx_pair: (req_start_timing_tx, req_end_timing_tx),
476482
service_path,
477483
permit: permit.map(Arc::new),
@@ -656,6 +662,24 @@ impl WorkerPool {
656662
self.metric_src.decl_active_user_workers();
657663
}
658664

665+
async fn try_cleanup_idle_workers(&mut self, timeout_ms: usize) -> usize {
666+
let mut rxs = vec![];
667+
for profile in self.user_workers.values_mut() {
668+
let (tx, rx) = oneshot::channel();
669+
if profile.early_drop_tx.send(tx).is_ok() {
670+
rxs.push(timeout(Duration::from_millis(timeout_ms as u64), rx));
671+
}
672+
}
673+
674+
join_all(rxs)
675+
.await
676+
.into_iter()
677+
.filter_map(|it| it.ok())
678+
.map(|it| it.unwrap_or_default())
679+
.filter(|it| *it)
680+
.count()
681+
}
682+
659683
fn retire(&mut self, key: &Uuid) {
660684
if let Some(profile) = self.user_workers.get_mut(key) {
661685
let registry = self
@@ -795,6 +819,10 @@ pub async fn create_user_worker_pool(
795819
break;
796820
}
797821
}
822+
823+
Some(UserWorkerMsgs::TryCleanupIdleWorkers(timeout_ms, res_tx)) => {
824+
let _ = res_tx.send(worker_pool.try_cleanup_idle_workers(timeout_ms).await);
825+
}
798826
}
799827
}
800828
}

crates/base/src/worker/supervisor/strategy_per_worker.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,10 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
143143
} = args;
144144

145145
let Timing {
146+
mut early_drop_rx,
146147
status: TimingStatus { demand, is_retired },
147148
req: (_, mut req_end_rx),
149+
..
148150
} = timing.unwrap_or_default();
149151

150152
let UserWorkerRuntimeOpts {
@@ -467,6 +469,18 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
467469
complete_reason = Some(ShutdownReason::Memory);
468470
}
469471

472+
Some(tx) = early_drop_rx.recv() => {
473+
let mut acknowledged = false;
474+
if state.have_all_pending_tasks_been_resolved() {
475+
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
476+
early_retire_fn();
477+
func();
478+
acknowledged = true;
479+
}
480+
}
481+
let _ = tx.send(acknowledged);
482+
}
483+
470484
_ = &mut early_drop_fut => {
471485
info!("early termination has been triggered: isolate: {:?}", key);
472486
complete_reason = Some(ShutdownReason::EarlyDrop);

crates/base/test_cases/main/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,18 @@ function parseIntFromHeadersOrDefault(req: Request, key: string, val?: number) {
1414
return parsedValue;
1515
}
1616

17-
Deno.serve((req: Request) => {
17+
Deno.serve(async (req: Request) => {
1818
console.log(req.url);
1919
const url = new URL(req.url);
2020
const { pathname } = url;
21+
22+
// handle health checks
23+
if (pathname === "/_internal/cleanup-idle-workers") {
24+
return Response.json({
25+
count: await EdgeRuntime.userWorkers.tryCleanupIdleWorkers(1000),
26+
});
27+
}
28+
2129
const path_parts = pathname.split("/");
2230
let service_name = path_parts[1];
2331

crates/base/tests/integration_tests.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3911,6 +3911,66 @@ async fn test_request_absent_timeout() {
39113911
unreachable!("test failed");
39123912
}
39133913

3914+
#[tokio::test]
3915+
#[serial]
3916+
async fn test_user_workers_cleanup_idle_workers() {
3917+
let (tx, mut rx) = mpsc::unbounded_channel();
3918+
let tb = TestBedBuilder::new("./test_cases/main")
3919+
.with_per_worker_policy(None)
3920+
.with_worker_event_sender(Some(tx))
3921+
.build()
3922+
.await;
3923+
3924+
let resp = tb
3925+
.request(|b| {
3926+
b.uri("/sleep-5000ms")
3927+
.header("x-worker-timeout-ms", HeaderValue::from_static("3600000"))
3928+
.body(Body::empty())
3929+
.context("can't make request")
3930+
})
3931+
.await
3932+
.unwrap();
3933+
3934+
assert_eq!(resp.status().as_u16(), StatusCode::OK);
3935+
3936+
let mut resp = tb
3937+
.request(|b| {
3938+
b.uri("/_internal/cleanup-idle-workers")
3939+
.body(Body::empty())
3940+
.context("can't make request")
3941+
})
3942+
.await
3943+
.unwrap();
3944+
3945+
assert_eq!(resp.status().as_u16(), StatusCode::OK);
3946+
3947+
let bytes = hyper_v014::body::HttpBody::collect(resp.body_mut())
3948+
.await
3949+
.unwrap()
3950+
.to_bytes();
3951+
3952+
let payload = serde_json::from_slice::<serde_json::Value>(&bytes).unwrap();
3953+
let count = payload.get("count").unwrap().as_u64().unwrap();
3954+
3955+
assert_eq!(count, 1);
3956+
3957+
sleep(Duration::from_secs(3)).await;
3958+
3959+
rx.close();
3960+
tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
3961+
while let Some(ev) = rx.recv().await {
3962+
let WorkerEvents::Shutdown(ev) = ev.event else {
3963+
continue;
3964+
};
3965+
if ev.reason != ShutdownReason::EarlyDrop {
3966+
break;
3967+
}
3968+
return;
3969+
}
3970+
3971+
unreachable!("test failed");
3972+
}
3973+
39143974
#[derive(Deserialize)]
39153975
struct ErrorResponsePayload {
39163976
msg: String,

ext/workers/context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ impl Default for UserWorkerRuntimeOpts {
138138
#[derive(Debug, Clone)]
139139
pub struct UserWorkerProfile {
140140
pub worker_request_msg_tx: mpsc::UnboundedSender<WorkerRequestMsg>,
141+
pub early_drop_tx: mpsc::UnboundedSender<oneshot::Sender<bool>>,
141142
pub timing_tx_pair: (
142143
mpsc::UnboundedSender<Arc<Notify>>,
143144
mpsc::UnboundedSender<()>,
@@ -226,6 +227,7 @@ pub struct TimingStatus {
226227

227228
#[derive(Debug)]
228229
pub struct Timing {
230+
pub early_drop_rx: mpsc::UnboundedReceiver<oneshot::Sender<bool>>,
229231
pub status: TimingStatus,
230232
pub req: (
231233
mpsc::UnboundedReceiver<Arc<Notify>>,
@@ -235,10 +237,12 @@ pub struct Timing {
235237

236238
impl Default for Timing {
237239
fn default() -> Self {
240+
let (_, dumb_early_drop_rx) = unbounded_channel();
238241
let (_, dumb_start_rx) = unbounded_channel::<Arc<Notify>>();
239242
let (_, dumb_end_rx) = unbounded_channel::<()>();
240243

241244
Self {
245+
early_drop_rx: dumb_early_drop_rx,
242246
status: TimingStatus::default(),
243247
req: (dumb_start_rx, dumb_end_rx),
244248
}
@@ -277,6 +281,7 @@ pub enum UserWorkerMsgs {
277281
),
278282
Idle(Uuid),
279283
Shutdown(Uuid),
284+
TryCleanupIdleWorkers(usize, oneshot::Sender<usize>),
280285
}
281286

282287
pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);

ext/workers/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ deno_core::extension!(
6969
op_user_worker_create,
7070
op_user_worker_fetch_build,
7171
op_user_worker_fetch_send,
72+
op_user_worker_cleanup_idle_workers,
7273
],
7374
esm_entry_point = "ext:user_workers/user_workers.js",
7475
esm = ["user_workers.js",]
@@ -631,6 +632,30 @@ pub async fn op_user_worker_fetch_send(
631632
Ok(response)
632633
}
633634

635+
#[op2(async)]
636+
#[number]
637+
pub async fn op_user_worker_cleanup_idle_workers(
638+
state: Rc<RefCell<OpState>>,
639+
#[number] timeout_ms: usize,
640+
) -> usize {
641+
let msg_tx = {
642+
state
643+
.borrow()
644+
.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
645+
.clone()
646+
};
647+
648+
let (tx, rx) = oneshot::channel();
649+
if msg_tx
650+
.send(UserWorkerMsgs::TryCleanupIdleWorkers(timeout_ms, tx))
651+
.is_err()
652+
{
653+
return 0;
654+
}
655+
656+
(rx.await).unwrap_or_default()
657+
}
658+
634659
/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
635660
pub struct BodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
636661

ext/workers/user_workers.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const { TypeError } = primordials;
1212
const {
1313
op_user_worker_fetch_send,
1414
op_user_worker_create,
15+
op_user_worker_cleanup_idle_workers,
1516
} = ops;
1617

1718
const NO_SUPABASE_TAG_WARN_MSG =
@@ -144,6 +145,10 @@ class UserWorker {
144145

145146
return new UserWorker(key);
146147
}
148+
149+
static async tryCleanupIdleWorkers(timeoutMs) {
150+
return await op_user_worker_cleanup_idle_workers(timeoutMs);
151+
}
147152
}
148153

149154
const SUPABASE_USER_WORKERS = UserWorker;

types/global.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ declare namespace EdgeRuntime {
121121
request: Request,
122122
options?: UserWorkerFetchOptions,
123123
): Promise<Response>;
124+
124125
static create(opts: UserWorkerCreateOptions): Promise<UserWorker>;
126+
static tryCleanupIdleWorkers(timeoutMs: number): Promise<number>;
125127
}
126128

127129
export function scheduleTermination(): void;

0 commit comments

Comments
 (0)