Skip to content

Commit c7272cf

Browse files
committed
TCP client on llvm
1 parent ab94c0a commit c7272cf

File tree

4 files changed

+190
-73
lines changed

4 files changed

+190
-73
lines changed

libraries/common/io/error.effekt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,3 +464,14 @@ namespace internal {
464464
"""
465465

466466
}
467+
468+
namespace internal {
469+
def checkResult(result: Int): Int / Exception[IOError] =
470+
if (result < 0) {
471+
val ioError = fromNumber(internal::errorNumber(result));
472+
do raise[IOError](ioError, message(ioError))
473+
} else {
474+
result
475+
}
476+
}
477+

libraries/common/io/filesystem.effekt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,6 @@ namespace internal {
222222
ret void
223223
"""
224224

225-
def checkResult(result: Int): Int / Exception[IOError] =
226-
if (result < 0) {
227-
val ioError = fromNumber(internal::errorNumber(result));
228-
do raise[IOError](ioError, message(ioError))
229-
} else {
230-
result
231-
}
232225
}
233226

234227
namespace examples {

libraries/common/io/network.effekt

Lines changed: 42 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,60 @@
11
module io/network
22

33
import bytearray
4-
import io
54

6-
namespace js {
7-
extern jsNode """
8-
const net = require('node:net');
5+
import io/error
96

10-
function listen(server, port, host, listener) {
11-
server.listen(port, host);
12-
server.on('connection', listener);
13-
}
14-
"""
15-
16-
extern type JSServer // = net.Server
17-
extern type JSSocket // = net.Socket
18-
extern io def server(): JSServer =
19-
jsNode "net.createServer()"
20-
extern io def listen(server: JSServer, port: Int, host: String, listener: JSSocket => Unit at {io, async, global}): Unit =
21-
jsNode "listen(${server}, ${port}, ${host}, (socket) => $effekt.runToplevel((ks, k) => (${listener})(socket, ks, k)))"
227

23-
extern async def send(socket: JSSocket, data: ByteArray): Unit =
24-
jsNode "$effekt.capture(callback => ${socket}.write(${data}, callback))"
8+
/// A tcp handle. Should not be inspected.
9+
type Connection = Int
2510

26-
extern async def receive(socket: JSSocket): ByteArray =
27-
jsNode "$effekt.capture(callback => ${socket}.once('data', callback))"
11+
def connect(host: String, port: Int): Connection / Exception[IOError] =
12+
internal::checkResult(internal::connect(host, port))
2813

29-
extern async def end(socket: JSSocket): Unit =
30-
jsNode "$effekt.capture(k => ${socket}.end(k))"
31-
}
14+
def read(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] =
15+
internal::checkResult(internal::read(handle, buffer, offset, size))
3216

33-
interface Socket {
34-
def send(message: ByteArray): Unit
35-
def receive(): ByteArray
36-
def end(): Unit
37-
}
17+
def write(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] =
18+
internal::checkResult(internal::write(handle, buffer, offset, size))
3819

39-
def server(host: String, port: Int, handler: () => Unit / Socket at {io, async, global}): Unit = {
40-
val server = js::server();
41-
js::listen(server, port, host, box { socket =>
42-
println("New connection")
43-
spawn(box {
44-
try handler()
45-
with Socket {
46-
def send(message) =
47-
resume(js::send(socket, message))
48-
def receive() =
49-
resume(js::receive(socket))
50-
def end() =
51-
resume(js::end(socket))
52-
}
53-
})
54-
})
20+
def close(handle: Connection): Unit / Exception[IOError] = {
21+
internal::checkResult(internal::close(handle)); ()
5522
}
5623

5724

58-
namespace examples {
59-
def helloWorldApp(): Unit / Socket = {
60-
val request = do receive().toString;
25+
namespace internal {
6126

62-
println("Received a request: " ++ request)
27+
extern llvm """
28+
declare void @c_tcp_connect(%Pos, %Int, %Stack)
29+
declare void @c_tcp_read(%Int, %Pos, %Int, %Int, %Stack)
30+
declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack)
31+
declare void @c_tcp_close(%Int, %Stack)
32+
"""
6333

64-
def respond(s: String): Unit / Socket =
65-
do send(s.fromString)
34+
extern async def connect(host: String, port: Int): Int =
35+
llvm """
36+
call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack)
37+
ret void
38+
"""
39+
40+
extern async def read(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int =
41+
llvm """
42+
call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack)
43+
ret void
44+
"""
45+
46+
extern async def write(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int =
47+
llvm """
48+
call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack)
49+
ret void
50+
"""
51+
52+
extern async def close(handle: Int): Int =
53+
llvm """
54+
call void @c_tcp_close(%Int ${handle}, %Stack %stack)
55+
ret void
56+
"""
6657

67-
if (request.startsWith("GET /")) {
68-
respond("HTTP/1.1 200 OK\r\n\r\nHello from Effekt!")
69-
} else {
70-
respond("HTTP/1.1 400 Bad Request\r\n\r\n")
71-
}
72-
do end()
73-
}
58+
}
7459

75-
// A server that just shows "Hello from Effekt!" on localhost:8080
76-
def main() = {
77-
val port = 8080
78-
println("Starting server on http://localhost:" ++ port.show)
7960

80-
server("localhost", port, box {
81-
helloWorldApp()
82-
})
83-
}
84-
}

libraries/llvm/io.c

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ void c_fs_open(String path, int flags, Stack stack) {
8181
int result = uv_fs_open(uv_default_loop(), request, path_str, flags, 0777, c_resume_int_fs);
8282

8383
if (result < 0) {
84+
// TODO free path_str?
8485
uv_fs_req_cleanup(request);
8586
free(request);
8687
resume_Int(stack, result);
@@ -178,6 +179,142 @@ void c_fs_mkdir(String path, Stack stack) {
178179
return;
179180
}
180181

182+
// Network
183+
// -------
184+
185+
void c_tcp_connect_cb(uv_connect_t* request, int status) {
186+
Stack stack = (Stack)request->data;
187+
188+
if (status < 0) {
189+
uv_close((uv_handle_t*)request->handle, NULL);
190+
free(request->handle);
191+
free(request);
192+
resume_Int(stack, status);
193+
} else {
194+
int64_t handle = (int64_t)request->handle;
195+
free(request);
196+
resume_Int(stack, handle);
197+
}
198+
}
199+
200+
void c_tcp_connect(String host, Int port, Stack stack) {
201+
char* host_str = c_bytearray_into_nullterminated_string(host);
202+
erasePositive(host);
203+
204+
uv_tcp_t* tcp_handle = malloc(sizeof(uv_tcp_t));
205+
int result = uv_tcp_init(uv_default_loop(), tcp_handle);
206+
207+
if (result < 0) {
208+
free(tcp_handle);
209+
free(host_str);
210+
resume_Int(stack, result);
211+
return;
212+
}
213+
214+
uv_connect_t* connect_req = malloc(sizeof(uv_connect_t));
215+
connect_req->data = stack;
216+
217+
struct sockaddr_in addr;
218+
result = uv_ip4_addr(host_str, port, &addr);
219+
220+
if (result < 0) {
221+
free(tcp_handle);
222+
free(connect_req);
223+
free(host_str);
224+
resume_Int(stack, result);
225+
return;
226+
}
227+
228+
result = uv_tcp_connect(connect_req, tcp_handle, (const struct sockaddr*)&addr, c_tcp_connect_cb);
229+
230+
if (result < 0) {
231+
free(tcp_handle);
232+
free(connect_req);
233+
free(host_str);
234+
resume_Int(stack, result);
235+
return;
236+
}
237+
}
238+
239+
typedef struct {
240+
Stack stack;
241+
size_t size;
242+
char* data;
243+
} tcp_read_closure_t;
244+
245+
void c_tcp_read_cb(uv_stream_t* stream, ssize_t bytes_read, const uv_buf_t* buf) {
246+
tcp_read_closure_t* read_closure = (tcp_read_closure_t*)stream->data;
247+
Stack stack = read_closure->stack;
248+
249+
uv_read_stop(stream);
250+
free(read_closure);
251+
252+
resume_Int(stack, (int64_t)bytes_read);
253+
}
254+
255+
void c_tcp_read_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
256+
tcp_read_closure_t* read_closure = (tcp_read_closure_t*)handle->data;
257+
buf->base = read_closure->data;
258+
buf->len = read_closure->size;
259+
}
260+
261+
void c_tcp_read(Int handle, struct Pos buffer, Int offset, Int size, Stack stack) {
262+
uv_stream_t* stream = (uv_stream_t*)handle;
263+
264+
char* buffer_ptr = (char*)(c_bytearray_data(buffer) + offset);
265+
erasePositive(buffer);
266+
267+
tcp_read_closure_t* read_closure = malloc(sizeof(tcp_read_closure_t));
268+
read_closure->stack = stack;
269+
read_closure->size = size;
270+
read_closure->data = buffer_ptr;
271+
stream->data = read_closure;
272+
273+
int result = uv_read_start(stream, c_tcp_read_alloc_cb, c_tcp_read_cb);
274+
275+
if (result < 0) {
276+
free(read_closure);
277+
stream->data = NULL;
278+
resume_Int(stack, result);
279+
}
280+
}
281+
282+
void c_tcp_write_cb(uv_write_t* request, int status) {
283+
Stack stack = (Stack)request->data;
284+
free(request);
285+
resume_Int(stack, (int64_t)status);
286+
}
287+
288+
void c_tcp_write(Int handle, struct Pos buffer, Int offset, Int size, Stack stack) {
289+
uv_stream_t* stream = (uv_stream_t*)handle;
290+
291+
uv_buf_t buf = uv_buf_init((char*)(c_bytearray_data(buffer) + offset), size);
292+
erasePositive(buffer);
293+
294+
uv_write_t* request = malloc(sizeof(uv_write_t));
295+
request->data = stack;
296+
297+
int result = uv_write(request, stream, &buf, 1, c_tcp_write_cb);
298+
299+
if (result < 0) {
300+
free(request);
301+
resume_Int(stack, result);
302+
}
303+
}
304+
305+
void c_tcp_close_cb(uv_handle_t* handle) {
306+
Stack stack = (Stack)handle->data;
307+
free(handle);
308+
resume_Int(stack, 0);
309+
}
310+
311+
void c_tcp_close(Int handle, Stack stack) {
312+
uv_handle_t* uv_handle = (uv_handle_t*)handle;
313+
uv_handle->data = stack;
314+
uv_close(uv_handle, c_tcp_close_cb);
315+
}
316+
317+
181318
/**
182319
* Maps the libuv error code to a stable (platform independent) numeric value.
183320
*

0 commit comments

Comments
 (0)