Skip to content

Commit b3f612d

Browse files
committed
stamp(ext/workers): sync apis with rust land
1 parent 7babb28 commit b3f612d

File tree

1 file changed

+55
-5
lines changed

1 file changed

+55
-5
lines changed

ext/workers/user_workers.js

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { primordials, core } from "ext:core/mod.js";
2+
import { SymbolDispose } from "ext:deno_web/00_infra.js";
23
import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js";
34
import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js";
45

@@ -9,6 +10,8 @@ const { TypeError } = primordials;
910
const {
1011
op_user_worker_fetch_send,
1112
op_user_worker_create,
13+
op_user_user_worker_wait_token_cancelled,
14+
op_user_worker_is_active,
1215
} = ops;
1316

1417
const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -24,8 +27,34 @@ function redirectStatus(status) {
2427
}
2528

2629
class UserWorker {
27-
constructor(key) {
28-
this.key = key;
30+
/** @type {string} */
31+
#key = "";
32+
33+
/** @type {number | null} */
34+
#rid = null;
35+
36+
/** @type {boolean} */
37+
#disposed = false;
38+
39+
/**
40+
* @param {string} key
41+
* @param {number} rid
42+
*/
43+
constructor(key, rid) {
44+
this.#key = key;
45+
this.#rid = rid;
46+
47+
// deno-lint-ignore no-this-alias
48+
const self = this;
49+
50+
setTimeout(async () => {
51+
try {
52+
await op_user_user_worker_wait_token_cancelled(rid);
53+
self.dispose();
54+
} catch {
55+
// TODO(Nyannyacha): Link it with the tracing for telemetry.
56+
}
57+
});
2958
}
3059

3160
async fetch(request, options = {}) {
@@ -62,7 +91,7 @@ class UserWorker {
6291
}
6392

6493
const responsePromise = op_user_worker_fetch_send(
65-
this.key,
94+
this.#key,
6695
requestRid,
6796
requestBodyRid,
6897
tag.streamRid,
@@ -75,6 +104,7 @@ class UserWorker {
75104
]);
76105

77106
if (requestBodyPromiseResult.status === "rejected") {
107+
// TODO(Nyannyacha): Link it with the tracing for telemetry.
78108
// console.warn(requestBodyPromiseResult.reason);
79109
}
80110

@@ -114,6 +144,26 @@ class UserWorker {
114144
});
115145
}
116146

147+
/** @returns {boolean} */
148+
get active() {
149+
if (this.#disposed) {
150+
return false;
151+
}
152+
153+
return op_user_worker_is_active(this.#rid);
154+
}
155+
156+
dispose() {
157+
if (!this.#disposed) {
158+
core.tryClose(this.#rid);
159+
this.#disposed = true;
160+
}
161+
}
162+
163+
[SymbolDispose]() {
164+
this.dispose();
165+
}
166+
117167
static async create(opts) {
118168
const readyOptions = {
119169
noModuleCache: false,
@@ -136,9 +186,9 @@ class UserWorker {
136186
throw new TypeError("service path must be defined");
137187
}
138188

139-
const key = await op_user_worker_create(readyOptions);
189+
const [key, rid] = await op_user_worker_create(readyOptions);
140190

141-
return new UserWorker(key);
191+
return new UserWorker(key, rid);
142192
}
143193
}
144194

0 commit comments

Comments
 (0)