Skip to content

Commit d3b9cee

Browse files
authored
fix: remove another bottleneck that causes boot time spike (#596)
* fix: remove another bottleneck that causes boot time spike * chore: add integration test
1 parent 6084d3b commit d3b9cee

File tree

5 files changed

+126
-21
lines changed

5 files changed

+126
-21
lines changed

crates/base/src/utils/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ impl TestBedBuilder {
271271
}
272272
}
273273

274+
#[derive(Clone)]
274275
pub struct TestBed {
275276
pool_termination_token: TerminationToken,
276277
main_termination_token: TerminationToken,

crates/base/src/worker/worker_inner.rs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ impl Worker {
273273

274274
let _ =
275275
booter_signal.send(Ok((metric_src, new_runtime.drop_token.clone())));
276+
277+
let span = debug_span!(
278+
"poll",
279+
thread = ?std::thread::current().id(),
280+
);
281+
276282
let supervise_fut = match imp.clone().supervise(&mut new_runtime) {
277283
Some(v) => v.boxed(),
278284
None if worker_kind.is_user_worker() => return None,
@@ -290,28 +296,41 @@ impl Worker {
290296
}
291297
});
292298

293-
let result = imp.on_created(&mut new_runtime).await;
294-
let maybe_uncaught_exception_event = match result.as_ref() {
295-
Ok(WorkerEvents::UncaughtException(ev)) => Some(ev.clone()),
296-
Err(err) => Some(UncaughtExceptionEvent {
297-
cpu_time_used: 0,
298-
exception: err.to_string(),
299-
}),
299+
let worker_poll_fut = async move {
300+
let result = imp.on_created(&mut new_runtime).await;
301+
let maybe_uncaught_exception_event = match result.as_ref() {
302+
Ok(WorkerEvents::UncaughtException(ev)) => Some(ev.clone()),
303+
Err(err) => Some(UncaughtExceptionEvent {
304+
cpu_time_used: 0,
305+
exception: err.to_string(),
306+
}),
300307

301-
_ => None,
302-
};
308+
_ => None,
309+
};
303310

304-
if let Some(ev) = maybe_uncaught_exception_event {
305-
exit.set(WorkerExitStatus::WithUncaughtException(ev)).await;
306-
}
311+
if let Some(ev) = maybe_uncaught_exception_event {
312+
exit.set(WorkerExitStatus::WithUncaughtException(ev)).await;
313+
}
307314

308-
drop(new_runtime);
309-
let _ = supervise_fut.await;
315+
drop(new_runtime);
316+
let _ = supervise_fut.await;
310317

311-
Some(result)
318+
result
319+
}
320+
.instrument(span);
321+
322+
Some(
323+
rt.spawn_pinned({
324+
let fut = unsafe { MaskFutureAsSend::new(worker_poll_fut) };
325+
move || tokio::task::spawn_local(fut)
326+
})
327+
.await
328+
.map_err(anyhow::Error::from)
329+
.and_then(|it| it.map_err(anyhow::Error::from))
330+
.and_then(|it| it.into_inner()),
331+
)
312332
};
313-
314-
let worker_fut = {
333+
let worker_result_fut = {
315334
let event_metadata = event_metadata.clone();
316335
async move {
317336
let Some(result) = worker_fut.await else {
@@ -349,13 +368,11 @@ impl Worker {
349368
"worker",
350369
id = worker_name.as_str(),
351370
kind = %worker_kind,
352-
thread = ?std::thread::current().id(),
353371
metadata = ?event_metadata
354372
));
355373

356-
drop(rt.spawn_pinned({
357-
let worker_fut = unsafe { MaskFutureAsSend::new(worker_fut) };
358-
move || tokio::task::spawn_local(worker_fut)
374+
drop(tokio::spawn(unsafe {
375+
MaskFutureAsSend::new(worker_result_fut)
359376
}));
360377
}
361378
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Deno.serve(() => {
2+
return new Response("meow");
3+
});
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
function mySlowFunction(baseNumber) {
2+
console.time("mySlowFunction");
3+
let now = Date.now();
4+
let result = 0;
5+
for (var i = Math.pow(baseNumber, 7); i >= 0; i--) {
6+
result += Math.atan(i) * Math.tan(i);
7+
}
8+
let duration = Date.now() - now;
9+
console.timeEnd("mySlowFunction");
10+
return { result: result, duration: duration };
11+
}
12+
13+
Deno.serve(async () => {
14+
let count = 0;
15+
while (++count != 300) {
16+
mySlowFunction(8);
17+
}
18+
return new Response("meow");
19+
});

crates/base/tests/integration_tests.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use deno_facade::EszipPayloadKind;
4444
use deno_facade::Metadata;
4545
use ext_event_worker::events::LogLevel;
4646
use ext_event_worker::events::ShutdownReason;
47+
use ext_event_worker::events::WorkerEventWithMetadata;
4748
use ext_event_worker::events::WorkerEvents;
4849
use ext_runtime::SharedMetricSource;
4950
use ext_workers::context::MainWorkerRuntimeOpts;
@@ -2606,6 +2607,70 @@ async fn test_issue_func_280() {
26062607
run("mem", ShutdownReason::Memory).await;
26072608
}
26082609

2610+
#[tokio::test]
2611+
#[serial]
2612+
async fn test_issue_func_284() {
2613+
async fn find_boot_event(
2614+
rx: &mut mpsc::UnboundedReceiver<WorkerEventWithMetadata>,
2615+
) -> Option<usize> {
2616+
while let Some(ev) = rx.recv().await {
2617+
match ev.event {
2618+
WorkerEvents::Boot(ev) => return Some(ev.boot_time),
2619+
_ => continue,
2620+
}
2621+
}
2622+
2623+
None
2624+
}
2625+
2626+
let (tx, mut rx) = mpsc::unbounded_channel();
2627+
let tb = TestBedBuilder::new("./test_cases/main")
2628+
.with_per_worker_policy(None)
2629+
.with_worker_event_sender(Some(tx))
2630+
.build()
2631+
.await;
2632+
2633+
tokio::spawn({
2634+
let tb = tb.clone();
2635+
async move {
2636+
tb.request(|b| {
2637+
b.uri("/meow")
2638+
.header("x-service-path", "issue-func-284/noisy")
2639+
.body(Body::empty())
2640+
.context("can't make request")
2641+
})
2642+
.await
2643+
.unwrap();
2644+
}
2645+
});
2646+
2647+
timeout(Duration::from_secs(1), find_boot_event(&mut rx))
2648+
.await
2649+
.unwrap()
2650+
.unwrap();
2651+
2652+
tokio::spawn({
2653+
let tb = tb.clone();
2654+
async move {
2655+
tb.request(|b| {
2656+
b.uri("/meow")
2657+
.header("x-service-path", "issue-func-284/baseline")
2658+
.body(Body::empty())
2659+
.context("can't make request")
2660+
})
2661+
.await
2662+
.unwrap();
2663+
}
2664+
});
2665+
2666+
let boot_time = timeout(Duration::from_secs(1), find_boot_event(&mut rx))
2667+
.await
2668+
.unwrap()
2669+
.unwrap();
2670+
2671+
assert!(boot_time < 1000);
2672+
}
2673+
26092674
#[tokio::test]
26102675
#[serial]
26112676
async fn test_should_render_detailed_failed_to_create_graph_error() {

0 commit comments

Comments
 (0)