Skip to content

Commit b8de2b0

Browse files
committed
stamp: expose cancel token for capturing the timing when the user worker is terminated
1 parent a255d4a commit b8de2b0

File tree

4 files changed

+73
-19
lines changed

4 files changed

+73
-19
lines changed

crates/base/src/worker/pool.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,13 @@ impl WorkerPool {
263263
.as_user_worker()
264264
.map_or(false, |it| !is_oneshot_policy && it.force_create);
265265

266-
if let Some(ref active_worker_uuid) = self.maybe_active_worker(&service_path, force_create)
266+
if let Some((ref active_worker_uuid, profile)) =
267+
self.maybe_active_worker(&service_path, force_create)
267268
{
268269
if tx
269270
.send(Ok(CreateUserWorkerResult {
270271
key: *active_worker_uuid,
272+
token: profile.cancel.clone(),
271273
}))
272274
.is_err()
273275
{
@@ -448,7 +450,7 @@ impl WorkerPool {
448450
permit: permit.map(Arc::new),
449451
status: status.clone(),
450452
exit: surface.exit,
451-
cancel,
453+
cancel: cancel.clone(),
452454
};
453455

454456
if worker_pool_msgs_tx
@@ -457,7 +459,13 @@ impl WorkerPool {
457459
{
458460
error!("user worker msgs receiver dropped")
459461
}
460-
if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
462+
if tx
463+
.send(Ok(CreateUserWorkerResult {
464+
key: uuid,
465+
token: cancel,
466+
}))
467+
.is_err()
468+
{
461469
error!("main worker receiver dropped")
462470
};
463471
}
@@ -632,7 +640,11 @@ impl WorkerPool {
632640
}
633641
}
634642

635-
fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option<Uuid> {
643+
fn maybe_active_worker(
644+
&mut self,
645+
service_path: &String,
646+
force_create: bool,
647+
) -> Option<(Uuid, &UserWorkerProfile)> {
636648
if force_create {
637649
return None;
638650
}
@@ -648,11 +660,13 @@ impl WorkerPool {
648660
.get(&worker_uuid)
649661
.map(|it| it.status.is_retired.clone())
650662
{
651-
Some(is_retired) if !is_retired.is_raised() => Some(worker_uuid),
663+
Some(is_retired) if !is_retired.is_raised() => {
664+
Some((worker_uuid, self.user_workers.get(&worker_uuid).unwrap()))
665+
}
652666

653667
_ => {
654668
self.retire(&worker_uuid);
655-
self.maybe_active_worker(service_path, force_create)
669+
self.maybe_active_worker(service_path, false)
656670
}
657671
}
658672
}

crates/base/src/worker/worker_surface_creation.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use sb_workers::context::{
1414
WorkerExit, WorkerRequestMsg, WorkerRuntimeOpts,
1515
};
1616
use tokio::sync::{mpsc, oneshot};
17-
use tokio_util::sync::CancellationToken;
1817

1918
use crate::{inspector_server::Inspector, server::ServerFlags};
2019

@@ -329,8 +328,7 @@ impl WorkerSurfaceBuilder {
329328
worker_builder_hook,
330329
} = self;
331330

332-
let (worker_boot_result_tx, worker_boot_result_rx) =
333-
oneshot::channel::<Result<(MetricSource, CancellationToken), anyhow::Error>>();
331+
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel();
334332

335333
let flags = flags.unwrap_or_default();
336334
let init_opts = init_opts.context("init_opts must be specified")?;

ext/workers/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);
245245
#[derive(Debug)]
246246
pub struct CreateUserWorkerResult {
247247
pub key: Uuid,
248+
pub token: CancellationToken,
248249
}
249250

250251
#[derive(Debug)]

ext/workers/lib.rs

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,13 @@ pub struct UserWorkerCreateOptions {
9494
}
9595

9696
#[op2(async)]
97-
#[string]
97+
#[serde]
9898
pub async fn op_user_worker_create(
9999
state: Rc<RefCell<OpState>>,
100100
#[serde] opts: UserWorkerCreateOptions,
101-
) -> Result<String, AnyError> {
101+
) -> Result<(String, ResourceId), AnyError> {
102102
let result_rx = {
103-
let op_state = state.borrow();
104-
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
105-
let (result_tx, result_rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();
106-
103+
let (tx, rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();
107104
let UserWorkerCreateOptions {
108105
service_path,
109106
env_vars,
@@ -183,7 +180,7 @@ pub async fn op_user_worker_create(
183180
deno_core::resolve_url_or_path(
184181
// FIXME: The type alias does not have a unique
185182
// type id and should not be used here.
186-
op_state.borrow::<ModuleSpecifier>().as_str(),
183+
state.borrow().borrow::<ModuleSpecifier>().as_str(),
187184
std::env::current_dir()?.as_path(),
188185
)?
189186
},
@@ -195,8 +192,12 @@ pub async fn op_user_worker_create(
195192
maybe_tmp_fs_config,
196193
};
197194

198-
tx.send(UserWorkerMsgs::Create(user_worker_options, result_tx))?;
199-
result_rx
195+
state
196+
.borrow()
197+
.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
198+
.send(UserWorkerMsgs::Create(user_worker_options, tx))?;
199+
200+
rx
200201
};
201202

202203
match result_rx.await {
@@ -209,7 +210,13 @@ pub async fn op_user_worker_create(
209210
)),
210211

211212
Ok(Err(err)) => Err(custom_error("InvalidWorkerCreation", format!("{err:#}"))),
212-
Ok(Ok(v)) => Ok(v.key.to_string()),
213+
Ok(Ok(CreateUserWorkerResult { key, token })) => Ok((
214+
key.to_string(),
215+
state
216+
.borrow_mut()
217+
.resource_table
218+
.add(UserWorkerCancellationToken(token.clone())),
219+
)),
213220
}
214221
}
215222

@@ -586,3 +593,37 @@ impl Stream for BodyStream {
586593
self.0.poll_recv(cx)
587594
}
588595
}
596+
597+
struct UserWorkerCancellationToken(CancellationToken);
598+
599+
impl Resource for UserWorkerCancellationToken {
600+
fn name(&self) -> std::borrow::Cow<str> {
601+
std::any::type_name::<Self>().into()
602+
}
603+
}
604+
605+
#[op2(async)]
606+
#[serde]
607+
pub async fn op_user_user_worker_wait_token_cancelled(
608+
state: Rc<RefCell<OpState>>,
609+
#[smi] rid: ResourceId,
610+
) -> Result<(), AnyError> {
611+
let token = state
612+
.borrow()
613+
.resource_table
614+
.get::<UserWorkerCancellationToken>(rid)?
615+
.0
616+
.clone();
617+
618+
token.cancelled().await;
619+
Ok(())
620+
}
621+
622+
#[op2(fast)]
623+
pub fn op_user_worker_is_active(state: &mut OpState, #[smi] rid: ResourceId) -> bool {
624+
state
625+
.resource_table
626+
.get::<UserWorkerCancellationToken>(rid)
627+
.map(|it| !it.0.is_cancelled())
628+
.unwrap_or_default()
629+
}

0 commit comments

Comments
 (0)