Skip to content

Commit 9b5ce51

Browse files
committed
stamp: expose cancel token for capturing the timing when the user worker is terminated
1 parent 826a84a commit 9b5ce51

File tree

4 files changed

+68
-18
lines changed

4 files changed

+68
-18
lines changed

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,8 +580,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
580580
maybe_request_idle_timeout: Option<u64>,
581581
) -> Result<WorkerCtx, Error> {
582582
let (duplex_stream_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
583-
let (worker_boot_result_tx, worker_boot_result_rx) =
584-
oneshot::channel::<Result<MetricSource, Error>>();
583+
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel();
585584

586585
let CreateWorkerArgs(worker_init_opts, maybe_supervisor_policy, maybe_termination_token) =
587586
init_opts.into();

crates/base/src/rt_worker/worker_pool.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ impl WorkerPool {
260260
.as_user_worker()
261261
.map_or(false, |it| !is_oneshot_policy && it.force_create);
262262

263-
if let Some(ref active_worker_uuid) = self.maybe_active_worker(&service_path, force_create)
264-
{
263+
if let Some((key, profile)) = self.maybe_active_worker(&service_path, force_create) {
265264
if tx
266265
.send(Ok(CreateUserWorkerResult {
267-
key: *active_worker_uuid,
266+
key,
267+
token: profile.cancel.clone(),
268268
}))
269269
.is_err()
270270
{
@@ -437,7 +437,7 @@ impl WorkerPool {
437437
permit: permit.map(Arc::new),
438438
status: status.clone(),
439439
exit: ctx.exit,
440-
cancel,
440+
cancel: cancel.clone(),
441441
};
442442

443443
if worker_pool_msgs_tx
@@ -446,7 +446,14 @@ impl WorkerPool {
446446
{
447447
error!("user worker msgs receiver dropped")
448448
}
449-
if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
449+
450+
if tx
451+
.send(Ok(CreateUserWorkerResult {
452+
key: uuid,
453+
token: cancel,
454+
}))
455+
.is_err()
456+
{
450457
error!("main worker receiver dropped")
451458
};
452459

@@ -621,7 +628,11 @@ impl WorkerPool {
621628
}
622629
}
623630

624-
fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option<Uuid> {
631+
fn maybe_active_worker(
632+
&mut self,
633+
service_path: &String,
634+
force_create: bool,
635+
) -> Option<(Uuid, &UserWorkerProfile)> {
625636
if force_create {
626637
return None;
627638
}
@@ -638,18 +649,15 @@ impl WorkerPool {
638649
.map(|it| it.status.is_retired.clone())
639650
{
640651
Some(is_retired) if !is_retired.is_raised() => {
641-
self.user_workers
642-
.get(&worker_uuid)
643-
.map(|it| it.status.demand.as_ref())
644-
.unwrap()
645-
.fetch_add(1, Ordering::Release);
652+
let profile = self.user_workers.get(&worker_uuid).unwrap();
646653

647-
Some(worker_uuid)
654+
profile.status.demand.fetch_add(1, Ordering::Release);
655+
Some((worker_uuid, profile))
648656
}
649657

650658
_ => {
651659
self.retire(&worker_uuid);
652-
self.maybe_active_worker(service_path, force_create)
660+
self.maybe_active_worker(service_path, false)
653661
}
654662
}
655663
}

crates/sb_workers/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);
223223
#[derive(Debug)]
224224
pub struct CreateUserWorkerResult {
225225
pub key: Uuid,
226+
pub token: CancellationToken,
226227
}
227228

228229
#[derive(Debug)]

crates/sb_workers/lib.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ deno_core::extension!(
4343
op_user_worker_create,
4444
op_user_worker_fetch_build,
4545
op_user_worker_fetch_send,
46+
op_user_user_worker_wait_token_cancelled,
47+
op_user_worker_is_active,
4648
],
4749
esm_entry_point = "ext:sb_user_workers/user_workers.js",
4850
esm = ["user_workers.js",]
@@ -83,11 +85,11 @@ pub struct UserWorkerCreateOptions {
8385
}
8486

8587
#[op2(async)]
86-
#[string]
88+
#[serde]
8789
pub async fn op_user_worker_create(
8890
state: Rc<RefCell<OpState>>,
8991
#[serde] opts: UserWorkerCreateOptions,
90-
) -> Result<String, AnyError> {
92+
) -> Result<(String, ResourceId), AnyError> {
9193
let result_rx = {
9294
let op_state = state.borrow();
9395
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
@@ -185,7 +187,13 @@ pub async fn op_user_worker_create(
185187
let result = result.unwrap();
186188
match result {
187189
Err(e) => Err(custom_error("InvalidWorkerCreation", format!("{e:#}"))),
188-
Ok(res) => Ok(res.key.to_string()),
190+
Ok(CreateUserWorkerResult { key, token }) => Ok((
191+
key.to_string(),
192+
state
193+
.borrow_mut()
194+
.resource_table
195+
.add(UserWorkerCancellationToken(token.clone())),
196+
)),
189197
}
190198
}
191199

@@ -562,3 +570,37 @@ impl Stream for BodyStream {
562570
self.0.poll_recv(cx)
563571
}
564572
}
573+
574+
struct UserWorkerCancellationToken(CancellationToken);
575+
576+
impl Resource for UserWorkerCancellationToken {
577+
fn name(&self) -> std::borrow::Cow<str> {
578+
std::any::type_name::<Self>().into()
579+
}
580+
}
581+
582+
#[op2(async)]
583+
#[serde]
584+
pub async fn op_user_user_worker_wait_token_cancelled(
585+
state: Rc<RefCell<OpState>>,
586+
#[smi] rid: ResourceId,
587+
) -> Result<(), AnyError> {
588+
let token = state
589+
.borrow()
590+
.resource_table
591+
.get::<UserWorkerCancellationToken>(rid)?
592+
.0
593+
.clone();
594+
595+
token.cancelled().await;
596+
Ok(())
597+
}
598+
599+
#[op2(fast)]
600+
pub fn op_user_worker_is_active(state: &mut OpState, #[smi] rid: ResourceId) -> bool {
601+
state
602+
.resource_table
603+
.get::<UserWorkerCancellationToken>(rid)
604+
.map(|it| !it.0.is_cancelled())
605+
.unwrap_or_default()
606+
}

0 commit comments

Comments
 (0)