1
1
import { primordials , core } from "ext:core/mod.js" ;
2
+ import { SymbolDispose } from "ext:deno_web/00_infra.js" ;
2
3
import { readableStreamForRid , writableStreamForRid } from "ext:deno_web/06_streams.js" ;
3
4
import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js" ;
4
5
@@ -9,6 +10,8 @@ const { TypeError } = primordials;
9
10
const {
10
11
op_user_worker_fetch_send,
11
12
op_user_worker_create,
13
+ op_user_user_worker_wait_token_cancelled,
14
+ op_user_worker_is_active,
12
15
} = ops ;
13
16
14
17
const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -24,8 +27,34 @@ function redirectStatus(status) {
24
27
}
25
28
26
29
class UserWorker {
27
- constructor ( key ) {
28
- this . key = key ;
30
+ /** @type {string } */
31
+ #key = "" ;
32
+
33
+ /** @type {number | null } */
34
+ #rid = null ;
35
+
36
+ /** @type {boolean } */
37
+ #disposed = false ;
38
+
39
+ /**
40
+ * @param {string } key
41
+ * @param {number } rid
42
+ */
43
+ constructor ( key , rid ) {
44
+ this . #key = key ;
45
+ this . #rid = rid ;
46
+
47
+ // deno-lint-ignore no-this-alias
48
+ const self = this ;
49
+
50
+ setTimeout ( async ( ) => {
51
+ try {
52
+ await op_user_user_worker_wait_token_cancelled ( rid ) ;
53
+ self . dispose ( ) ;
54
+ } catch {
55
+ // TODO(Nyannyacha): Link it with the tracing for telemetry.
56
+ }
57
+ } ) ;
29
58
}
30
59
31
60
async fetch ( request , options = { } ) {
@@ -62,7 +91,7 @@ class UserWorker {
62
91
}
63
92
64
93
const responsePromise = op_user_worker_fetch_send (
65
- this . key ,
94
+ this . # key,
66
95
requestRid ,
67
96
requestBodyRid ,
68
97
tag . streamRid ,
@@ -75,6 +104,7 @@ class UserWorker {
75
104
] ) ;
76
105
77
106
if ( requestBodyPromiseResult . status === "rejected" ) {
107
+ // TODO(Nyannyacha): Link it with the tracing for telemetry.
78
108
// console.warn(requestBodyPromiseResult.reason);
79
109
}
80
110
@@ -114,6 +144,26 @@ class UserWorker {
114
144
} ) ;
115
145
}
116
146
147
+ /** @returns {boolean } */
148
+ get active ( ) {
149
+ if ( this . #disposed) {
150
+ return false ;
151
+ }
152
+
153
+ return op_user_worker_is_active ( this . #rid) ;
154
+ }
155
+
156
+ dispose ( ) {
157
+ if ( ! this . #disposed) {
158
+ core . tryClose ( this . #rid) ;
159
+ this . #disposed = true ;
160
+ }
161
+ }
162
+
163
+ [ SymbolDispose ] ( ) {
164
+ this . dispose ( ) ;
165
+ }
166
+
117
167
static async create ( opts ) {
118
168
const readyOptions = {
119
169
noModuleCache : false ,
@@ -136,9 +186,9 @@ class UserWorker {
136
186
throw new TypeError ( "service path must be defined" ) ;
137
187
}
138
188
139
- const key = await op_user_worker_create ( readyOptions ) ;
189
+ const [ key , rid ] = await op_user_worker_create ( readyOptions ) ;
140
190
141
- return new UserWorker ( key ) ;
191
+ return new UserWorker ( key , rid ) ;
142
192
}
143
193
}
144
194
0 commit comments