1
+ import stream
2
+ import io
3
+ import io/error
4
+ import stringbuffer
5
+
6
+ // Async iterables
7
+ // ---------------
8
+
9
+ record EffektAsyncIterator[T](next: => Promise[Option[T]] at {io, async, global})
10
+ def getNext[T](it: EffektAsyncIterator[T]): Option[T] =
11
+ (it.next)().await()
12
+ def each[T](it: EffektAsyncIterator[T]): Unit / emit[T] = {
13
+ while(it.getNext() is Some(x)) { do emit(x) }
14
+ }
15
+
16
+ // Types
17
+ // -----
18
+
19
+ type Protocol { HTTP(); HTTPS() }
20
+ def show(p: Protocol): String = p match {
21
+ case HTTP() => "http"
22
+ case HTTPS() => "https"
23
+ }
24
+ type Method { GET(); POST() }
25
+ def show(m: Method): String = m match {
26
+ case GET() => "GET"
27
+ case POST() => "POST"
28
+ }
29
+ interface RequestBuilder {
30
+ def method(method: Method): Unit
31
+ def hostname(host: String): Unit
32
+ def path(path: String): Unit
33
+ def port(port: Int): Unit
34
+ def protocol(proto: Protocol): Unit
35
+ def header(key: String, value: String): Unit
36
+ }
37
+ interface ResponseReader {
38
+ def status(): Int
39
+ def body(): Unit / emit[Byte]
40
+ def getHeader(key: String): Option[String]
41
+ //def headers(): Unit / emit[(String, String)]
42
+ }
43
+
44
+ record RequestError()
45
+
46
+ // Backend-specific implementations
47
+ // --------------------------------
48
+
49
+ namespace js {
50
+ extern type AsyncIterator[T]
51
+ extern type IterableResult[T]
52
+ extern io def nextPromise[T](it: AsyncIterator[T]): Promise[IterableResult[T]] =
53
+ js "{ promise: ${it}.next() }"
54
+ extern def done[T](r: IterableResult[T]): Bool =
55
+ js "${r}.done"
56
+ extern io def unsafeValue[T](r: IterableResult[T]): T =
57
+ js "${r}.value"
58
+ extern def fromValue[T](v: T): IterableResult[T] =
59
+ js "{ value: ${v}, done: false }"
60
+ extern def done[T](): IterableResult[T] =
61
+ js "{ value: undefined, done: true }"
62
+
63
+ def next[T](it: AsyncIterator[T]): IterableResult[T] =
64
+ it.nextPromise().await()
65
+
66
+ def each[T](it: AsyncIterator[T]): Unit / emit[T] = {
67
+ while(it.next() is r and not(r.done)) {
68
+ do emit(r.unsafeValue())
69
+ }
70
+ }
71
+ // broken if we resolve a promise inside
72
+ // extern def makeAsyncIterator[T](next: => Promise[IterableResult[T]] at {io, async, global}): AsyncIterator[T] =
73
+ // js """{ next: () => $effekt.runToplevel((ks,k) => ${next}(ks, k)) }"""
74
+
75
+ // Native Byte Buffers
76
+ // -------------------
77
+ extern type NativeBytes
78
+ extern pure def length(n: NativeBytes): Int =
79
+ js "${n}.length"
80
+ extern pure def get(n: NativeBytes, x: Int): Byte =
81
+ js "${n}[${x}]"
82
+ def each(n: NativeBytes): Unit / emit[Byte] = {
83
+ each(0, n.length){ i =>
84
+ do emit(n.get(i))
85
+ }
86
+ }
87
+
88
+ // Event emitters
89
+ // --------------
90
+ extern type Undefined
91
+ extern type EventEmitter
92
+ record Event[T](name: String)
93
+ namespace ev {
94
+ def data(): Event[js::NativeBytes] = Event("data")
95
+ def end(): Event[Undefined] = Event("end")
96
+ }
97
+
98
+ extern io def unsafeOn[T](em: EventEmitter, ev: String, handler: T => Unit at {io, async, global}): Unit =
99
+ js "${em}.on(${ev}, (param) => $effekt.runToplevel((ks,k) => ${handler}(param, ks, k)))"
100
+ def on[T](em: EventEmitter, ev: Event[T], handler: T => Unit at {io, async, global}): Unit =
101
+ em.unsafeOn(ev.name, handler)
102
+ extern async def unsafeWait[T](em: EventEmitter, ev: String): T =
103
+ js "$effekt.capture(k => ${em}.on(${ev}, k))"
104
+ def wait[T](em: EventEmitter, ev: Event[T]): T =
105
+ em.unsafeWait(ev.name)
106
+
107
+ // Dict-like JS objects
108
+ // --------------------
109
+ extern type JsObj
110
+ extern def empty(): JsObj =
111
+ js "{}"
112
+
113
+ extern io def set(obj: JsObj, key: String, value: Any): Unit =
114
+ js "${obj}[${key}] = ${value};"
115
+ extern io def set(obj: JsObj, key1: String, key2: String, value: Any): Unit =
116
+ js "${obj}[${key1}][${key2}] = ${value};"
117
+ }
118
+
119
+ namespace jsNode {
120
+ extern jsNode """
121
+ const http = require('node:http')
122
+ const https = require('node:https')
123
+ """
124
+
125
+ extern type NativeResponse
126
+ extern async def runHTTP(obj: js::JsObj): NativeResponse =
127
+ jsNode "$effekt.capture(callback => http.request(${obj}, callback).on('error', callback).end())"
128
+ extern async def runHTTPS(obj: js::JsObj): NativeResponse =
129
+ jsNode "$effekt.capture(callback => https.request(${obj}, callback).on('error', callback).end())"
130
+
131
+ extern io def statusCode(r: NativeResponse): Int =
132
+ jsNode "${r}.statusCode"
133
+ extern pure def getHeader(r: NativeResponse, h: String): String =
134
+ jsNode "${r}.headers[${h}]"
135
+
136
+ extern pure def events(r: NativeResponse): js::EventEmitter = jsNode "${r}"
137
+
138
+ def getBody(r: NativeResponse): EffektAsyncIterator[js::NativeBytes] = {
139
+ val nextResolve = ref(promise::make())
140
+ r.events.js::on(js::ev::data(), box { chunk =>
141
+ val waitingResolve = nextResolve.get().await()
142
+ nextResolve.set(promise::make())
143
+ waitingResolve.resolve(Some(chunk))
144
+ })
145
+ r.events.js::on(js::ev::end(), box { _ =>
146
+ val waitingResolve = nextResolve.get().await()
147
+ nextResolve.set(promise::make())
148
+ waitingResolve.resolve(None())
149
+ })
150
+ EffektAsyncIterator(box {
151
+ val resPromise = promise::make()
152
+ nextResolve.get().resolve(resPromise)
153
+ resPromise
154
+ })
155
+ }
156
+
157
+ extern io def isError(r: NativeResponse): Bool =
158
+ jsNode "(${r} instanceof Error)"
159
+
160
+ // outside interface
161
+ def request[R]{ body: => Unit / RequestBuilder }{ k: {ResponseReader} => R }: R / Exception[RequestError] = {
162
+ val options = js::empty()
163
+ options.js::set("headers", js::empty())
164
+ var protocol = HTTPS()
165
+ try body() with RequestBuilder {
166
+ def method(m) = resume(options.js::set("method", m.show))
167
+ def hostname(n) = resume(options.js::set("hostname", n))
168
+ def path(p) = resume(options.js::set("path", p))
169
+ def port(p) = resume(options.js::set("port", p))
170
+ def header(k, v) = resume(options.js::set("headers", k, v))
171
+ def protocol(p) = resume(protocol = p)
172
+ }
173
+ val res = protocol match {
174
+ case HTTP() => runHTTP(options)
175
+ case HTTPS() => runHTTPS(options)
176
+ }
177
+ if(res.isError) { println(res.genericShow); do raise(RequestError(), "Request failed") }
178
+
179
+ val resbody = res.getBody
180
+ def rr = new ResponseReader{
181
+ def status() = res.statusCode
182
+ def body() = {
183
+ for[js::NativeBytes]{ resbody.each }{ b => b.js::each }
184
+ }
185
+ def getHeader(k) = undefinedToOption(res.getHeader(k))
186
+ //def headers() = <>
187
+ }
188
+ k{rr}
189
+ }
190
+ }
191
+
192
+ namespace jsWeb {
193
+ extern type RequestInit
194
+ extern type NativeResponse
195
+ extern async def run(url: String, obj: js::JsObj): NativeResponse =
196
+ jsWeb """$effekt.capture(k => fetch(${url}, ${obj}).then(k).catch(k))"""
197
+ extern pure def isError(r: NativeResponse): Bool =
198
+ jsWeb """(${r} instanceof Error)"""
199
+
200
+ extern pure def statusCode(r: NativeResponse): Int =
201
+ jsWeb """${r}.status"""
202
+ extern pure def getHeader(r: NativeResponse, name: String): String =
203
+ jsWeb """${r}.headers.get(${name})"""
204
+ extern type Reader
205
+ extern io def getBody(r: NativeResponse): Reader =
206
+ jsWeb """${r}.body.getReader()"""
207
+ extern io def read(r: Reader): Promise[js::IterableResult[js::NativeBytes]] =
208
+ jsWeb """{ promise: ${r}.read() }"""
209
+ def each(r: Reader): Unit / emit[js::NativeBytes] = {
210
+ while(r.read().await() is r and not(r.js::done)) {
211
+ do emit(r.js::unsafeValue())
212
+ }
213
+ }
214
+
215
+ def request[R]{ body: => Unit / RequestBuilder }{ k: {ResponseReader} => R }: R / Exception[RequestError] = {
216
+ val options = js::empty()
217
+ options.js::set("headers", js::empty())
218
+ var protocol = HTTPS()
219
+ var hostname = ""
220
+ var path = "/"
221
+ var port = 443
222
+ try body() with RequestBuilder {
223
+ def method(m) = resume(options.js::set("method", m.show))
224
+ def hostname(n) = resume(hostname = n)
225
+ def path(p) = resume(path = p)
226
+ def port(p) = resume(port = p)
227
+ def header(k, v) = resume(options.js::set("headers", k, v))
228
+ def protocol(p) = resume(protocol = p)
229
+ }
230
+ val url = s"${protocol.show}://${hostname}:${port.show}${path}"
231
+ val res = run(url, options)
232
+ if(res.isError) { println(res.genericShow); do raise(RequestError(), "Request failed") }
233
+
234
+ def rr = new ResponseReader {
235
+ def status() = res.statusCode
236
+ def body() = for[js::NativeBytes]{ res.getBody().each() }{ b => b.js::each }
237
+ def getHeader(k) = undefinedToOption(res.getHeader(k))
238
+ }
239
+ k{rr}
240
+ }
241
+ }
242
+
243
+ namespace internal {
244
+ extern pure def backend(): String =
245
+ jsNode { "js-node" }
246
+ jsWeb { "js-web" }
247
+ }
248
+
249
+ def request[R]{ body: => Unit / RequestBuilder }{ res: {ResponseReader} => R }: R / Exception[RequestError] = internal::backend() match {
250
+ case "js-node" => jsNode::request{body}{res}
251
+ case "js-web" => jsWeb::request{body}{res}
252
+ case _ => <>
253
+ }
254
+
255
+ namespace example {
256
+ def main() = {
257
+ with on[RequestError].panic
258
+ with def res = request{
259
+ do method(GET())
260
+ do hostname("effekt-lang.org")
261
+ //do header("user-agent", "Effekt/script") // dont use this on js-web
262
+ do path("/")
263
+ do port(443)
264
+ }
265
+ if(res.status() == 200){
266
+ println("OK")
267
+ println(res.getHeader("content-type").show{ x => x })
268
+ with source[Byte]{ res.body() }
269
+ with decodeUTF8
270
+ with stringBuffer
271
+ exhaustively{
272
+ do read[Char]() match {
273
+ case '\n' => println(do flush())
274
+ case c =>
275
+ do write(c.show)
276
+ }
277
+ }
278
+ println(do flush())
279
+ } else {
280
+ println(res.status().show)
281
+ }
282
+ }
283
+ }
0 commit comments