Skip to content

Commit 705db6e

Browse files
authored
Merge pull request #264 from nyannyacha/feat-metric-api
feat: runtime metric API
2 parents dcaaf35 + cb9dcea commit 705db6e

File tree

14 files changed

+487
-111
lines changed

14 files changed

+487
-111
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/base/src/deno_runtime.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ impl DenoRuntime {
421421
// the task from the other threads.
422422
// let mut current_thread_id = std::thread::current().id();
423423

424-
let result = match poll_fn(|cx| {
424+
let poll_result = poll_fn(|cx| {
425425
// INVARIANT: Only can steal current task by other threads when LIFO
426426
// task scheduler heuristic disabled. Turning off the heuristic is
427427
// unstable now, so it's not considered.
@@ -487,8 +487,9 @@ impl DenoRuntime {
487487

488488
poll_result
489489
})
490-
.await
491-
{
490+
.await;
491+
492+
let result = match poll_result {
492493
Err(err) => Err(anyhow!("event loop error: {}", err)),
493494
Ok(_) => match mod_result_rx.await {
494495
Err(e) => {
@@ -573,7 +574,13 @@ mod test {
573574
maybe_module_code: Some(FastString::from(String::from(
574575
"Deno.serve((req) => new Response('Hello World'));",
575576
))),
576-
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
577+
conf: {
578+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
579+
worker_pool_tx,
580+
shared_metric_src: None,
581+
event_worker_metric_src: None,
582+
})
583+
},
577584
})
578585
.await
579586
.expect("It should not panic");
@@ -612,7 +619,13 @@ mod test {
612619
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
613620
maybe_entrypoint: None,
614621
maybe_module_code: None,
615-
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
622+
conf: {
623+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
624+
worker_pool_tx,
625+
shared_metric_src: None,
626+
event_worker_metric_src: None,
627+
})
628+
},
616629
})
617630
.await;
618631

@@ -673,7 +686,13 @@ mod test {
673686
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
674687
maybe_entrypoint: None,
675688
maybe_module_code: None,
676-
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
689+
conf: {
690+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
691+
worker_pool_tx,
692+
shared_metric_src: None,
693+
event_worker_metric_src: None,
694+
})
695+
},
677696
})
678697
.await;
679698

@@ -731,7 +750,11 @@ mod test {
731750
if let Some(uc) = user_conf {
732751
uc
733752
} else {
734-
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx })
753+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
754+
worker_pool_tx,
755+
shared_metric_src: None,
756+
event_worker_metric_src: None,
757+
})
735758
}
736759
},
737760
})

crates/base/src/rt_worker/worker.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use event_worker::events::{
1111
use futures_util::FutureExt;
1212
use log::{debug, error};
1313
use sb_core::conn_sync::ConnSync;
14+
use sb_core::{MetricSource, RuntimeMetricSource, WorkerMetricSource};
1415
use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts};
1516
use std::any::Any;
1617
use std::future::{pending, Future};
@@ -87,7 +88,7 @@ impl Worker {
8788
UnboundedSender<UnixStreamEntry>,
8889
UnboundedReceiver<UnixStreamEntry>,
8990
),
90-
booter_signal: Sender<Result<(), Error>>,
91+
booter_signal: Sender<Result<MetricSource, Error>>,
9192
termination_token: Option<TerminationToken>,
9293
) {
9394
let worker_name = self.worker_name.clone();
@@ -101,24 +102,48 @@ impl Worker {
101102

102103
let method_cloner = self.clone();
103104
let timing = opts.timing.take();
104-
let is_user_worker = opts.conf.is_user_worker();
105+
let worker_kind = opts.conf.to_worker_kind();
106+
let maybe_main_worker_opts = opts.conf.as_main_worker().cloned();
105107

106108
let cancel = self.cancel.clone();
107-
let rt = if is_user_worker {
109+
let rt = if worker_kind.is_user_worker() {
108110
&rt::USER_WORKER_RT
109111
} else {
110112
&rt::PRIMARY_WORKER_RT
111113
};
112114

113115
let _worker_handle = rt.spawn_pinned(move || {
114116
tokio::task::spawn_local(async move {
115-
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = is_user_worker
117+
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = worker_kind
118+
.is_user_worker()
116119
.then(unbounded_channel::<CPUUsageMetrics>)
117120
.unzip();
118121

119122
let result = match DenoRuntime::new(opts).await {
120123
Ok(mut new_runtime) => {
121-
let _ = booter_signal.send(Ok(()));
124+
let metric_src = {
125+
let js_runtime = &mut new_runtime.js_runtime;
126+
let metric_src = WorkerMetricSource::from_js_runtime(js_runtime);
127+
128+
if worker_kind.is_main_worker() {
129+
let opts = maybe_main_worker_opts.unwrap();
130+
let state = js_runtime.op_state();
131+
let mut state_mut = state.borrow_mut();
132+
let metric_src = RuntimeMetricSource::new(
133+
metric_src.clone(),
134+
opts.event_worker_metric_src
135+
.and_then(|it| it.into_worker().ok()),
136+
opts.shared_metric_src,
137+
);
138+
139+
state_mut.put(metric_src.clone());
140+
MetricSource::Runtime(metric_src)
141+
} else {
142+
MetricSource::Worker(metric_src)
143+
}
144+
};
145+
146+
let _ = booter_signal.send(Ok(metric_src));
122147

123148
// CPU TIMER
124149
let (termination_event_tx, termination_event_rx) =
@@ -127,7 +152,7 @@ impl Worker {
127152
let _cpu_timer;
128153

129154
// TODO: Allow customization of supervisor
130-
let termination_fut = if is_user_worker {
155+
let termination_fut = if worker_kind.is_user_worker() {
131156
// cputimer is returned from supervisor and assigned here to keep it in scope.
132157
let Ok(maybe_timer) = create_supervisor(
133158
worker_key.unwrap_or(Uuid::nil()),
@@ -209,7 +234,7 @@ impl Worker {
209234
let result = data.await;
210235

211236
if let Some(token) = termination_token.as_ref() {
212-
if !is_user_worker {
237+
if !worker_kind.is_user_worker() {
213238
let _ = termination_fut.await;
214239
}
215240

0 commit comments

Comments
 (0)