Skip to content

Commit ee38bb8

Browse files
committed
stamp: initial impl for worker heap metrics
1 parent dcaaf35 commit ee38bb8

File tree

11 files changed

+315
-57
lines changed

11 files changed

+315
-57
lines changed

crates/base/src/deno_runtime.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,12 @@ mod test {
573573
maybe_module_code: Some(FastString::from(String::from(
574574
"Deno.serve((req) => new Response('Hello World'));",
575575
))),
576-
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
576+
conf: {
577+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
578+
worker_pool_tx,
579+
event_worker_metric_src: None,
580+
})
581+
},
577582
})
578583
.await
579584
.expect("It should not panic");
@@ -612,7 +617,12 @@ mod test {
612617
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
613618
maybe_entrypoint: None,
614619
maybe_module_code: None,
615-
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
620+
conf: {
621+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
622+
worker_pool_tx,
623+
event_worker_metric_src: None,
624+
})
625+
},
616626
})
617627
.await;
618628

@@ -673,7 +683,12 @@ mod test {
673683
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
674684
maybe_entrypoint: None,
675685
maybe_module_code: None,
676-
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
686+
conf: {
687+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
688+
worker_pool_tx,
689+
event_worker_metric_src: None,
690+
})
691+
},
677692
})
678693
.await;
679694

@@ -731,7 +746,10 @@ mod test {
731746
if let Some(uc) = user_conf {
732747
uc
733748
} else {
734-
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx })
749+
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
750+
worker_pool_tx,
751+
event_worker_metric_src: None,
752+
})
735753
}
736754
},
737755
})

crates/base/src/rt_worker/worker.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use event_worker::events::{
1111
use futures_util::FutureExt;
1212
use log::{debug, error};
1313
use sb_core::conn_sync::ConnSync;
14+
use sb_core::{RuntimeMetricSource, WorkerMetricSource};
1415
use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts};
1516
use std::any::Any;
1617
use std::future::{pending, Future};
@@ -87,7 +88,7 @@ impl Worker {
8788
UnboundedSender<UnixStreamEntry>,
8889
UnboundedReceiver<UnixStreamEntry>,
8990
),
90-
booter_signal: Sender<Result<(), Error>>,
91+
booter_signal: Sender<Result<WorkerMetricSource, Error>>,
9192
termination_token: Option<TerminationToken>,
9293
) {
9394
let worker_name = self.worker_name.clone();
@@ -101,24 +102,47 @@ impl Worker {
101102

102103
let method_cloner = self.clone();
103104
let timing = opts.timing.take();
104-
let is_user_worker = opts.conf.is_user_worker();
105+
let worker_kind = opts.conf.to_worker_kind();
106+
let maybe_event_worker_metric_src = opts
107+
.conf
108+
.as_main_worker()
109+
.as_ref()
110+
.and_then(|it| it.event_worker_metric_src.clone());
105111

106112
let cancel = self.cancel.clone();
107-
let rt = if is_user_worker {
113+
let rt = if worker_kind.is_user_worker() {
108114
&rt::USER_WORKER_RT
109115
} else {
110116
&rt::PRIMARY_WORKER_RT
111117
};
112118

113119
let _worker_handle = rt.spawn_pinned(move || {
114120
tokio::task::spawn_local(async move {
115-
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = is_user_worker
121+
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = worker_kind
122+
.is_user_worker()
116123
.then(unbounded_channel::<CPUUsageMetrics>)
117124
.unzip();
118125

119126
let result = match DenoRuntime::new(opts).await {
120127
Ok(mut new_runtime) => {
121-
let _ = booter_signal.send(Ok(()));
128+
let metric = {
129+
let js_runtime = &mut new_runtime.js_runtime;
130+
let metric = WorkerMetricSource::from_js_runtime(js_runtime);
131+
132+
if worker_kind.is_main_worker() {
133+
let state = js_runtime.op_state();
134+
let mut state_mut = state.borrow_mut();
135+
136+
state_mut.put(RuntimeMetricSource::new(
137+
metric.clone(),
138+
maybe_event_worker_metric_src,
139+
));
140+
}
141+
142+
metric
143+
};
144+
145+
let _ = booter_signal.send(Ok(metric));
122146

123147
// CPU TIMER
124148
let (termination_event_tx, termination_event_rx) =
@@ -127,7 +151,7 @@ impl Worker {
127151
let _cpu_timer;
128152

129153
// TODO: Allow customization of supervisor
130-
let termination_fut = if is_user_worker {
154+
let termination_fut = if worker_kind.is_user_worker() {
131155
// cputimer is returned from supervisor and assigned here to keep it in scope.
132156
let Ok(maybe_timer) = create_supervisor(
133157
worker_key.unwrap_or(Uuid::nil()),
@@ -209,7 +233,7 @@ impl Worker {
209233
let result = data.await;
210234

211235
if let Some(token) = termination_token.as_ref() {
212-
if !is_user_worker {
236+
if !worker_kind.is_user_worker() {
213237
let _ = termination_fut.await;
214238
}
215239

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use event_worker::events::{
1212
use hyper::{Body, Request, Response};
1313
use log::{debug, error};
1414
use sb_core::conn_sync::ConnSync;
15+
use sb_core::WorkerMetricSource;
1516
use sb_graph::EszipPayloadKind;
1617
use sb_workers::context::{
1718
EventWorkerRuntimeOpts, MainWorkerRuntimeOpts, Timing, UserWorkerMsgs, WorkerContextInitOpts,
@@ -309,9 +310,10 @@ impl CreateWorkerArgs {
309310

310311
pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
311312
init_opts: Opt,
312-
) -> Result<mpsc::UnboundedSender<WorkerRequestMsg>, Error> {
313-
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel::<Result<(), Error>>();
313+
) -> Result<(WorkerMetricSource, mpsc::UnboundedSender<WorkerRequestMsg>), Error> {
314314
let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
315+
let (worker_boot_result_tx, worker_boot_result_rx) =
316+
oneshot::channel::<Result<WorkerMetricSource, Error>>();
315317

316318
let CreateWorkerArgs(init_opts, maybe_supervisor_policy, maybe_termination_token) =
317319
init_opts.into();
@@ -370,7 +372,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
370372

371373
bail!(err)
372374
}
373-
Ok(_) => {
375+
Ok(metric) => {
374376
let elapsed = worker_struct_ref
375377
.worker_boot_start_time
376378
.elapsed()
@@ -384,7 +386,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
384386
worker_struct_ref.event_metadata.clone(),
385387
);
386388

387-
Ok(worker_req_tx)
389+
Ok((metric, worker_req_tx))
388390
}
389391
}
390392
} else {
@@ -422,7 +424,7 @@ pub async fn create_main_worker(
422424
main_worker_path: PathBuf,
423425
import_map_path: Option<String>,
424426
no_module_cache: bool,
425-
user_worker_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
427+
runtime_opts: MainWorkerRuntimeOpts,
426428
maybe_entrypoint: Option<String>,
427429
termination_token: Option<TerminationToken>,
428430
) -> Result<mpsc::UnboundedSender<WorkerRequestMsg>, Error> {
@@ -435,7 +437,7 @@ pub async fn create_main_worker(
435437
}
436438
}
437439

438-
let main_worker_req_tx = create_worker((
440+
let (_, sender) = create_worker((
439441
WorkerContextInitOpts {
440442
service_path,
441443
import_map_path,
@@ -445,17 +447,15 @@ pub async fn create_main_worker(
445447
maybe_eszip,
446448
maybe_entrypoint,
447449
maybe_module_code: None,
448-
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
449-
worker_pool_tx: user_worker_msgs_tx,
450-
}),
450+
conf: WorkerRuntimeOpts::MainWorker(runtime_opts),
451451
env_vars: std::env::vars().collect(),
452452
},
453453
termination_token,
454454
))
455455
.await
456456
.map_err(|err| anyhow!("main worker boot error: {}", err))?;
457457

458-
Ok(main_worker_req_tx)
458+
Ok(sender)
459459
}
460460

461461
pub async fn create_events_worker(
@@ -464,7 +464,13 @@ pub async fn create_events_worker(
464464
no_module_cache: bool,
465465
maybe_entrypoint: Option<String>,
466466
termination_token: Option<TerminationToken>,
467-
) -> Result<mpsc::UnboundedSender<WorkerEventWithMetadata>, Error> {
467+
) -> Result<
468+
(
469+
WorkerMetricSource,
470+
mpsc::UnboundedSender<WorkerEventWithMetadata>,
471+
),
472+
Error,
473+
> {
468474
let (events_tx, events_rx) = mpsc::unbounded_channel::<WorkerEventWithMetadata>();
469475

470476
let mut service_path = events_worker_path.clone();
@@ -478,7 +484,7 @@ pub async fn create_events_worker(
478484
}
479485
}
480486

481-
let _ = create_worker((
487+
let (metric, _) = create_worker((
482488
WorkerContextInitOpts {
483489
service_path,
484490
no_module_cache,
@@ -496,7 +502,7 @@ pub async fn create_events_worker(
496502
.await
497503
.map_err(|err| anyhow!("events worker boot error: {}", err))?;
498504

499-
Ok(events_tx)
505+
Ok((metric, events_tx))
500506
}
501507

502508
pub async fn create_user_worker_pool(

crates/base/src/rt_worker/worker_pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ impl WorkerPool {
404404
match create_worker((worker_options, supervisor_policy, termination_token.clone()))
405405
.await
406406
{
407-
Ok(worker_request_msg_tx) => {
407+
Ok((_, worker_request_msg_tx)) => {
408408
let profile = UserWorkerProfile {
409409
worker_request_msg_tx,
410410
timing_tx_pair: (req_start_timing_tx, req_end_timing_tx),

crates/base/src/server.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures_util::Stream;
88
use hyper::{server::conn::Http, service::Service, Body, Request, Response};
99
use log::{debug, error, info};
1010
use sb_core::conn_sync::ConnSync;
11-
use sb_workers::context::WorkerRequestMsg;
11+
use sb_workers::context::{MainWorkerRuntimeOpts, WorkerRequestMsg};
1212
use std::future::Future;
1313
use std::net::IpAddr;
1414
use std::net::Ipv4Addr;
@@ -180,17 +180,17 @@ impl Server {
180180
entrypoints: WorkerEntrypoints,
181181
termination_token: Option<TerminationToken>,
182182
) -> Result<Self, Error> {
183-
let mut worker_events_sender: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>> = None;
183+
let mut worker_events_tx: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>> = None;
184184
let maybe_events_entrypoint = entrypoints.events;
185185
let maybe_main_entrypoint = entrypoints.main;
186186
let termination_token = termination_token.unwrap_or_default();
187187

188188
// Create Event Worker
189-
if let Some(events_service_path) = maybe_events_service_path {
189+
let event_worker_metric_src = if let Some(events_service_path) = maybe_events_service_path {
190190
let events_path = Path::new(&events_service_path);
191191
let events_path_buf = events_path.to_path_buf();
192192

193-
let events_worker = create_events_worker(
193+
let (metric, sender) = create_events_worker(
194194
events_path_buf,
195195
import_map_path.clone(),
196196
no_module_cache,
@@ -199,13 +199,16 @@ impl Server {
199199
)
200200
.await?;
201201

202-
worker_events_sender = Some(events_worker);
203-
}
202+
worker_events_tx = Some(sender);
203+
Some(metric)
204+
} else {
205+
None
206+
};
204207

205208
// Create a user worker pool
206-
let user_worker_msgs_tx = create_user_worker_pool(
209+
let worker_pool_tx = create_user_worker_pool(
207210
maybe_user_worker_policy.unwrap_or_default(),
208-
worker_events_sender,
211+
worker_events_tx,
209212
None,
210213
)
211214
.await?;
@@ -216,7 +219,10 @@ impl Server {
216219
main_worker_path,
217220
import_map_path.clone(),
218221
no_module_cache,
219-
user_worker_msgs_tx,
222+
MainWorkerRuntimeOpts {
223+
worker_pool_tx,
224+
event_worker_metric_src,
225+
},
220226
maybe_main_entrypoint,
221227
Some(termination_token.child_token()),
222228
)

crates/base/src/utils/integration_test_helper.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl TestBedBuilder {
192192
}
193193

194194
pub async fn build(self) -> TestBed {
195-
let (worker_pool_msg_tx, pool_termination_token) = {
195+
let (worker_pool_tx, pool_termination_token) = {
196196
let token = TerminationToken::new();
197197
(
198198
create_user_worker_pool(
@@ -218,12 +218,13 @@ impl TestBedBuilder {
218218
maybe_entrypoint: None,
219219
maybe_module_code: None,
220220
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
221-
worker_pool_tx: worker_pool_msg_tx,
221+
worker_pool_tx,
222+
event_worker_metric_src: None,
222223
}),
223224
};
224225

225226
let main_termination_token = TerminationToken::new();
226-
let main_worker_msg_tx =
227+
let (_, main_worker_msg_tx) =
227228
create_worker((main_worker_init_opts, main_termination_token.clone()))
228229
.await
229230
.unwrap();
@@ -293,20 +294,24 @@ pub async fn create_test_user_worker<Opt: Into<CreateTestUserWorkerArgs>>(
293294
..Default::default()
294295
});
295296

296-
Ok((
297-
create_worker(
297+
Ok({
298+
let (_, sender) = create_worker(
298299
opts.with_policy(policy)
299300
.with_termination_token(termination_token.clone()),
300301
)
301-
.await?,
302-
RequestScope {
303-
policy,
304-
req_start_tx,
305-
req_end_tx,
306-
termination_token,
307-
conn: (Some(conn_tx), conn_rx),
308-
},
309-
))
302+
.await?;
303+
304+
(
305+
sender,
306+
RequestScope {
307+
policy,
308+
req_start_tx,
309+
req_end_tx,
310+
termination_token,
311+
conn: (Some(conn_tx), conn_rx),
312+
},
313+
)
314+
})
310315
}
311316

312317
pub fn test_user_worker_pool_policy() -> WorkerPoolPolicy {

0 commit comments

Comments
 (0)