From 8a25978dc7aaacfe12db04b356844d20b229ce2d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 13 Jun 2025 12:36:39 +0200 Subject: [PATCH 01/24] TCP client on llvm --- libraries/common/io/error.effekt | 11 +++ libraries/common/io/filesystem.effekt | 7 -- libraries/common/io/network.effekt | 108 ++++++++------------ libraries/llvm/io.c | 137 ++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 73 deletions(-) diff --git a/libraries/common/io/error.effekt b/libraries/common/io/error.effekt index 6620776ef..1c624620b 100644 --- a/libraries/common/io/error.effekt +++ b/libraries/common/io/error.effekt @@ -464,3 +464,14 @@ namespace internal { """ } + +namespace internal { + def checkResult(result: Int): Int / Exception[IOError] = + if (result < 0) { + val ioError = fromNumber(internal::errorNumber(result)); + do raise[IOError](ioError, message(ioError)) + } else { + result + } +} + diff --git a/libraries/common/io/filesystem.effekt b/libraries/common/io/filesystem.effekt index efe0bd0a2..f7de91a10 100644 --- a/libraries/common/io/filesystem.effekt +++ b/libraries/common/io/filesystem.effekt @@ -222,13 +222,6 @@ namespace internal { ret void """ - def checkResult(result: Int): Int / Exception[IOError] = - if (result < 0) { - val ioError = fromNumber(internal::errorNumber(result)); - do raise[IOError](ioError, message(ioError)) - } else { - result - } } namespace examples { diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 5744e852c..9f2954305 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -1,84 +1,60 @@ module io/network import bytearray -import io -namespace js { - extern jsNode """ - const net = require('node:net'); +import io/error - function listen(server, port, host, listener) { - server.listen(port, host); - server.on('connection', listener); - } - """ - - extern type JSServer // = net.Server - extern type JSSocket // = net.Socket - extern def server() at io: JSServer = - jsNode "net.createServer()" - extern def listen(server: JSServer, port: Int, host: String, listener: JSSocket => Unit at {io, async, global}) at io: Unit = - jsNode "listen(${server}, ${port}, ${host}, (socket) => $effekt.runToplevel((ks, k) => (${listener})(socket, ks, k)))" - extern def send(socket: JSSocket, data: ByteArray) at async: Unit = - jsNode "$effekt.capture(callback => ${socket}.write(${data}, callback))" +/// A tcp handle. Should not be inspected. +type Connection = Int - extern def receive(socket: JSSocket) at async: ByteArray = - jsNode "$effekt.capture(callback => ${socket}.once('data', callback))" +def connect(host: String, port: Int): Connection / Exception[IOError] = + internal::checkResult(internal::connect(host, port)) - extern def end(socket: JSSocket) at async: Unit = - jsNode "$effekt.capture(k => ${socket}.end(k))" -} +def read(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = + internal::checkResult(internal::read(handle, buffer, offset, size)) -interface Socket { - def send(message: ByteArray): Unit - def receive(): ByteArray - def end(): Unit -} +def write(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = + internal::checkResult(internal::write(handle, buffer, offset, size)) -def server(host: String, port: Int, handler: () => Unit / Socket at {io, async, global}): Unit = { - val server = js::server(); - js::listen(server, port, host, box { socket => - println("New connection") - spawn(box { - try handler() - with Socket { - def send(message) = - resume(js::send(socket, message)) - def receive() = - resume(js::receive(socket)) - def end() = - resume(js::end(socket)) - } - }) - }) +def close(handle: Connection): Unit / Exception[IOError] = { + internal::checkResult(internal::close(handle)); () } -namespace examples { - def helloWorldApp(): Unit / Socket = { - val request = do receive().toString; +namespace internal { - println("Received a request: " ++ request) + extern llvm """ + declare void @c_tcp_connect(%Pos, %Int, %Stack) + declare void @c_tcp_read(%Int, %Pos, %Int, %Int, %Stack) + declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack) + declare void @c_tcp_close(%Int, %Stack) + """ - def respond(s: String): Unit / Socket = - do send(s.fromString) + extern async def connect(host: String, port: Int): Int = + llvm """ + call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack) + ret void + """ + + extern async def read(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = + llvm """ + call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) + ret void + """ + + extern async def write(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = + llvm """ + call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) + ret void + """ + + extern async def close(handle: Int): Int = + llvm """ + call void @c_tcp_close(%Int ${handle}, %Stack %stack) + ret void + """ - if (request.startsWith("GET /")) { - respond("HTTP/1.1 200 OK\r\n\r\nHello from Effekt!") - } else { - respond("HTTP/1.1 400 Bad Request\r\n\r\n") - } - do end() - } +} - // A server that just shows "Hello from Effekt!" on localhost:8080 - def main() = { - val port = 8080 - println("Starting server on http://localhost:" ++ port.show) - server("localhost", port, box { - helloWorldApp() - }) - } -} diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 57b3b22d5..adbc6efa5 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -81,6 +81,7 @@ void c_fs_open(String path, int flags, Stack stack) { int result = uv_fs_open(uv_default_loop(), request, path_str, flags, 0777, c_resume_int_fs); if (result < 0) { + // TODO free path_str? uv_fs_req_cleanup(request); free(request); resume_Int(stack, result); @@ -178,6 +179,142 @@ void c_fs_mkdir(String path, Stack stack) { return; } +// Network +// ------- + +void c_tcp_connect_cb(uv_connect_t* request, int status) { + Stack stack = (Stack)request->data; + + if (status < 0) { + uv_close((uv_handle_t*)request->handle, NULL); + free(request->handle); + free(request); + resume_Int(stack, status); + } else { + int64_t handle = (int64_t)request->handle; + free(request); + resume_Int(stack, handle); + } +} + +void c_tcp_connect(String host, Int port, Stack stack) { + char* host_str = c_bytearray_into_nullterminated_string(host); + erasePositive(host); + + uv_tcp_t* tcp_handle = malloc(sizeof(uv_tcp_t)); + int result = uv_tcp_init(uv_default_loop(), tcp_handle); + + if (result < 0) { + free(tcp_handle); + free(host_str); + resume_Int(stack, result); + return; + } + + uv_connect_t* connect_req = malloc(sizeof(uv_connect_t)); + connect_req->data = stack; + + struct sockaddr_in addr; + result = uv_ip4_addr(host_str, port, &addr); + + if (result < 0) { + free(tcp_handle); + free(connect_req); + free(host_str); + resume_Int(stack, result); + return; + } + + result = uv_tcp_connect(connect_req, tcp_handle, (const struct sockaddr*)&addr, c_tcp_connect_cb); + + if (result < 0) { + free(tcp_handle); + free(connect_req); + free(host_str); + resume_Int(stack, result); + return; + } +} + +typedef struct { + Stack stack; + size_t size; + char* data; +} tcp_read_closure_t; + +void c_tcp_read_cb(uv_stream_t* stream, ssize_t bytes_read, const uv_buf_t* buf) { + tcp_read_closure_t* read_closure = (tcp_read_closure_t*)stream->data; + Stack stack = read_closure->stack; + + uv_read_stop(stream); + free(read_closure); + + resume_Int(stack, (int64_t)bytes_read); +} + +void c_tcp_read_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + tcp_read_closure_t* read_closure = (tcp_read_closure_t*)handle->data; + buf->base = read_closure->data; + buf->len = read_closure->size; +} + +void c_tcp_read(Int handle, struct Pos buffer, Int offset, Int size, Stack stack) { + uv_stream_t* stream = (uv_stream_t*)handle; + + char* buffer_ptr = (char*)(c_bytearray_data(buffer) + offset); + erasePositive(buffer); + + tcp_read_closure_t* read_closure = malloc(sizeof(tcp_read_closure_t)); + read_closure->stack = stack; + read_closure->size = size; + read_closure->data = buffer_ptr; + stream->data = read_closure; + + int result = uv_read_start(stream, c_tcp_read_alloc_cb, c_tcp_read_cb); + + if (result < 0) { + free(read_closure); + stream->data = NULL; + resume_Int(stack, result); + } +} + +void c_tcp_write_cb(uv_write_t* request, int status) { + Stack stack = (Stack)request->data; + free(request); + resume_Int(stack, (int64_t)status); +} + +void c_tcp_write(Int handle, struct Pos buffer, Int offset, Int size, Stack stack) { + uv_stream_t* stream = (uv_stream_t*)handle; + + uv_buf_t buf = uv_buf_init((char*)(c_bytearray_data(buffer) + offset), size); + erasePositive(buffer); + + uv_write_t* request = malloc(sizeof(uv_write_t)); + request->data = stack; + + int result = uv_write(request, stream, &buf, 1, c_tcp_write_cb); + + if (result < 0) { + free(request); + resume_Int(stack, result); + } +} + +void c_tcp_close_cb(uv_handle_t* handle) { + Stack stack = (Stack)handle->data; + free(handle); + resume_Int(stack, 0); +} + +void c_tcp_close(Int handle, Stack stack) { + uv_handle_t* uv_handle = (uv_handle_t*)handle; + uv_handle->data = stack; + uv_close(uv_handle, c_tcp_close_cb); +} + + /** * Maps the libuv error code to a stable (platform independent) numeric value. * From 526960ee396649dc1fdd8227f2cd42b24d94d027 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 13 Jun 2025 19:06:23 +0200 Subject: [PATCH 02/24] Start working TCP server --- libraries/common/io/network.effekt | 33 ++++++++++ libraries/llvm/io.c | 97 ++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 9f2954305..7ab6dfa7c 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -21,6 +21,18 @@ def close(handle: Connection): Unit / Exception[IOError] = { internal::checkResult(internal::close(handle)); () } +/// A tcp listener. Should not be inspected. +type Listener = Int + +def listen(host: String, port: Int, backlog: Int): Listener / Exception[IOError] = + internal::checkResult(internal::listen(host, port, backlog)) + +def accept(listener: Listener): Connection / Exception[IOError] = + internal::checkResult(internal::accept(listener)) + +def close_listener(listener: Listener): Unit / Exception[IOError] = { + internal::checkResult(internal::close_listener(listener)); () +} namespace internal { @@ -29,6 +41,9 @@ namespace internal { declare void @c_tcp_read(%Int, %Pos, %Int, %Int, %Stack) declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack) declare void @c_tcp_close(%Int, %Stack) + declare void @c_tcp_listen(%Pos, %Int, %Int, %Stack) + declare void @c_tcp_accept(%Int, %Stack) + declare void @c_tcp_close_listener(%Int, %Stack) """ extern async def connect(host: String, port: Int): Int = @@ -55,6 +70,24 @@ namespace internal { ret void """ + extern async def listen(host: String, port: Int, backlog: Int): Int = + llvm """ + call void @c_tcp_listen(%Pos ${host}, %Int ${port}, %Int ${backlog}, %Stack %stack) + ret void + """ + + extern async def accept(listener: Int): Int = + llvm """ + call void @c_tcp_accept(%Int ${listener}, %Stack %stack) + ret void + """ + + extern async def close_listener(handle: Int): Int = + llvm """ + call void @c_tcp_close_listener(%Int ${handle}, %Stack %stack) + ret void + """ + } diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index adbc6efa5..3e6ab8f33 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -305,6 +305,7 @@ void c_tcp_write(Int handle, struct Pos buffer, Int offset, Int size, Stack stac void c_tcp_close_cb(uv_handle_t* handle) { Stack stack = (Stack)handle->data; free(handle); + // TODO resume_Pos Unit resume_Int(stack, 0); } @@ -314,6 +315,102 @@ void c_tcp_close(Int handle, Stack stack) { uv_close(uv_handle, c_tcp_close_cb); } +void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { + // TODO make non-async + char* host_str = c_bytearray_into_nullterminated_string(host); + erasePositive(host); + + uv_tcp_t* tcp_handle = malloc(sizeof(uv_tcp_t)); + int result = uv_tcp_init(uv_default_loop(), tcp_handle); + + if (result < 0) { + free(tcp_handle); + free(host_str); + resume_Int(stack, result); + return; + } + + struct sockaddr_in addr; + result = uv_ip4_addr(host_str, port, &addr); + free(host_str); + + if (result < 0) { + free(tcp_handle); + resume_Int(stack, result); + return; + } + + result = uv_tcp_bind(tcp_handle, (const struct sockaddr*)&addr, 0); + if (result < 0) { + free(tcp_handle); + resume_Int(stack, result); + return; + } + + result = uv_listen((uv_stream_t*)tcp_handle, backlog, NULL); + if (result < 0) { + free(tcp_handle); + resume_Int(stack, result); + return; + } + + resume_Int(stack, (int64_t)tcp_handle); +} + +void c_tcp_accept_cb(uv_stream_t* server, int status) { + Stack stack = (Stack)server->data; + + if (status < 0) { + resume_Int(stack, status); + return; + } + + uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); + int result = uv_tcp_init(uv_default_loop(), client); + + if (result < 0) { + free(client); + // TODO share stack? + resume_Int(stack, result); + return; + } + + result = uv_accept(server, (uv_stream_t*)client); + if (result < 0) { + uv_close((uv_handle_t*)client, NULL); + free(client); + // TODO share stack? + resume_Int(stack, result); + return; + } + + // TODO share resumption + // shareStack(stack); + resume_Int(stack, (int64_t)client); +} + +void c_tcp_accept(Int server_handle, Stack stack) { + uv_stream_t* server = (uv_stream_t*)server_handle; + server->data = stack; // Store stack for multiple resumes + + int result = uv_listen(server, 0, c_tcp_accept_cb); + if (result < 0) { + resume_Int(stack, result); + } +} + +void c_tcp_close_listener(Int handle, Stack stack) { + uv_handle_t* uv_handle = (uv_handle_t*)handle; + + Stack registered_stack = (Stack)uv_handle->data; + if (registered_stack) { + eraseStack(registered_stack); + } + + uv_handle->data = stack; + uv_close(uv_handle, c_tcp_close_cb); +} + /** * Maps the libuv error code to a stable (platform independent) numeric value. From d93eecbf08fda35feb9b8c80b416f5f44fa10715 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Sun, 15 Jun 2025 19:47:33 +0200 Subject: [PATCH 03/24] Fix formatting --- libraries/common/io/network.effekt | 70 +++++++++++++++--------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 7ab6dfa7c..f8bca0562 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -46,41 +46,41 @@ namespace internal { declare void @c_tcp_close_listener(%Int, %Stack) """ - extern async def connect(host: String, port: Int): Int = - llvm """ - call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack) - ret void - """ - - extern async def read(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = - llvm """ - call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) - ret void - """ - - extern async def write(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = - llvm """ - call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) - ret void - """ - - extern async def close(handle: Int): Int = - llvm """ - call void @c_tcp_close(%Int ${handle}, %Stack %stack) - ret void - """ - - extern async def listen(host: String, port: Int, backlog: Int): Int = - llvm """ - call void @c_tcp_listen(%Pos ${host}, %Int ${port}, %Int ${backlog}, %Stack %stack) - ret void - """ - - extern async def accept(listener: Int): Int = - llvm """ - call void @c_tcp_accept(%Int ${listener}, %Stack %stack) - ret void - """ + extern async def connect(host: String, port: Int): Int = + llvm """ + call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack) + ret void + """ + + extern async def read(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = + llvm """ + call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) + ret void + """ + + extern async def write(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = + llvm """ + call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) + ret void + """ + + extern async def close(handle: Int): Int = + llvm """ + call void @c_tcp_close(%Int ${handle}, %Stack %stack) + ret void + """ + + extern async def listen(host: String, port: Int, backlog: Int): Int = + llvm """ + call void @c_tcp_listen(%Pos ${host}, %Int ${port}, %Int ${backlog}, %Stack %stack) + ret void + """ + + extern async def accept(listener: Int): Int = + llvm """ + call void @c_tcp_accept(%Int ${listener}, %Stack %stack) + ret void + """ extern async def close_listener(handle: Int): Int = llvm """ From eec4c3c57953dede2c39dc08ea3d352bdf78fe4d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 17 Jun 2025 10:29:36 +0200 Subject: [PATCH 04/24] Accept callback in accept --- libraries/common/io/network.effekt | 21 +++++----- libraries/llvm/io.c | 65 ++++++++++++++++++++++-------- libraries/llvm/types.c | 3 ++ 3 files changed, 62 insertions(+), 27 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index f8bca0562..320bf02b3 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -27,11 +27,12 @@ type Listener = Int def listen(host: String, port: Int, backlog: Int): Listener / Exception[IOError] = internal::checkResult(internal::listen(host, port, backlog)) -def accept(listener: Listener): Connection / Exception[IOError] = - internal::checkResult(internal::accept(listener)) +def accept(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = { + internal::checkResult(internal::accept(listener, handler)); () +} -def close_listener(listener: Listener): Unit / Exception[IOError] = { - internal::checkResult(internal::close_listener(listener)); () +def shutdown(listener: Listener): Unit / Exception[IOError] = { + internal::checkResult(internal::shutdown(listener)); () } namespace internal { @@ -42,8 +43,8 @@ namespace internal { declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack) declare void @c_tcp_close(%Int, %Stack) declare void @c_tcp_listen(%Pos, %Int, %Int, %Stack) - declare void @c_tcp_accept(%Int, %Stack) - declare void @c_tcp_close_listener(%Int, %Stack) + declare void @c_tcp_accept(%Int, %Pos, %Stack) + declare void @c_tcp_shutdown(%Int, %Stack) """ extern async def connect(host: String, port: Int): Int = @@ -76,15 +77,15 @@ namespace internal { ret void """ - extern async def accept(listener: Int): Int = + extern async def accept(listener: Int, handler: Int => Unit at {io, async, global}): Int = llvm """ - call void @c_tcp_accept(%Int ${listener}, %Stack %stack) + call void @c_tcp_accept(%Int ${listener}, %Pos ${handler}, %Stack %stack) ret void """ - extern async def close_listener(handle: Int): Int = + extern async def shutdown(handle: Int): Int = llvm """ - call void @c_tcp_close_listener(%Int ${handle}, %Stack %stack) + call void @c_tcp_shutdown(%Int ${handle}, %Stack %stack) ret void """ diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 3e6ab8f33..f7f68d0f9 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -315,6 +315,12 @@ void c_tcp_close(Int handle, Stack stack) { uv_close(uv_handle, c_tcp_close_cb); } + +typedef struct { + Stack stack; + struct Pos handler; +} tcp_accept_closure_t; + void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { // TODO make non-async char* host_str = c_bytearray_into_nullterminated_string(host); @@ -358,10 +364,14 @@ void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { } void c_tcp_accept_cb(uv_stream_t* server, int status) { - Stack stack = (Stack)server->data; + tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)server->data; if (status < 0) { - resume_Int(stack, status); + // TODO resume last + erasePositive(accept_closure->handler); + resume_Int(accept_closure->stack, status); + free(accept_closure); + server->data = NULL; return; } @@ -369,46 +379,67 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { int result = uv_tcp_init(uv_default_loop(), client); if (result < 0) { + // TODO resume last free(client); - // TODO share stack? - resume_Int(stack, result); + erasePositive(accept_closure->handler); + resume_Int(accept_closure->stack, result); + free(accept_closure); + server->data = NULL; return; } result = uv_accept(server, (uv_stream_t*)client); if (result < 0) { + // TODO resume last uv_close((uv_handle_t*)client, NULL); free(client); - // TODO share stack? - resume_Int(stack, result); + erasePositive(accept_closure->handler); + resume_Int(accept_closure->stack, result); + free(accept_closure); + server->data = NULL; return; } - // TODO share resumption - // shareStack(stack); - resume_Int(stack, (int64_t)client); + // Call the handler with the new client connection + run_Int(unbox(accept_closure->handler), (int64_t)client); } -void c_tcp_accept(Int server_handle, Stack stack) { - uv_stream_t* server = (uv_stream_t*)server_handle; - server->data = stack; // Store stack for multiple resumes +void c_tcp_accept(Int listener, struct Pos handler, Stack stack) { + uv_stream_t* server = (uv_stream_t*)listener; + + tcp_accept_closure_t* accept_closure = malloc(sizeof(tcp_accept_closure_t)); + accept_closure->stack = stack; + accept_closure->handler = handler; + server->data = accept_closure; int result = uv_listen(server, 0, c_tcp_accept_cb); if (result < 0) { + free(accept_closure); + erasePositive(handler); resume_Int(stack, result); + return; } } -void c_tcp_close_listener(Int handle, Stack stack) { +void c_tcp_shutdown_cb(uv_handle_t* handle) { + Stack stack = (Stack)handle->data; + free(handle); + // TODO resume_Pos Unit + resume_Int(stack, 0); +} + +void c_tcp_shutdown(Int handle, Stack stack) { uv_handle_t* uv_handle = (uv_handle_t*)handle; - Stack registered_stack = (Stack)uv_handle->data; - if (registered_stack) { - eraseStack(registered_stack); + tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)uv_handle->data; + if (accept_closure) { + eraseStack(accept_closure->stack); + erasePositive(accept_closure->handler); + free(accept_closure); } uv_handle->data = stack; - uv_close(uv_handle, c_tcp_close_cb); + uv_close(uv_handle, c_tcp_shutdown_cb); } diff --git a/libraries/llvm/types.c b/libraries/llvm/types.c index c372bbbb6..bc3fe04d2 100644 --- a/libraries/llvm/types.c +++ b/libraries/llvm/types.c @@ -34,6 +34,9 @@ typedef struct StackValue* Stack; // Defined in rts.ll +extern struct Pos box(struct Neg); +extern struct Neg unbox(struct Pos); + extern void resume_Int(Stack, Int); extern void resume_Pos(Stack, struct Pos); From a91a6f727a3a930d2e8e3fb7005f8962e4037781 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 17 Jun 2025 10:50:07 +0200 Subject: [PATCH 05/24] Remove traces of evidence --- libraries/llvm/io.c | 1 - libraries/llvm/rts.ll | 6 ++---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index f7f68d0f9..228134fa0 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -400,7 +400,6 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { return; } - // Call the handler with the new client connection run_Int(unbox(accept_closure->handler), (int64_t)client); } diff --git a/libraries/llvm/rts.ll b/libraries/llvm/rts.ll index 746ed5def..8e141b4b5 100644 --- a/libraries/llvm/rts.ll +++ b/libraries/llvm/rts.ll @@ -1,7 +1,5 @@ ; Run-Time System -%Evidence = type i64 - ; Basic types %Environment = type ptr @@ -734,7 +732,7 @@ define void @run_Int(%Neg %f, i64 %argument) { %functionPointer = load ptr, ptr %functionPointerPointer, !alias.scope !15, !noalias !25 ; call - tail call tailcc %Pos %functionPointer(%Object %object, %Evidence 0, i64 %argument, %Stack %stack) + tail call tailcc %Pos %functionPointer(%Object %object, i64 %argument, %Stack %stack) ret void } @@ -749,7 +747,7 @@ define void @run_Pos(%Neg %f, %Pos %argument) { %functionPointer = load ptr, ptr %functionPointerPointer, !alias.scope !15, !noalias !25 ; call - tail call tailcc %Pos %functionPointer(%Object %object, %Evidence 0, %Pos %argument, %Stack %stack) + tail call tailcc %Pos %functionPointer(%Object %object, %Pos %argument, %Stack %stack) ret void } From 05e4e27eb837c026c5019901d08af2d3914e2dd0 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 17 Jun 2025 12:32:44 +0200 Subject: [PATCH 06/24] Share handler --- libraries/llvm/io.c | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 228134fa0..18b76c8ea 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -400,6 +400,7 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { return; } + sharePositive(accept_closure->handler); run_Int(unbox(accept_closure->handler), (int64_t)client); } From 85b8d4ce4b7a038baaa0d541cd53204a0252e866 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 17 Jun 2025 14:54:12 +0200 Subject: [PATCH 07/24] Run boxes only --- libraries/common/io.effekt | 3 +-- libraries/llvm/io.c | 2 +- libraries/llvm/rts.ll | 15 ++++++++++++--- libraries/llvm/types.c | 9 +++------ 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index 59630c466..e9b69ddb7 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -16,8 +16,7 @@ extern def spawn(task: Task[Unit]) at async: Unit = js "$effekt.capture(k => { setTimeout(() => k($effekt.unit), 0); return $effekt.run(${task}) })" llvm """ call void @c_yield(%Stack %stack) - %unboxed = call ccc %Neg @coercePosNeg(%Pos ${task}) - call void @run(%Neg %unboxed) + call void @run(%Pos %task) ret void """ diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 18b76c8ea..0922ea216 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -401,7 +401,7 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { } sharePositive(accept_closure->handler); - run_Int(unbox(accept_closure->handler), (int64_t)client); + run_Int(accept_closure->handler, (int64_t)client); } void c_tcp_accept(Int listener, struct Pos handler, Stack stack) { diff --git a/libraries/llvm/rts.ll b/libraries/llvm/rts.ll index 8e141b4b5..604d75a25 100644 --- a/libraries/llvm/rts.ll +++ b/libraries/llvm/rts.ll @@ -706,7 +706,10 @@ define void @resume_Pos(%Stack %stack, %Pos %argument) { ret void } -define void @run(%Neg %f) { +define void @run(%Pos %boxed) { + ; unbox + %f = call %Neg @coercePosNeg(%Pos %boxed) + ; fresh stack %stack = call %Stack @withEmptyStack() @@ -721,7 +724,10 @@ define void @run(%Neg %f) { ret void } -define void @run_Int(%Neg %f, i64 %argument) { +define void @run_Int(%Pos %boxed, i64 %argument) { + ; unbox + %f = call %Neg @coercePosNeg(%Pos %boxed) + ; fresh stack %stack = call %Stack @withEmptyStack() @@ -736,7 +742,10 @@ define void @run_Int(%Neg %f, i64 %argument) { ret void } -define void @run_Pos(%Neg %f, %Pos %argument) { +define void @run_Pos(%Pos %boxed, %Pos %argument) { + ; unbox + %f = call %Neg @coercePosNeg(%Pos %boxed) + ; fresh stack %stack = call %Stack @withEmptyStack() diff --git a/libraries/llvm/types.c b/libraries/llvm/types.c index bc3fe04d2..83190cf4a 100644 --- a/libraries/llvm/types.c +++ b/libraries/llvm/types.c @@ -34,15 +34,12 @@ typedef struct StackValue* Stack; // Defined in rts.ll -extern struct Pos box(struct Neg); -extern struct Neg unbox(struct Pos); - extern void resume_Int(Stack, Int); extern void resume_Pos(Stack, struct Pos); -extern void run(struct Neg); -extern void run_Int(struct Neg, Int); -extern void run_Pos(struct Neg, struct Pos); +extern void run(struct Pos); +extern void run_Int(struct Pos, Int); +extern void run_Pos(struct Pos, struct Pos); // Reference counting primitives defined in LLVM extern void eraseNegative(struct Neg); From cef36c7b8cdf90b501530f7c39a23ebbdbcb1a29 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Mon, 23 Jun 2025 16:52:21 +0200 Subject: [PATCH 08/24] Fix bug in spawn and document todo --- libraries/common/io.effekt | 2 +- libraries/common/io/network.effekt | 2 ++ libraries/llvm/io.c | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index e9b69ddb7..ce6365433 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -16,7 +16,7 @@ extern def spawn(task: Task[Unit]) at async: Unit = js "$effekt.capture(k => { setTimeout(() => k($effekt.unit), 0); return $effekt.run(${task}) })" llvm """ call void @c_yield(%Stack %stack) - call void @run(%Pos %task) + call void @run(%Pos ${task}) ret void """ diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 320bf02b3..3b30cb901 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -53,12 +53,14 @@ namespace internal { ret void """ + /// The buffer must be kept alive by using it after the call extern async def read(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = llvm """ call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) ret void """ + /// The buffer must be kept alive by using it after the call extern async def write(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = llvm """ call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 0922ea216..03341ea6a 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -263,6 +263,7 @@ void c_tcp_read(Int handle, struct Pos buffer, Int offset, Int size, Stack stack char* buffer_ptr = (char*)(c_bytearray_data(buffer) + offset); erasePositive(buffer); + // TODO panic if this was the last reference tcp_read_closure_t* read_closure = malloc(sizeof(tcp_read_closure_t)); read_closure->stack = stack; @@ -290,6 +291,7 @@ void c_tcp_write(Int handle, struct Pos buffer, Int offset, Int size, Stack stac uv_buf_t buf = uv_buf_init((char*)(c_bytearray_data(buffer) + offset), size); erasePositive(buffer); + // TODO panic if this was the last reference uv_write_t* request = malloc(sizeof(uv_write_t)); request->data = stack; From 8fcfb7c53ea8583bf98210fe85938d51c5e4259b Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Mon, 23 Jun 2025 16:55:21 +0200 Subject: [PATCH 09/24] Add test --- .../input_output/server_client.effekt | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 examples/benchmarks/input_output/server_client.effekt diff --git a/examples/benchmarks/input_output/server_client.effekt b/examples/benchmarks/input_output/server_client.effekt new file mode 100644 index 000000000..d507e0c18 --- /dev/null +++ b/examples/benchmarks/input_output/server_client.effekt @@ -0,0 +1,49 @@ +import io +import io/error +import io/network +import io/time +import bytearray + +def main() = { + with on[IOError].panic + val listener = listen("127.0.0.1", 8080, 1000); + spawn(box { + with on[IOError].panic + accept(listener, box { connection => + with on[IOError].panic + println("accepted") + val message = "hello world" + var buffer = bytearray::fromString(message) + write(connection, buffer, 0, buffer.size()) + close(connection) + })}); + + println("started") + + val results = array::build(2) { i => + promise(box { + with on[IOError].result + wait(1000) + val connection = connect("127.0.0.1", 8080) + println("connected") + var buffer = bytearray::allocate(4096) + val number = read(connection, buffer, 0, 4096) + close(connection) + println("closed " ++ number.show) + number + }) + }; + + println("spawned") + + var total = 0; + results.foreach { number => + println("awaiting") + total = total + number.await.value + println("awaited") + }; + + shutdown(listener) + + println(total) +} \ No newline at end of file From 57721cd3c378fd35bacedd8e1eea4fef7c9db850 Mon Sep 17 00:00:00 2001 From: Marvin Date: Mon, 30 Jun 2025 17:06:20 +0200 Subject: [PATCH 10/24] Fix stdlib/network (#1063) This fixes a memory leak and increases the backlog of uv_listen to SOMAXCONN, such that connections are put in a queue when the previous acceptor is not yet finished (this is the case with a timeout, which triggers the connects at the same time) Another memory leak remains which I couldn't find. ``` ==130847== 128 bytes in 1 blocks are definitely lost in loss record 1 of 1 ==130847== at 0x48577A8: malloc (vg_replace_malloc.c:446) ==130847== by 0x4003F90: ??? (in server_client) ==130847== by 0x4004499: ??? (in server_client) ==130847== by 0x4004557: run (in server_client) ==130847== by 0x4004A1C: spawn_4055 (in server_client) ==130847== by 0x40044F6: resume_Int (in server_client) ==130847== by 0x4002BA0: c_tcp_listen (io.c:367) ==130847== by 0x4004B65: listen_4331 (in server_client) ==130847== by 0x400CA5D: effektMain (in server_client) ==130847== by 0x4003D60: main (main.c:37) ``` --- examples/benchmarks/input_output/server_client.effekt | 4 ++-- libraries/llvm/io.c | 7 ++++--- libraries/llvm/main.c | 3 +-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/benchmarks/input_output/server_client.effekt b/examples/benchmarks/input_output/server_client.effekt index d507e0c18..ba233a35f 100644 --- a/examples/benchmarks/input_output/server_client.effekt +++ b/examples/benchmarks/input_output/server_client.effekt @@ -20,7 +20,7 @@ def main() = { println("started") - val results = array::build(2) { i => + val results = array::build(10) { i => promise(box { with on[IOError].result wait(1000) @@ -46,4 +46,4 @@ def main() = { shutdown(listener) println(total) -} \ No newline at end of file +} diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 03341ea6a..ea108abce 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -216,11 +216,11 @@ void c_tcp_connect(String host, Int port, Stack stack) { struct sockaddr_in addr; result = uv_ip4_addr(host_str, port, &addr); + free(host_str); if (result < 0) { free(tcp_handle); free(connect_req); - free(host_str); resume_Int(stack, result); return; } @@ -230,7 +230,6 @@ void c_tcp_connect(String host, Int port, Stack stack) { if (result < 0) { free(tcp_handle); free(connect_req); - free(host_str); resume_Int(stack, result); return; } @@ -243,6 +242,7 @@ typedef struct { } tcp_read_closure_t; void c_tcp_read_cb(uv_stream_t* stream, ssize_t bytes_read, const uv_buf_t* buf) { + (void)(buf); tcp_read_closure_t* read_closure = (tcp_read_closure_t*)stream->data; Stack stack = read_closure->stack; @@ -253,6 +253,7 @@ void c_tcp_read_cb(uv_stream_t* stream, ssize_t bytes_read, const uv_buf_t* buf) } void c_tcp_read_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + (void)(suggested_size); tcp_read_closure_t* read_closure = (tcp_read_closure_t*)handle->data; buf->base = read_closure->data; buf->len = read_closure->size; @@ -414,7 +415,7 @@ void c_tcp_accept(Int listener, struct Pos handler, Stack stack) { accept_closure->handler = handler; server->data = accept_closure; - int result = uv_listen(server, 0, c_tcp_accept_cb); + int result = uv_listen(server, SOMAXCONN, c_tcp_accept_cb); if (result < 0) { free(accept_closure); erasePositive(handler); diff --git a/libraries/llvm/main.c b/libraries/llvm/main.c index 38c5f7133..d95f9a535 100644 --- a/libraries/llvm/main.c +++ b/libraries/llvm/main.c @@ -37,6 +37,5 @@ int main(int argc, char *argv[]) { effektMain(); uv_loop_t *loop = uv_default_loop(); uv_run(loop, UV_RUN_DEFAULT); - uv_loop_close(loop); - return 0; + return uv_loop_close(loop); } From 325798f266876ce84fe475fe1762ccb1f428dbff Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 2 Jul 2025 13:29:19 +0200 Subject: [PATCH 11/24] Only free stuff in callback --- libraries/llvm/io.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index ea108abce..41a4b0b46 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -186,8 +186,7 @@ void c_tcp_connect_cb(uv_connect_t* request, int status) { Stack stack = (Stack)request->data; if (status < 0) { - uv_close((uv_handle_t*)request->handle, NULL); - free(request->handle); + uv_close((uv_handle_t*)request->handle, (uv_close_cb)free); free(request); resume_Int(stack, status); } else { @@ -394,8 +393,7 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { result = uv_accept(server, (uv_stream_t*)client); if (result < 0) { // TODO resume last - uv_close((uv_handle_t*)client, NULL); - free(client); + uv_close((uv_handle_t*)client, (uv_close_cb)free); erasePositive(accept_closure->handler); resume_Int(accept_closure->stack, result); free(accept_closure); From 6655bc1b6d3835e7d6f8c1544b0faabf1abc708d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 2 Jul 2025 13:45:11 +0200 Subject: [PATCH 12/24] Close listener in case of error --- libraries/llvm/io.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 41a4b0b46..8b9e90a08 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -317,12 +317,6 @@ void c_tcp_close(Int handle, Stack stack) { uv_close(uv_handle, c_tcp_close_cb); } - -typedef struct { - Stack stack; - struct Pos handler; -} tcp_accept_closure_t; - void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { // TODO make non-async char* host_str = c_bytearray_into_nullterminated_string(host); @@ -343,21 +337,21 @@ void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { free(host_str); if (result < 0) { - free(tcp_handle); + uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); resume_Int(stack, result); return; } result = uv_tcp_bind(tcp_handle, (const struct sockaddr*)&addr, 0); if (result < 0) { - free(tcp_handle); + uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); resume_Int(stack, result); return; } result = uv_listen((uv_stream_t*)tcp_handle, backlog, NULL); if (result < 0) { - free(tcp_handle); + uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); resume_Int(stack, result); return; } @@ -365,6 +359,11 @@ void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { resume_Int(stack, (int64_t)tcp_handle); } +typedef struct { + Stack stack; + struct Pos handler; +} tcp_accept_closure_t; + void c_tcp_accept_cb(uv_stream_t* server, int status) { tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)server->data; From 1e23d5c716686c7f29223594f694c278eca927d0 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 29 Jul 2025 15:39:36 +0200 Subject: [PATCH 13/24] Delete duplicate callback --- libraries/llvm/io.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 8b9e90a08..d8d6f4f53 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -421,13 +421,6 @@ void c_tcp_accept(Int listener, struct Pos handler, Stack stack) { } } -void c_tcp_shutdown_cb(uv_handle_t* handle) { - Stack stack = (Stack)handle->data; - free(handle); - // TODO resume_Pos Unit - resume_Int(stack, 0); -} - void c_tcp_shutdown(Int handle, Stack stack) { uv_handle_t* uv_handle = (uv_handle_t*)handle; @@ -439,7 +432,7 @@ void c_tcp_shutdown(Int handle, Stack stack) { } uv_handle->data = stack; - uv_close(uv_handle, c_tcp_shutdown_cb); + uv_close(uv_handle, c_tcp_close_cb); } From 0278cc92f3f4927a3b17034fad5a39a7a235ed48 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 29 Jul 2025 15:47:45 +0200 Subject: [PATCH 14/24] Don't listen in listen --- libraries/common/io/network.effekt | 8 ++++---- libraries/llvm/io.c | 9 +-------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 3b30cb901..bcfe66c75 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -24,8 +24,8 @@ def close(handle: Connection): Unit / Exception[IOError] = { /// A tcp listener. Should not be inspected. type Listener = Int -def listen(host: String, port: Int, backlog: Int): Listener / Exception[IOError] = - internal::checkResult(internal::listen(host, port, backlog)) +def listen(host: String, port: Int): Listener / Exception[IOError] = + internal::checkResult(internal::listen(host, port)) def accept(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = { internal::checkResult(internal::accept(listener, handler)); () @@ -73,9 +73,9 @@ namespace internal { ret void """ - extern async def listen(host: String, port: Int, backlog: Int): Int = + extern async def listen(host: String, port: Int): Int = llvm """ - call void @c_tcp_listen(%Pos ${host}, %Int ${port}, %Int ${backlog}, %Stack %stack) + call void @c_tcp_listen(%Pos ${host}, %Int ${port}, %Stack %stack) ret void """ diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index d8d6f4f53..1c1e3f4ba 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -317,7 +317,7 @@ void c_tcp_close(Int handle, Stack stack) { uv_close(uv_handle, c_tcp_close_cb); } -void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { +void c_tcp_listen(String host, Int port, Stack stack) { // TODO make non-async char* host_str = c_bytearray_into_nullterminated_string(host); erasePositive(host); @@ -349,13 +349,6 @@ void c_tcp_listen(String host, Int port, Int backlog, Stack stack) { return; } - result = uv_listen((uv_stream_t*)tcp_handle, backlog, NULL); - if (result < 0) { - uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); - resume_Int(stack, result); - return; - } - resume_Int(stack, (int64_t)tcp_handle); } From 55deb8655a98706e6bdf6a8f86ac70f28d23d0c1 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 10:26:30 +0200 Subject: [PATCH 15/24] Resume continuation instead of erasing it --- examples/benchmarks/input_output/server_client.effekt | 11 +---------- libraries/llvm/io.c | 4 +++- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/examples/benchmarks/input_output/server_client.effekt b/examples/benchmarks/input_output/server_client.effekt index ba233a35f..be11d54d1 100644 --- a/examples/benchmarks/input_output/server_client.effekt +++ b/examples/benchmarks/input_output/server_client.effekt @@ -6,41 +6,32 @@ import bytearray def main() = { with on[IOError].panic - val listener = listen("127.0.0.1", 8080, 1000); + val listener = listen("127.0.0.1", 8080); spawn(box { with on[IOError].panic accept(listener, box { connection => with on[IOError].panic - println("accepted") val message = "hello world" var buffer = bytearray::fromString(message) write(connection, buffer, 0, buffer.size()) close(connection) })}); - println("started") - val results = array::build(10) { i => promise(box { with on[IOError].result wait(1000) val connection = connect("127.0.0.1", 8080) - println("connected") var buffer = bytearray::allocate(4096) val number = read(connection, buffer, 0, 4096) close(connection) - println("closed " ++ number.show) number }) }; - println("spawned") - var total = 0; results.foreach { number => - println("awaiting") total = total + number.await.value - println("awaited") }; shutdown(listener) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 1c1e3f4ba..74dea0c67 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -419,7 +419,9 @@ void c_tcp_shutdown(Int handle, Stack stack) { tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)uv_handle->data; if (accept_closure) { - eraseStack(accept_closure->stack); + // TODO what to resume with + // TODO resume last + resume_Int(accept_closure->stack, 0); erasePositive(accept_closure->handler); free(accept_closure); } From 908676981627a19c73584aa9dee572d9906aaf32 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 10:27:12 +0200 Subject: [PATCH 16/24] Checkfile --- examples/benchmarks/input_output/server_client.check | 1 + 1 file changed, 1 insertion(+) create mode 100644 examples/benchmarks/input_output/server_client.check diff --git a/examples/benchmarks/input_output/server_client.check b/examples/benchmarks/input_output/server_client.check new file mode 100644 index 000000000..97e350411 --- /dev/null +++ b/examples/benchmarks/input_output/server_client.check @@ -0,0 +1 @@ +110 \ No newline at end of file From 09886bb55f3a727bb2e33fbcd4b78fe168bb9d9d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 11:26:38 +0200 Subject: [PATCH 17/24] Rename and fix tests --- .../test/scala/effekt/ChezSchemeTests.scala | 1 + .../test/scala/effekt/JavaScriptTests.scala | 4 +- .../input_output/server_client.check | 2 +- .../input_output/server_client.effekt | 17 +++--- libraries/common/io/network.effekt | 20 +++---- libraries/llvm/io.c | 54 +++++++++---------- 6 files changed, 52 insertions(+), 46 deletions(-) diff --git a/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala b/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala index 9325a9f9e..4dfe21c78 100644 --- a/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala +++ b/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala @@ -37,6 +37,7 @@ abstract class ChezSchemeTests extends EffektTests { examplesDir / "benchmarks" / "input_output" / "small_files.effekt", examplesDir / "benchmarks" / "input_output" / "interleave_promises.effekt", examplesDir / "benchmarks" / "input_output" / "financial_format.effekt", + examplesDir / "benchmarks" / "input_output" / "server_client.effekt", // unsafe continuations are not yet supported in our Chez backend examplesDir / "pos" / "unsafe_cont.effekt", diff --git a/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala b/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala index 972d5a48b..edc74b71d 100644 --- a/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala +++ b/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala @@ -38,7 +38,9 @@ class JavaScriptTests extends EffektTests { // unsafe cont examplesDir / "pos" / "propagators.effekt", // stack overflow - examplesDir / "benchmarks" / "folklore_to_fact" / "cps_tak.effekt" + examplesDir / "benchmarks" / "folklore_to_fact" / "cps_tak.effekt", + // tcp server and client + examplesDir / "benchmarks" / "input_output" / "server_client.effekt", ) } diff --git a/examples/benchmarks/input_output/server_client.check b/examples/benchmarks/input_output/server_client.check index 97e350411..7c6ba0fe1 100644 --- a/examples/benchmarks/input_output/server_client.check +++ b/examples/benchmarks/input_output/server_client.check @@ -1 +1 @@ -110 \ No newline at end of file +55 \ No newline at end of file diff --git a/examples/benchmarks/input_output/server_client.effekt b/examples/benchmarks/input_output/server_client.effekt index be11d54d1..e575d7da7 100644 --- a/examples/benchmarks/input_output/server_client.effekt +++ b/examples/benchmarks/input_output/server_client.effekt @@ -1,15 +1,17 @@ +import examples/benchmarks/runner + import io import io/error import io/network -import io/time import bytearray -def main() = { +def run(n: Int) = { with on[IOError].panic - val listener = listen("127.0.0.1", 8080); + + val listener = bind("127.0.0.1", 8080); spawn(box { with on[IOError].panic - accept(listener, box { connection => + listen(listener, box { connection => with on[IOError].panic val message = "hello world" var buffer = bytearray::fromString(message) @@ -17,10 +19,9 @@ def main() = { close(connection) })}); - val results = array::build(10) { i => + val results = array::build(n) { i => promise(box { with on[IOError].result - wait(1000) val connection = connect("127.0.0.1", 8080) var buffer = bytearray::allocate(4096) val number = read(connection, buffer, 0, 4096) @@ -36,5 +37,7 @@ def main() = { shutdown(listener) - println(total) + return total } + +def main() = benchmark(5){run} diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index bcfe66c75..c8a21bcf3 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -24,11 +24,11 @@ def close(handle: Connection): Unit / Exception[IOError] = { /// A tcp listener. Should not be inspected. type Listener = Int -def listen(host: String, port: Int): Listener / Exception[IOError] = - internal::checkResult(internal::listen(host, port)) +def bind(host: String, port: Int): Listener / Exception[IOError] = + internal::checkResult(internal::bind(host, port)) -def accept(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = { - internal::checkResult(internal::accept(listener, handler)); () +def listen(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = { + internal::checkResult(internal::listen(listener, handler)); () } def shutdown(listener: Listener): Unit / Exception[IOError] = { @@ -42,8 +42,8 @@ namespace internal { declare void @c_tcp_read(%Int, %Pos, %Int, %Int, %Stack) declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack) declare void @c_tcp_close(%Int, %Stack) - declare void @c_tcp_listen(%Pos, %Int, %Int, %Stack) - declare void @c_tcp_accept(%Int, %Pos, %Stack) + declare void @c_tcp_bind(%Pos, %Int, %Int, %Stack) + declare void @c_tcp_listen(%Int, %Pos, %Stack) declare void @c_tcp_shutdown(%Int, %Stack) """ @@ -73,15 +73,15 @@ namespace internal { ret void """ - extern async def listen(host: String, port: Int): Int = + extern async def bind(host: String, port: Int): Int = llvm """ - call void @c_tcp_listen(%Pos ${host}, %Int ${port}, %Stack %stack) + call void @c_tcp_bind(%Pos ${host}, %Int ${port}, %Stack %stack) ret void """ - extern async def accept(listener: Int, handler: Int => Unit at {io, async, global}): Int = + extern async def listen(listener: Int, handler: Int => Unit at {io, async, global}): Int = llvm """ - call void @c_tcp_accept(%Int ${listener}, %Pos ${handler}, %Stack %stack) + call void @c_tcp_listen(%Int ${listener}, %Pos ${handler}, %Stack %stack) ret void """ diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 74dea0c67..489ffd693 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -317,7 +317,7 @@ void c_tcp_close(Int handle, Stack stack) { uv_close(uv_handle, c_tcp_close_cb); } -void c_tcp_listen(String host, Int port, Stack stack) { +void c_tcp_bind(String host, Int port, Stack stack) { // TODO make non-async char* host_str = c_bytearray_into_nullterminated_string(host); erasePositive(host); @@ -355,16 +355,16 @@ void c_tcp_listen(String host, Int port, Stack stack) { typedef struct { Stack stack; struct Pos handler; -} tcp_accept_closure_t; +} tcp_listen_closure_t; -void c_tcp_accept_cb(uv_stream_t* server, int status) { - tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)server->data; +void c_tcp_listen_cb(uv_stream_t* server, int status) { + tcp_listen_closure_t* listen_closure = (tcp_listen_closure_t*)server->data; if (status < 0) { // TODO resume last - erasePositive(accept_closure->handler); - resume_Int(accept_closure->stack, status); - free(accept_closure); + erasePositive(listen_closure->handler); + resume_Int(listen_closure->stack, status); + free(listen_closure); server->data = NULL; return; } @@ -375,9 +375,9 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { if (result < 0) { // TODO resume last free(client); - erasePositive(accept_closure->handler); - resume_Int(accept_closure->stack, result); - free(accept_closure); + erasePositive(listen_closure->handler); + resume_Int(listen_closure->stack, result); + free(listen_closure); server->data = NULL; return; } @@ -386,28 +386,28 @@ void c_tcp_accept_cb(uv_stream_t* server, int status) { if (result < 0) { // TODO resume last uv_close((uv_handle_t*)client, (uv_close_cb)free); - erasePositive(accept_closure->handler); - resume_Int(accept_closure->stack, result); - free(accept_closure); + erasePositive(listen_closure->handler); + resume_Int(listen_closure->stack, result); + free(listen_closure); server->data = NULL; return; } - sharePositive(accept_closure->handler); - run_Int(accept_closure->handler, (int64_t)client); + sharePositive(listen_closure->handler); + run_Int(listen_closure->handler, (int64_t)client); } -void c_tcp_accept(Int listener, struct Pos handler, Stack stack) { +void c_tcp_listen(Int listener, struct Pos handler, Stack stack) { uv_stream_t* server = (uv_stream_t*)listener; - tcp_accept_closure_t* accept_closure = malloc(sizeof(tcp_accept_closure_t)); - accept_closure->stack = stack; - accept_closure->handler = handler; - server->data = accept_closure; + tcp_listen_closure_t* listen_closure = malloc(sizeof(tcp_listen_closure_t)); + listen_closure->stack = stack; + listen_closure->handler = handler; + server->data = listen_closure; - int result = uv_listen(server, SOMAXCONN, c_tcp_accept_cb); + int result = uv_listen(server, SOMAXCONN, c_tcp_listen_cb); if (result < 0) { - free(accept_closure); + free(listen_closure); erasePositive(handler); resume_Int(stack, result); return; @@ -417,13 +417,13 @@ void c_tcp_accept(Int listener, struct Pos handler, Stack stack) { void c_tcp_shutdown(Int handle, Stack stack) { uv_handle_t* uv_handle = (uv_handle_t*)handle; - tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)uv_handle->data; - if (accept_closure) { + tcp_listen_closure_t* listen_closure = (tcp_listen_closure_t*)uv_handle->data; + if (listen_closure) { // TODO what to resume with // TODO resume last - resume_Int(accept_closure->stack, 0); - erasePositive(accept_closure->handler); - free(accept_closure); + resume_Int(listen_closure->stack, 0); + erasePositive(listen_closure->handler); + free(listen_closure); } uv_handle->data = stack; From 67df861c15b10613433d53fa59ac8af5b784a254 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 11:36:39 +0200 Subject: [PATCH 18/24] Shutdown and close never fail --- libraries/common/io/network.effekt | 14 ++++++-------- libraries/llvm/io.c | 3 +-- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index c8a21bcf3..12dea6a3f 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -17,9 +17,8 @@ def read(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / E def write(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = internal::checkResult(internal::write(handle, buffer, offset, size)) -def close(handle: Connection): Unit / Exception[IOError] = { - internal::checkResult(internal::close(handle)); () -} +def close(handle: Connection): Unit = + internal::close(handle) /// A tcp listener. Should not be inspected. type Listener = Int @@ -31,9 +30,8 @@ def listen(listener: Listener, handler: Connection => Unit at {io, async, global internal::checkResult(internal::listen(listener, handler)); () } -def shutdown(listener: Listener): Unit / Exception[IOError] = { - internal::checkResult(internal::shutdown(listener)); () -} +def shutdown(listener: Listener): Unit = + internal::shutdown(listener) namespace internal { @@ -67,7 +65,7 @@ namespace internal { ret void """ - extern async def close(handle: Int): Int = + extern async def close(handle: Int): Unit = llvm """ call void @c_tcp_close(%Int ${handle}, %Stack %stack) ret void @@ -85,7 +83,7 @@ namespace internal { ret void """ - extern async def shutdown(handle: Int): Int = + extern async def shutdown(handle: Int): Unit = llvm """ call void @c_tcp_shutdown(%Int ${handle}, %Stack %stack) ret void diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 489ffd693..0d9d3684f 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -307,8 +307,7 @@ void c_tcp_write(Int handle, struct Pos buffer, Int offset, Int size, Stack stac void c_tcp_close_cb(uv_handle_t* handle) { Stack stack = (Stack)handle->data; free(handle); - // TODO resume_Pos Unit - resume_Int(stack, 0); + resume_Pos(stack, Unit); } void c_tcp_close(Int handle, Stack stack) { From 591fa3e3abb9d764d2cb421bab9f61d809a77b26 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 11:46:51 +0200 Subject: [PATCH 19/24] Resume last in shutdown --- libraries/llvm/io.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 0d9d3684f..a90eda569 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -415,18 +415,18 @@ void c_tcp_listen(Int listener, struct Pos handler, Stack stack) { void c_tcp_shutdown(Int handle, Stack stack) { uv_handle_t* uv_handle = (uv_handle_t*)handle; - tcp_listen_closure_t* listen_closure = (tcp_listen_closure_t*)uv_handle->data; - if (listen_closure) { - // TODO what to resume with - // TODO resume last - resume_Int(listen_closure->stack, 0); - erasePositive(listen_closure->handler); - free(listen_closure); - } uv_handle->data = stack; uv_close(uv_handle, c_tcp_close_cb); + + if (listen_closure) { + Stack closure_stack = listen_closure->stack; + struct Pos closure_handler = listen_closure->handler; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, 0); + } } From 9bf7c6faf5a15a3b548454c8cfc69104db1cee6d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 12:02:05 +0200 Subject: [PATCH 20/24] Resume last in error cases --- libraries/llvm/io.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index a90eda569..87009a894 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -358,13 +358,14 @@ typedef struct { void c_tcp_listen_cb(uv_stream_t* server, int status) { tcp_listen_closure_t* listen_closure = (tcp_listen_closure_t*)server->data; + Stack closure_stack = listen_closure->stack; + struct Pos closure_handler = listen_closure->handler; if (status < 0) { - // TODO resume last - erasePositive(listen_closure->handler); - resume_Int(listen_closure->stack, status); - free(listen_closure); server->data = NULL; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, status); return; } @@ -372,28 +373,26 @@ void c_tcp_listen_cb(uv_stream_t* server, int status) { int result = uv_tcp_init(uv_default_loop(), client); if (result < 0) { - // TODO resume last free(client); - erasePositive(listen_closure->handler); - resume_Int(listen_closure->stack, result); - free(listen_closure); server->data = NULL; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, result); return; } result = uv_accept(server, (uv_stream_t*)client); if (result < 0) { - // TODO resume last uv_close((uv_handle_t*)client, (uv_close_cb)free); - erasePositive(listen_closure->handler); - resume_Int(listen_closure->stack, result); - free(listen_closure); server->data = NULL; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, result); return; } - sharePositive(listen_closure->handler); - run_Int(listen_closure->handler, (int64_t)client); + sharePositive(closure_handler); + run_Int(closure_handler, (int64_t)client); } void c_tcp_listen(Int listener, struct Pos handler, Stack stack) { From d81caf50b3783413d40652a974df820d5560943f Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Wed, 30 Jul 2025 12:07:23 +0200 Subject: [PATCH 21/24] Make bind non-async --- libraries/common/io/network.effekt | 6 +++--- libraries/llvm/io.c | 14 +++++--------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 12dea6a3f..704a7190e 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -71,10 +71,10 @@ namespace internal { ret void """ - extern async def bind(host: String, port: Int): Int = + extern io def bind(host: String, port: Int): Int = llvm """ - call void @c_tcp_bind(%Pos ${host}, %Int ${port}, %Stack %stack) - ret void + %result = call %Int @c_tcp_bind(%Pos ${host}, %Int ${port}) + ret %Int %result """ extern async def listen(listener: Int, handler: Int => Unit at {io, async, global}): Int = diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 87009a894..b849b00ac 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -316,8 +316,7 @@ void c_tcp_close(Int handle, Stack stack) { uv_close(uv_handle, c_tcp_close_cb); } -void c_tcp_bind(String host, Int port, Stack stack) { - // TODO make non-async +Int c_tcp_bind(String host, Int port) { char* host_str = c_bytearray_into_nullterminated_string(host); erasePositive(host); @@ -327,8 +326,7 @@ void c_tcp_bind(String host, Int port, Stack stack) { if (result < 0) { free(tcp_handle); free(host_str); - resume_Int(stack, result); - return; + return result; } struct sockaddr_in addr; @@ -337,18 +335,16 @@ void c_tcp_bind(String host, Int port, Stack stack) { if (result < 0) { uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); - resume_Int(stack, result); - return; + return result; } result = uv_tcp_bind(tcp_handle, (const struct sockaddr*)&addr, 0); if (result < 0) { uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); - resume_Int(stack, result); - return; + return result; } - resume_Int(stack, (int64_t)tcp_handle); + return (int64_t)tcp_handle; } typedef struct { From 727f7828dbcc53142da87587ed5f8af6a768089f Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Mon, 4 Aug 2025 12:24:15 +0200 Subject: [PATCH 22/24] Add documentation --- libraries/common/io/network.effekt | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 704a7190e..88075cc8a 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -5,31 +5,39 @@ import bytearray import io/error -/// A tcp handle. Should not be inspected. +/// A TCP handle. Should not be inspected. type Connection = Int -def connect(host: String, port: Int): Connection / Exception[IOError] = - internal::checkResult(internal::connect(host, port)) - +/// Reads data from a TCP connection into a buffer at the given offset. def read(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = internal::checkResult(internal::read(handle, buffer, offset, size)) +/// Writes data from a buffer at the given offset to a TCP connection. def write(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = internal::checkResult(internal::write(handle, buffer, offset, size)) +/// Establishes a TCP connection to the specified host and port. +def connect(host: String, port: Int): Connection / Exception[IOError] = + internal::checkResult(internal::connect(host, port)) + +/// Closes a TCP connection and releases associated resources. def close(handle: Connection): Unit = internal::close(handle) -/// A tcp listener. Should not be inspected. +/// A TCP listener. Should not be inspected. type Listener = Int +/// Creates a TCP listener bound to the specified host and port. def bind(host: String, port: Int): Listener / Exception[IOError] = internal::checkResult(internal::bind(host, port)) +/// Starts listening for incoming connections and handles them with the provided handler function. +/// Runs until `shutdown` is called on this `Listener`. def listen(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = { internal::checkResult(internal::listen(listener, handler)); () } +/// Stops a TCP listener and releases associated resources. def shutdown(listener: Listener): Unit = internal::shutdown(listener) From ca34e62c9ecc63003d13ef873a55a93a8a9cd427 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 19 Sep 2025 16:37:09 +0200 Subject: [PATCH 23/24] New syntax for async --- libraries/common/io/network.effekt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 88075cc8a..618548aaf 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -53,45 +53,45 @@ namespace internal { declare void @c_tcp_shutdown(%Int, %Stack) """ - extern async def connect(host: String, port: Int): Int = + extern def connect(host: String, port: Int) at async: Int = llvm """ call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack) ret void """ /// The buffer must be kept alive by using it after the call - extern async def read(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = + extern def read(handle: Int, buffer: ByteArray, offset: Int, size: Int) at async: Int = llvm """ call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) ret void """ /// The buffer must be kept alive by using it after the call - extern async def write(handle: Int, buffer: ByteArray, offset: Int, size: Int): Int = + extern def write(handle: Int, buffer: ByteArray, offset: Int, size: Int) at async: Int = llvm """ call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) ret void """ - extern async def close(handle: Int): Unit = + extern def close(handle: Int) at async: Unit = llvm """ call void @c_tcp_close(%Int ${handle}, %Stack %stack) ret void """ - extern io def bind(host: String, port: Int): Int = + extern def bind(host: String, port: Int) at io: Int = llvm """ %result = call %Int @c_tcp_bind(%Pos ${host}, %Int ${port}) ret %Int %result """ - extern async def listen(listener: Int, handler: Int => Unit at {io, async, global}): Int = + extern def listen(listener: Int, handler: Int => Unit at {io, async, global}) at async: Int = llvm """ call void @c_tcp_listen(%Int ${listener}, %Pos ${handler}, %Stack %stack) ret void """ - extern async def shutdown(handle: Int): Unit = + extern def shutdown(handle: Int) at async: Unit = llvm """ call void @c_tcp_shutdown(%Int ${handle}, %Stack %stack) ret void From d95ab0d1799b45f6ec567bb11d5f9f0be9f72582 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 19 Sep 2025 18:02:23 +0200 Subject: [PATCH 24/24] Streaming client --- libraries/common/io/network.effekt | 53 ++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 618548aaf..f03ac9291 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -1,10 +1,63 @@ module io/network +import stream import bytearray import io/error +def client[R](host: String, port: Int) { stream: () => R / {read[Byte], emit[Byte]} }: R / Exception[IOError] = { + + val connection = connect(host, port); + with on[IOError].finalize { close(connection) } + + val writeBuffer = bytearray::allocate(4096) + var writeBufferOffset = 0 + + val readBuffer = bytearray::allocate(4096) + var readBufferOffset = 0 + var readBufferFilled = 0 + + def push(i: Int, n: Int): Unit = { + val r = write(connection, writeBuffer, i, n) + if (r < n) { + push(i + r, n - r) + } + } + + def pull(): Int = { + read(connection, readBuffer, 0, readBuffer.size) match { + case 0 => pull() + case n => n + } + } + + try { + val r = stream() + push(0, writeBufferOffset) + return r + } with emit[Byte] { (byte) => + if (writeBufferOffset >= writeBuffer.size) { + push(0, writeBuffer.size) + writeBufferOffset = 0 + } + writeBuffer.unsafeSet(writeBufferOffset, byte) + writeBufferOffset = writeBufferOffset + 1 + resume(()) + } with read[Byte] { () => + if (readBufferOffset >= readBufferFilled) { + val n = pull() + readBufferOffset = 0 + readBufferFilled = n + } + val byte = unsafeGet(readBuffer, readBufferOffset) + readBufferOffset = readBufferOffset + 1 + resume { return byte } + } +} + + + /// A TCP handle. Should not be inspected. type Connection = Int