diff --git a/Cargo.lock b/Cargo.lock index 2ae1becd1..ec79f56aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6226,6 +6226,7 @@ name = "sb_workers" version = "0.1.0" dependencies = [ "anyhow", + "base_rt", "bytes", "deno_config", "deno_core", diff --git a/crates/base/src/worker/pool.rs b/crates/base/src/worker/pool.rs index 73fa7054d..e020d965c 100644 --- a/crates/base/src/worker/pool.rs +++ b/crates/base/src/worker/pool.rs @@ -263,11 +263,13 @@ impl WorkerPool { .as_user_worker() .map_or(false, |it| !is_oneshot_policy && it.force_create); - if let Some(ref active_worker_uuid) = self.maybe_active_worker(&service_path, force_create) + if let Some((ref active_worker_uuid, profile)) = + self.maybe_active_worker(&service_path, force_create) { if tx .send(Ok(CreateUserWorkerResult { key: *active_worker_uuid, + token: profile.cancel.clone(), })) .is_err() { @@ -448,7 +450,7 @@ impl WorkerPool { permit: permit.map(Arc::new), status: status.clone(), exit: surface.exit, - cancel, + cancel: cancel.clone(), }; if worker_pool_msgs_tx @@ -457,7 +459,13 @@ impl WorkerPool { { error!("user worker msgs receiver dropped") } - if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() { + if tx + .send(Ok(CreateUserWorkerResult { + key: uuid, + token: cancel, + })) + .is_err() + { error!("main worker receiver dropped") }; } @@ -632,7 +640,11 @@ impl WorkerPool { } } - fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option { + fn maybe_active_worker( + &mut self, + service_path: &String, + force_create: bool, + ) -> Option<(Uuid, &UserWorkerProfile)> { if force_create { return None; } @@ -648,11 +660,13 @@ impl WorkerPool { .get(&worker_uuid) .map(|it| it.status.is_retired.clone()) { - Some(is_retired) if !is_retired.is_raised() => Some(worker_uuid), + Some(is_retired) if !is_retired.is_raised() => { + Some((worker_uuid, self.user_workers.get(&worker_uuid).unwrap())) + } _ => { self.retire(&worker_uuid); - self.maybe_active_worker(service_path, force_create) + self.maybe_active_worker(service_path, false) } } } diff --git a/crates/base/src/worker/worker_surface_creation.rs b/crates/base/src/worker/worker_surface_creation.rs index 5baa28743..a161a579c 100644 --- a/crates/base/src/worker/worker_surface_creation.rs +++ b/crates/base/src/worker/worker_surface_creation.rs @@ -14,7 +14,6 @@ use sb_workers::context::{ WorkerExit, WorkerRequestMsg, WorkerRuntimeOpts, }; use tokio::sync::{mpsc, oneshot}; -use tokio_util::sync::CancellationToken; use crate::{inspector_server::Inspector, server::ServerFlags}; @@ -329,8 +328,7 @@ impl WorkerSurfaceBuilder { worker_builder_hook, } = self; - let (worker_boot_result_tx, worker_boot_result_rx) = - oneshot::channel::>(); + let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel(); let flags = flags.unwrap_or_default(); let init_opts = init_opts.context("init_opts must be specified")?; diff --git a/examples/main-session/index.ts b/examples/main-session/index.ts new file mode 100644 index 000000000..5a68d1624 --- /dev/null +++ b/examples/main-session/index.ts @@ -0,0 +1,136 @@ +// @ts-ignore +import { STATUS_CODE } from 'https://deno.land/std/http/status.ts'; + +const SESSION_HEADER_NAME = 'X-Edge-Runtime-Session-Id'; +const WORKERS = new Map(); + +setInterval(() => { + const shouldBeRemoved: string[] = []; + + for (const [uuid, worker] of WORKERS) { + if (!worker.active) { + shouldBeRemoved.push(uuid); + } + } + + for (const uuid of shouldBeRemoved) { + console.log("deleted: ", uuid); + WORKERS.delete(uuid); + } +}, 2500); + +console.log('main function started (session mode)'); + +Deno.serve(async (req: Request) => { + const headers = new Headers({ + 'Content-Type': 'application/json', + }); + + const url = new URL(req.url); + const { pathname } = url; + + // handle health checks + if (pathname === '/_internal/health') { + return new Response( + JSON.stringify({ 'message': 'ok' }), + { + status: STATUS_CODE.OK, + headers, + }, + ); + } + + if (pathname === '/_internal/metric') { + const metric = await EdgeRuntime.getRuntimeMetrics(); + return Response.json(metric); + } + + const path_parts = pathname.split('/'); + const service_name = path_parts[1]; + + if (!service_name || service_name === '') { + const error = { msg: 'missing function name in request' }; + return new Response( + JSON.stringify(error), + { status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } }, + ); + } + + const servicePath = `./examples/${service_name}`; + const createWorker = async (): Promise => { + const memoryLimitMb = 150; + const workerTimeoutMs = 30 * 1000; + const noModuleCache = false; + + const importMapPath = null; + const envVarsObj = Deno.env.toObject(); + const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); + const forceCreate = false; + const netAccessDisabled = false; + const cpuTimeSoftLimitMs = 10000; + const cpuTimeHardLimitMs = 20000; + + return await EdgeRuntime.userWorkers.create({ + servicePath, + memoryLimitMb, + workerTimeoutMs, + noModuleCache, + importMapPath, + envVars, + forceCreate, + netAccessDisabled, + cpuTimeSoftLimitMs, + cpuTimeHardLimitMs, + }); + }; + + const callWorker = async () => { + + try { + let worker: EdgeRuntime.UserWorker | null = null; + + if (req.headers.get(SESSION_HEADER_NAME)) { + const sessionId = req.headers.get(SESSION_HEADER_NAME)!; + const complexSessionId = `${servicePath}/${sessionId}`; + + const maybeWorker = WORKERS.get(complexSessionId); + + if (maybeWorker && maybeWorker.active) { + worker = maybeWorker; + } + } + + if (!worker) { + worker = await createWorker(); + } + + const resp = await worker.fetch(req); + + if (resp.headers.has(SESSION_HEADER_NAME)) { + const sessionIdFromWorker = resp.headers.get(SESSION_HEADER_NAME)!; + const complexSessionId = `${servicePath}/${sessionIdFromWorker}`; + + WORKERS.set(complexSessionId, worker); + } + + return resp; + } catch (e) { + console.error(e); + + if (e instanceof Deno.errors.WorkerRequestCancelled) { + headers.append('Connection', 'close'); + } + + const error = { msg: e.toString() }; + return new Response( + JSON.stringify(error), + { + status: STATUS_CODE.InternalServerError, + headers, + }, + ); + } + }; + + return callWorker(); +}); diff --git a/examples/serve-session/index.ts b/examples/serve-session/index.ts new file mode 100644 index 000000000..5c9605183 --- /dev/null +++ b/examples/serve-session/index.ts @@ -0,0 +1,63 @@ +// @ts-ignore +import { STATUS_CODE } from "https://deno.land/std/http/status.ts"; + +type SessionStroage = { [key: string]: unknown }; + +const SESSION_HEADER_NAME = "X-Edge-Runtime-Session-Id"; +const SESSIONS = new Map(); + +function makeNewSession(): [string, SessionStroage] { + const uuid = crypto.randomUUID(); + const storage = {}; + + SESSIONS.set(uuid, storage); + return [uuid, storage]; +} + +function getSessionStorageFromRequest(req: Request): SessionStroage | void { + const maybeSessionId = req.headers.get(SESSION_HEADER_NAME); + + if (typeof maybeSessionId === "string" && SESSIONS.has(maybeSessionId)) { + return SESSIONS.get(maybeSessionId); + } +} + +export default { + fetch(req: Request) { + const headers = new Headers(); + let storage: SessionStroage; + + if (req.headers.get(SESSION_HEADER_NAME)) { + const maybeStorage = getSessionStorageFromRequest(req); + + if (!maybeStorage) { + return new Response(null, { + status: STATUS_CODE.BadRequest + }); + } + + storage = maybeStorage; + } else { + const [sessionId, newStorage] = makeNewSession(); + + headers.set(SESSION_HEADER_NAME, sessionId); + + storage = newStorage; + } + + if (!("count" in storage)) { + storage["count"] = 0; + } else { + (storage["count"] as number)++; + } + + const count = storage["count"] as number; + + return new Response( + JSON.stringify({ count }), + { + headers + } + ); + } +} \ No newline at end of file diff --git a/ext/workers/Cargo.toml b/ext/workers/Cargo.toml index 2f41baeca..9d6930525 100644 --- a/ext/workers/Cargo.toml +++ b/ext/workers/Cargo.toml @@ -13,6 +13,7 @@ deno_core.workspace = true deno_http.workspace = true deno_config.workspace = true +base_rt.workspace = true http_utils.workspace = true graph.workspace = true fs.workspace = true diff --git a/ext/workers/context.rs b/ext/workers/context.rs index ea59cdbf2..e4c69fb02 100644 --- a/ext/workers/context.rs +++ b/ext/workers/context.rs @@ -245,6 +245,7 @@ pub type SendRequestResult = (Response, mpsc::UnboundedSender<()>); #[derive(Debug)] pub struct CreateUserWorkerResult { pub key: Uuid, + pub token: CancellationToken, } #[derive(Debug)] diff --git a/ext/workers/lib.rs b/ext/workers/lib.rs index 42520213c..6a6b36794 100644 --- a/ext/workers/lib.rs +++ b/ext/workers/lib.rs @@ -94,16 +94,13 @@ pub struct UserWorkerCreateOptions { } #[op2(async)] -#[string] +#[serde] pub async fn op_user_worker_create( state: Rc>, #[serde] opts: UserWorkerCreateOptions, -) -> Result { +) -> Result<(String, ResourceId), AnyError> { let result_rx = { - let op_state = state.borrow(); - let tx = op_state.borrow::>(); - let (result_tx, result_rx) = oneshot::channel::>(); - + let (tx, rx) = oneshot::channel::>(); let UserWorkerCreateOptions { service_path, env_vars, @@ -183,7 +180,7 @@ pub async fn op_user_worker_create( deno_core::resolve_url_or_path( // FIXME: The type alias does not have a unique // type id and should not be used here. - op_state.borrow::().as_str(), + state.borrow().borrow::().as_str(), std::env::current_dir()?.as_path(), )? }, @@ -195,8 +192,12 @@ pub async fn op_user_worker_create( maybe_tmp_fs_config, }; - tx.send(UserWorkerMsgs::Create(user_worker_options, result_tx))?; - result_rx + state + .borrow() + .borrow::>() + .send(UserWorkerMsgs::Create(user_worker_options, tx))?; + + rx }; match result_rx.await { @@ -209,7 +210,13 @@ pub async fn op_user_worker_create( )), Ok(Err(err)) => Err(custom_error("InvalidWorkerCreation", format!("{err:#}"))), - Ok(Ok(v)) => Ok(v.key.to_string()), + Ok(Ok(CreateUserWorkerResult { key, token })) => Ok(( + key.to_string(), + state + .borrow_mut() + .resource_table + .add(UserWorkerCancellationToken(token.clone())), + )), } } @@ -586,3 +593,37 @@ impl Stream for BodyStream { self.0.poll_recv(cx) } } + +struct UserWorkerCancellationToken(CancellationToken); + +impl Resource for UserWorkerCancellationToken { + fn name(&self) -> std::borrow::Cow { + std::any::type_name::().into() + } +} + +#[op2(async)] +#[serde] +pub async fn op_user_user_worker_wait_token_cancelled( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result<(), AnyError> { + let token = state + .borrow() + .resource_table + .get::(rid)? + .0 + .clone(); + + token.cancelled().await; + Ok(()) +} + +#[op2(fast)] +pub fn op_user_worker_is_active(state: &mut OpState, #[smi] rid: ResourceId) -> bool { + state + .resource_table + .get::(rid) + .map(|it| !it.0.is_cancelled()) + .unwrap_or_default() +} diff --git a/ext/workers/user_workers.js b/ext/workers/user_workers.js index caf2c17b1..bdf31eba0 100644 --- a/ext/workers/user_workers.js +++ b/ext/workers/user_workers.js @@ -1,4 +1,5 @@ import { primordials, core } from "ext:core/mod.js"; +import { SymbolDispose } from "ext:deno_web/00_infra.js"; import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js"; import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js"; @@ -9,6 +10,8 @@ const { TypeError } = primordials; const { op_user_worker_fetch_send, op_user_worker_create, + op_user_user_worker_wait_token_cancelled, + op_user_worker_is_active, } = ops; const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\ @@ -24,8 +27,34 @@ function redirectStatus(status) { } class UserWorker { - constructor(key) { - this.key = key; + /** @type {string} */ + #key = ""; + + /** @type {number | null} */ + #rid = null; + + /** @type {boolean} */ + #disposed = false; + + /** + * @param {string} key + * @param {number} rid + */ + constructor(key, rid) { + this.#key = key; + this.#rid = rid; + + // deno-lint-ignore no-this-alias + const self = this; + + setTimeout(async () => { + try { + await op_user_user_worker_wait_token_cancelled(rid); + self.dispose(); + } catch { + // TODO(Nyannyacha): Link it with the tracing for telemetry. + } + }); } async fetch(request, options = {}) { @@ -62,7 +91,7 @@ class UserWorker { } const responsePromise = op_user_worker_fetch_send( - this.key, + this.#key, requestRid, requestBodyRid, tag.streamRid, @@ -75,6 +104,7 @@ class UserWorker { ]); if (requestBodyPromiseResult.status === "rejected") { + // TODO(Nyannyacha): Link it with the tracing for telemetry. // console.warn(requestBodyPromiseResult.reason); } @@ -114,6 +144,26 @@ class UserWorker { }); } + /** @returns {boolean} */ + get active() { + if (this.#disposed) { + return false; + } + + return op_user_worker_is_active(this.#rid); + } + + dispose() { + if (!this.#disposed) { + core.tryClose(this.#rid); + this.#disposed = true; + } + } + + [SymbolDispose]() { + this.dispose(); + } + static async create(opts) { const readyOptions = { noModuleCache: false, @@ -136,9 +186,9 @@ class UserWorker { throw new TypeError("service path must be defined"); } - const key = await op_user_worker_create(readyOptions); + const [key, rid] = await op_user_worker_create(readyOptions); - return new UserWorker(key); + return new UserWorker(key, rid); } } diff --git a/types/global.d.ts b/types/global.d.ts index 5c324363a..f11194c20 100644 --- a/types/global.d.ts +++ b/types/global.d.ts @@ -95,6 +95,10 @@ declare namespace EdgeRuntime { constructor(key: string); fetch(request: Request, options?: UserWorkerFetchOptions): Promise; + dispose(): void; + + get active(): boolean; + static create(opts: UserWorkerCreateOptions): Promise; } @@ -105,7 +109,10 @@ declare namespace EdgeRuntime { export function systemMemoryInfo(): MemInfo; export function raiseSegfault(): void; - export { UserWorker as userWorkers }; + export { + type UserWorker, + UserWorker as userWorkers, + }; } declare namespace Supabase {