Skip to content

Commit 3476038

Browse files
committed
stamp: expose cancel token for capturing the timing when the user worker is terminated
1 parent ce4001b commit 3476038

File tree

4 files changed

+69
-18
lines changed

4 files changed

+69
-18
lines changed

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,8 +575,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
575575
maybe_request_idle_timeout: Option<u64>,
576576
) -> Result<WorkerCtx, Error> {
577577
let (duplex_stream_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
578-
let (worker_boot_result_tx, worker_boot_result_rx) =
579-
oneshot::channel::<Result<MetricSource, Error>>();
578+
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel();
580579

581580
let CreateWorkerArgs(worker_init_opts, maybe_supervisor_policy, maybe_termination_token) =
582581
init_opts.into();

crates/base/src/rt_worker/worker_pool.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ impl WorkerPool {
260260
.as_user_worker()
261261
.map_or(false, |it| !is_oneshot_policy && it.force_create);
262262

263-
if let Some(ref active_worker_uuid) = self.maybe_active_worker(&service_path, force_create)
264-
{
263+
if let Some((key, profile)) = self.maybe_active_worker(&service_path, force_create) {
265264
if tx
266265
.send(Ok(CreateUserWorkerResult {
267-
key: *active_worker_uuid,
266+
key,
267+
token: profile.cancel.clone(),
268268
}))
269269
.is_err()
270270
{
@@ -437,7 +437,7 @@ impl WorkerPool {
437437
permit: permit.map(Arc::new),
438438
status: status.clone(),
439439
exit: ctx.exit,
440-
cancel,
440+
cancel: cancel.clone(),
441441
};
442442

443443
if worker_pool_msgs_tx
@@ -446,12 +446,20 @@ impl WorkerPool {
446446
{
447447
error!("user worker msgs receiver dropped")
448448
}
449-
if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
449+
450+
if tx
451+
.send(Ok(CreateUserWorkerResult {
452+
key: uuid,
453+
token: cancel,
454+
}))
455+
.is_err()
456+
{
450457
error!("main worker receiver dropped")
451458
};
452459

453460
status.demand.fetch_add(1, Ordering::Release);
454461
}
462+
455463
Err(e) => {
456464
if tx.send(Err(e)).is_err() {
457465
error!("main worker receiver dropped")
@@ -622,7 +630,11 @@ impl WorkerPool {
622630
}
623631
}
624632

625-
fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option<Uuid> {
633+
fn maybe_active_worker(
634+
&mut self,
635+
service_path: &String,
636+
force_create: bool,
637+
) -> Option<(Uuid, &UserWorkerProfile)> {
626638
if force_create {
627639
return None;
628640
}
@@ -639,18 +651,15 @@ impl WorkerPool {
639651
.map(|it| it.status.is_retired.clone())
640652
{
641653
Some(is_retired) if !is_retired.is_raised() => {
642-
self.user_workers
643-
.get(&worker_uuid)
644-
.map(|it| it.status.demand.as_ref())
645-
.unwrap()
646-
.fetch_add(1, Ordering::Release);
654+
let profile = self.user_workers.get(&worker_uuid).unwrap();
647655

648-
Some(worker_uuid)
656+
profile.status.demand.fetch_add(1, Ordering::Release);
657+
Some((worker_uuid, profile))
649658
}
650659

651660
_ => {
652661
self.retire(&worker_uuid);
653-
self.maybe_active_worker(service_path, force_create)
662+
self.maybe_active_worker(service_path, false)
654663
}
655664
}
656665
}

crates/sb_workers/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);
221221
#[derive(Debug)]
222222
pub struct CreateUserWorkerResult {
223223
pub key: Uuid,
224+
pub token: CancellationToken,
224225
}
225226

226227
#[derive(Debug)]

crates/sb_workers/lib.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ deno_core::extension!(
4343
op_user_worker_create,
4444
op_user_worker_fetch_build,
4545
op_user_worker_fetch_send,
46+
op_user_user_worker_wait_token_cancelled,
47+
op_user_worker_is_active,
4648
],
4749
esm_entry_point = "ext:sb_user_workers/user_workers.js",
4850
esm = ["user_workers.js",]
@@ -82,11 +84,11 @@ pub struct UserWorkerCreateOptions {
8284
}
8385

8486
#[op2(async)]
85-
#[string]
87+
#[serde]
8688
pub async fn op_user_worker_create(
8789
state: Rc<RefCell<OpState>>,
8890
#[serde] opts: UserWorkerCreateOptions,
89-
) -> Result<String, AnyError> {
91+
) -> Result<(String, ResourceId), AnyError> {
9092
let result_rx = {
9193
let op_state = state.borrow();
9294
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
@@ -182,7 +184,13 @@ pub async fn op_user_worker_create(
182184
let result = result.unwrap();
183185
match result {
184186
Err(e) => Err(custom_error("InvalidWorkerCreation", e.to_string())),
185-
Ok(res) => Ok(res.key.to_string()),
187+
Ok(CreateUserWorkerResult { key, token }) => Ok((
188+
key.to_string(),
189+
state
190+
.borrow_mut()
191+
.resource_table
192+
.add(UserWorkerCancellationToken(token.clone())),
193+
)),
186194
}
187195
}
188196

@@ -559,3 +567,37 @@ impl Stream for BodyStream {
559567
self.0.poll_recv(cx)
560568
}
561569
}
570+
571+
struct UserWorkerCancellationToken(CancellationToken);
572+
573+
impl Resource for UserWorkerCancellationToken {
574+
fn name(&self) -> std::borrow::Cow<str> {
575+
std::any::type_name::<Self>().into()
576+
}
577+
}
578+
579+
#[op2(async)]
580+
#[serde]
581+
pub async fn op_user_user_worker_wait_token_cancelled(
582+
state: Rc<RefCell<OpState>>,
583+
#[smi] rid: ResourceId,
584+
) -> Result<(), AnyError> {
585+
let token = state
586+
.borrow()
587+
.resource_table
588+
.get::<UserWorkerCancellationToken>(rid)?
589+
.0
590+
.clone();
591+
592+
token.cancelled().await;
593+
Ok(())
594+
}
595+
596+
#[op2(fast)]
597+
pub fn op_user_worker_is_active(state: &mut OpState, #[smi] rid: ResourceId) -> bool {
598+
state
599+
.resource_table
600+
.get::<UserWorkerCancellationToken>(rid)
601+
.map(|it| !it.0.is_cancelled())
602+
.unwrap_or_default()
603+
}

0 commit comments

Comments
 (0)