Skip to content

Commit 20d6f03

Browse files
committed
stamp(sb_workers): sync apis with rust land
1 parent 5f390ba commit 20d6f03

File tree

1 file changed

+54
-5
lines changed

1 file changed

+54
-5
lines changed

crates/sb_workers/user_workers.js

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { primordials, core } from "ext:core/mod.js";
22
import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js";
33
import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js";
4+
import { SymbolDispose } from "ext:deno_web/00_infra.js";
45

56
const ops = core.ops;
67

@@ -9,6 +10,8 @@ const { TypeError } = primordials;
910
const {
1011
op_user_worker_fetch_send,
1112
op_user_worker_create,
13+
op_user_user_worker_wait_token_cancelled,
14+
op_user_worker_is_active,
1215
} = ops;
1316

1417
const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -24,8 +27,33 @@ function redirectStatus(status) {
2427
}
2528

2629
class UserWorker {
27-
constructor(key) {
28-
this.key = key;
30+
/** @type {string} */
31+
#key = "";
32+
33+
/** @type {number | null} */
34+
#rid = null;
35+
36+
/** @type {boolean} */
37+
#disposed = false;
38+
39+
/**
40+
* @param {string} key
41+
* @param {number} rid
42+
*/
43+
constructor(key, rid) {
44+
this.#key = key;
45+
this.#rid = rid;
46+
47+
const self = this;
48+
49+
setTimeout(async () => {
50+
try {
51+
await op_user_user_worker_wait_token_cancelled(rid);
52+
self.dispose();
53+
} catch {
54+
// TODO(Nyannyacha): Link it with the tracing for telemetry.
55+
}
56+
});
2957
}
3058

3159
async fetch(request, options = {}) {
@@ -62,7 +90,7 @@ class UserWorker {
6290
}
6391

6492
const responsePromise = op_user_worker_fetch_send(
65-
this.key,
93+
this.#key,
6694
requestRid,
6795
requestBodyRid,
6896
tag.streamRid,
@@ -75,6 +103,7 @@ class UserWorker {
75103
]);
76104

77105
if (requestBodyPromiseResult.status === "rejected") {
106+
// TODO(Nyannyacha): Link it with the tracing for telemetry.
78107
// console.warn(requestBodyPromiseResult.reason);
79108
}
80109

@@ -114,6 +143,26 @@ class UserWorker {
114143
});
115144
}
116145

146+
/** @returns {boolean} */
147+
get active() {
148+
if (this.#disposed) {
149+
return false;
150+
}
151+
152+
return op_user_worker_is_active(this.#rid);
153+
}
154+
155+
dispose() {
156+
if (!this.#disposed) {
157+
core.tryClose(this.#rid);
158+
this.#disposed = true;
159+
}
160+
}
161+
162+
[SymbolDispose]() {
163+
this.dispose();
164+
}
165+
117166
static async create(opts) {
118167
const readyOptions = {
119168
noModuleCache: false,
@@ -136,9 +185,9 @@ class UserWorker {
136185
throw new TypeError("service path must be defined");
137186
}
138187

139-
const key = await op_user_worker_create(readyOptions);
188+
const [key, rid] = await op_user_worker_create(readyOptions);
140189

141-
return new UserWorker(key);
190+
return new UserWorker(key, rid);
142191
}
143192
}
144193

0 commit comments

Comments
 (0)