Skip to content

Commit 5525c8b

Browse files
committed
stamp(sb_workers): sync apis with rust land
1 parent 9b5ce51 commit 5525c8b

File tree

1 file changed

+54
-5
lines changed

1 file changed

+54
-5
lines changed

crates/sb_workers/user_workers.js

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { primordials, core } from "ext:core/mod.js";
22
import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js";
33
import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js";
4+
import { SymbolDispose } from "ext:deno_web/00_infra.js";
45

56
const ops = core.ops;
67

@@ -9,6 +10,8 @@ const { TypeError } = primordials;
910
const {
1011
op_user_worker_fetch_send,
1112
op_user_worker_create,
13+
op_user_user_worker_wait_token_cancelled,
14+
op_user_worker_is_active,
1215
} = ops;
1316

1417
const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -24,8 +27,33 @@ function redirectStatus(status) {
2427
}
2528

2629
class UserWorker {
27-
constructor(key) {
28-
this.key = key;
30+
/** @type {string} */
31+
#key = "";
32+
33+
/** @type {number | null} */
34+
#rid = null;
35+
36+
/** @type {boolean} */
37+
#disposed = false;
38+
39+
/**
40+
* @param {string} key
41+
* @param {number} rid
42+
*/
43+
constructor(key, rid) {
44+
this.#key = key;
45+
this.#rid = rid;
46+
47+
const self = this;
48+
49+
setTimeout(async () => {
50+
try {
51+
await op_user_user_worker_wait_token_cancelled(rid);
52+
self.dispose();
53+
} catch {
54+
// TODO(Nyannyacha): Link it with the tracing for telemetry.
55+
}
56+
});
2957
}
3058

3159
async fetch(request, options = {}) {
@@ -63,7 +91,7 @@ class UserWorker {
6391
}
6492

6593
const responsePromise = op_user_worker_fetch_send(
66-
this.key,
94+
this.#key,
6795
requestRid,
6896
requestBodyRid,
6997
tag.streamRid,
@@ -76,6 +104,7 @@ class UserWorker {
76104
]);
77105

78106
if (requestBodyPromiseResult.status === "rejected") {
107+
// TODO(Nyannyacha): Link it with the tracing for telemetry.
79108
// console.warn(requestBodyPromiseResult.reason);
80109
}
81110

@@ -115,6 +144,26 @@ class UserWorker {
115144
});
116145
}
117146

147+
/** @returns {boolean} */
148+
get active() {
149+
if (this.#disposed) {
150+
return false;
151+
}
152+
153+
return op_user_worker_is_active(this.#rid);
154+
}
155+
156+
dispose() {
157+
if (!this.#disposed) {
158+
core.tryClose(this.#rid);
159+
this.#disposed = true;
160+
}
161+
}
162+
163+
[SymbolDispose]() {
164+
this.dispose();
165+
}
166+
118167
static async create(opts) {
119168
const readyOptions = {
120169
memoryLimitMb: 512,
@@ -142,9 +191,9 @@ class UserWorker {
142191
throw new TypeError("service path must be defined");
143192
}
144193

145-
const key = await op_user_worker_create(readyOptions);
194+
const [key, rid] = await op_user_worker_create(readyOptions);
146195

147-
return new UserWorker(key);
196+
return new UserWorker(key, rid);
148197
}
149198
}
150199

0 commit comments

Comments
 (0)