Skip to content

Commit 9678c21

Browse files
committed
stamp: expose cancel token for capturing the timing when the user worker is terminated
1 parent f9aac00 commit 9678c21

File tree

4 files changed

+76
-25
lines changed

4 files changed

+76
-25
lines changed

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -599,8 +599,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
599599
inspector: Option<Inspector>,
600600
) -> Result<WorkerCtx, Error> {
601601
let (duplex_stream_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
602-
let (worker_boot_result_tx, worker_boot_result_rx) =
603-
oneshot::channel::<Result<MetricSource, Error>>();
602+
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel();
604603

605604
let CreateWorkerArgs(worker_init_opts, maybe_supervisor_policy, maybe_termination_token) =
606605
init_opts.into();

crates/base/src/rt_worker/worker_pool.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,11 @@ impl WorkerPool {
258258
.as_user_worker()
259259
.map_or(false, |it| !is_oneshot_policy && it.force_create);
260260

261-
if let Some(ref active_worker_uuid) = self.maybe_active_worker(&service_path, force_create)
262-
{
261+
if let Some((key, profile)) = self.maybe_active_worker(&service_path, force_create) {
263262
if tx
264263
.send(Ok(CreateUserWorkerResult {
265-
key: *active_worker_uuid,
264+
key,
265+
token: profile.cancel.clone(),
266266
}))
267267
.is_err()
268268
{
@@ -440,7 +440,7 @@ impl WorkerPool {
440440
permit: permit.map(Arc::new),
441441
status: status.clone(),
442442
exit: ctx.exit,
443-
cancel,
443+
cancel: cancel.clone(),
444444
};
445445

446446
if worker_pool_msgs_tx
@@ -449,7 +449,14 @@ impl WorkerPool {
449449
{
450450
error!("user worker msgs receiver dropped")
451451
}
452-
if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
452+
453+
if tx
454+
.send(Ok(CreateUserWorkerResult {
455+
key: uuid,
456+
token: cancel,
457+
}))
458+
.is_err()
459+
{
453460
error!("main worker receiver dropped")
454461
};
455462

@@ -624,7 +631,11 @@ impl WorkerPool {
624631
}
625632
}
626633

627-
fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option<Uuid> {
634+
fn maybe_active_worker(
635+
&mut self,
636+
service_path: &String,
637+
force_create: bool,
638+
) -> Option<(Uuid, &UserWorkerProfile)> {
628639
if force_create {
629640
return None;
630641
}
@@ -641,18 +652,15 @@ impl WorkerPool {
641652
.map(|it| it.status.is_retired.clone())
642653
{
643654
Some(is_retired) if !is_retired.is_raised() => {
644-
self.user_workers
645-
.get(&worker_uuid)
646-
.map(|it| it.status.demand.as_ref())
647-
.unwrap()
648-
.fetch_add(1, Ordering::Release);
655+
let profile = self.user_workers.get(&worker_uuid).unwrap();
649656

650-
Some(worker_uuid)
657+
profile.status.demand.fetch_add(1, Ordering::Release);
658+
Some((worker_uuid, profile))
651659
}
652660

653661
_ => {
654662
self.retire(&worker_uuid);
655-
self.maybe_active_worker(service_path, force_create)
663+
self.maybe_active_worker(service_path, false)
656664
}
657665
}
658666
}

crates/sb_workers/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);
245245
#[derive(Debug)]
246246
pub struct CreateUserWorkerResult {
247247
pub key: Uuid,
248+
pub token: CancellationToken,
248249
}
249250

250251
#[derive(Debug)]

crates/sb_workers/lib.rs

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ deno_core::extension!(
4545
op_user_worker_create,
4646
op_user_worker_fetch_build,
4747
op_user_worker_fetch_send,
48+
op_user_user_worker_wait_token_cancelled,
49+
op_user_worker_is_active,
4850
],
4951
esm_entry_point = "ext:sb_user_workers/user_workers.js",
5052
esm = ["user_workers.js",]
@@ -94,16 +96,13 @@ pub struct UserWorkerCreateOptions {
9496
}
9597

9698
#[op2(async)]
97-
#[string]
99+
#[serde]
98100
pub async fn op_user_worker_create(
99101
state: Rc<RefCell<OpState>>,
100102
#[serde] opts: UserWorkerCreateOptions,
101-
) -> Result<String, AnyError> {
103+
) -> Result<(String, ResourceId), AnyError> {
102104
let result_rx = {
103-
let op_state = state.borrow();
104-
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
105-
let (result_tx, result_rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();
106-
105+
let (tx, rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();
107106
let UserWorkerCreateOptions {
108107
service_path,
109108
env_vars,
@@ -183,7 +182,7 @@ pub async fn op_user_worker_create(
183182
deno_core::resolve_url_or_path(
184183
// FIXME: The type alias does not have a unique
185184
// type id and should not be used here.
186-
op_state.borrow::<ModuleSpecifier>().as_str(),
185+
state.borrow().borrow::<ModuleSpecifier>().as_str(),
187186
std::env::current_dir()?.as_path(),
188187
)?
189188
},
@@ -195,8 +194,12 @@ pub async fn op_user_worker_create(
195194
maybe_tmp_fs_config,
196195
};
197196

198-
tx.send(UserWorkerMsgs::Create(user_worker_options, result_tx))?;
199-
result_rx
197+
state
198+
.borrow()
199+
.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
200+
.send(UserWorkerMsgs::Create(user_worker_options, tx))?;
201+
202+
rx
200203
};
201204

202205
match result_rx.await {
@@ -209,7 +212,13 @@ pub async fn op_user_worker_create(
209212
)),
210213

211214
Ok(Err(err)) => Err(custom_error("InvalidWorkerCreation", format!("{err:#}"))),
212-
Ok(Ok(v)) => Ok(v.key.to_string()),
215+
Ok(Ok(CreateUserWorkerResult { key, token })) => Ok((
216+
key.to_string(),
217+
state
218+
.borrow_mut()
219+
.resource_table
220+
.add(UserWorkerCancellationToken(token.clone())),
221+
)),
213222
}
214223
}
215224

@@ -586,3 +595,37 @@ impl Stream for BodyStream {
586595
self.0.poll_recv(cx)
587596
}
588597
}
598+
599+
struct UserWorkerCancellationToken(CancellationToken);
600+
601+
impl Resource for UserWorkerCancellationToken {
602+
fn name(&self) -> std::borrow::Cow<str> {
603+
std::any::type_name::<Self>().into()
604+
}
605+
}
606+
607+
#[op2(async)]
608+
#[serde]
609+
pub async fn op_user_user_worker_wait_token_cancelled(
610+
state: Rc<RefCell<OpState>>,
611+
#[smi] rid: ResourceId,
612+
) -> Result<(), AnyError> {
613+
let token = state
614+
.borrow()
615+
.resource_table
616+
.get::<UserWorkerCancellationToken>(rid)?
617+
.0
618+
.clone();
619+
620+
token.cancelled().await;
621+
Ok(())
622+
}
623+
624+
#[op2(fast)]
625+
pub fn op_user_worker_is_active(state: &mut OpState, #[smi] rid: ResourceId) -> bool {
626+
state
627+
.resource_table
628+
.get::<UserWorkerCancellationToken>(rid)
629+
.map(|it| !it.0.is_cancelled())
630+
.unwrap_or_default()
631+
}

0 commit comments

Comments
 (0)