diff --git a/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala b/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala index 9325a9f9e..4dfe21c78 100644 --- a/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala +++ b/effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala @@ -37,6 +37,7 @@ abstract class ChezSchemeTests extends EffektTests { examplesDir / "benchmarks" / "input_output" / "small_files.effekt", examplesDir / "benchmarks" / "input_output" / "interleave_promises.effekt", examplesDir / "benchmarks" / "input_output" / "financial_format.effekt", + examplesDir / "benchmarks" / "input_output" / "server_client.effekt", // unsafe continuations are not yet supported in our Chez backend examplesDir / "pos" / "unsafe_cont.effekt", diff --git a/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala b/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala index 972d5a48b..edc74b71d 100644 --- a/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala +++ b/effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala @@ -38,7 +38,9 @@ class JavaScriptTests extends EffektTests { // unsafe cont examplesDir / "pos" / "propagators.effekt", // stack overflow - examplesDir / "benchmarks" / "folklore_to_fact" / "cps_tak.effekt" + examplesDir / "benchmarks" / "folklore_to_fact" / "cps_tak.effekt", + // tcp server and client + examplesDir / "benchmarks" / "input_output" / "server_client.effekt", ) } diff --git a/examples/benchmarks/input_output/server_client.check b/examples/benchmarks/input_output/server_client.check new file mode 100644 index 000000000..7c6ba0fe1 --- /dev/null +++ b/examples/benchmarks/input_output/server_client.check @@ -0,0 +1 @@ +55 \ No newline at end of file diff --git a/examples/benchmarks/input_output/server_client.effekt b/examples/benchmarks/input_output/server_client.effekt new file mode 100644 index 000000000..e575d7da7 --- /dev/null +++ b/examples/benchmarks/input_output/server_client.effekt @@ -0,0 +1,43 @@ +import examples/benchmarks/runner + +import io +import io/error +import io/network +import bytearray + +def run(n: Int) = { + with on[IOError].panic + + val listener = bind("127.0.0.1", 8080); + spawn(box { + with on[IOError].panic + listen(listener, box { connection => + with on[IOError].panic + val message = "hello world" + var buffer = bytearray::fromString(message) + write(connection, buffer, 0, buffer.size()) + close(connection) + })}); + + val results = array::build(n) { i => + promise(box { + with on[IOError].result + val connection = connect("127.0.0.1", 8080) + var buffer = bytearray::allocate(4096) + val number = read(connection, buffer, 0, 4096) + close(connection) + number + }) + }; + + var total = 0; + results.foreach { number => + total = total + number.await.value + }; + + shutdown(listener) + + return total +} + +def main() = benchmark(5){run} diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index 59630c466..ce6365433 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -16,8 +16,7 @@ extern def spawn(task: Task[Unit]) at async: Unit = js "$effekt.capture(k => { setTimeout(() => k($effekt.unit), 0); return $effekt.run(${task}) })" llvm """ call void @c_yield(%Stack %stack) - %unboxed = call ccc %Neg @coercePosNeg(%Pos ${task}) - call void @run(%Neg %unboxed) + call void @run(%Pos ${task}) ret void """ diff --git a/libraries/common/io/error.effekt b/libraries/common/io/error.effekt index 6620776ef..1c624620b 100644 --- a/libraries/common/io/error.effekt +++ b/libraries/common/io/error.effekt @@ -464,3 +464,14 @@ namespace internal { """ } + +namespace internal { + def checkResult(result: Int): Int / Exception[IOError] = + if (result < 0) { + val ioError = fromNumber(internal::errorNumber(result)); + do raise[IOError](ioError, message(ioError)) + } else { + result + } +} + diff --git a/libraries/common/io/filesystem.effekt b/libraries/common/io/filesystem.effekt index efe0bd0a2..f7de91a10 100644 --- a/libraries/common/io/filesystem.effekt +++ b/libraries/common/io/filesystem.effekt @@ -222,13 +222,6 @@ namespace internal { ret void """ - def checkResult(result: Int): Int / Exception[IOError] = - if (result < 0) { - val ioError = fromNumber(internal::errorNumber(result)); - do raise[IOError](ioError, message(ioError)) - } else { - result - } } namespace examples { diff --git a/libraries/common/io/network.effekt b/libraries/common/io/network.effekt index 5744e852c..f03ac9291 100644 --- a/libraries/common/io/network.effekt +++ b/libraries/common/io/network.effekt @@ -1,84 +1,155 @@ module io/network +import stream import bytearray -import io -namespace js { - extern jsNode """ - const net = require('node:net'); +import io/error - function listen(server, port, host, listener) { - server.listen(port, host); - server.on('connection', listener); - } - """ - extern type JSServer // = net.Server - extern type JSSocket // = net.Socket - extern def server() at io: JSServer = - jsNode "net.createServer()" - extern def listen(server: JSServer, port: Int, host: String, listener: JSSocket => Unit at {io, async, global}) at io: Unit = - jsNode "listen(${server}, ${port}, ${host}, (socket) => $effekt.runToplevel((ks, k) => (${listener})(socket, ks, k)))" +def client[R](host: String, port: Int) { stream: () => R / {read[Byte], emit[Byte]} }: R / Exception[IOError] = { - extern def send(socket: JSSocket, data: ByteArray) at async: Unit = - jsNode "$effekt.capture(callback => ${socket}.write(${data}, callback))" + val connection = connect(host, port); + with on[IOError].finalize { close(connection) } - extern def receive(socket: JSSocket) at async: ByteArray = - jsNode "$effekt.capture(callback => ${socket}.once('data', callback))" + val writeBuffer = bytearray::allocate(4096) + var writeBufferOffset = 0 - extern def end(socket: JSSocket) at async: Unit = - jsNode "$effekt.capture(k => ${socket}.end(k))" -} + val readBuffer = bytearray::allocate(4096) + var readBufferOffset = 0 + var readBufferFilled = 0 -interface Socket { - def send(message: ByteArray): Unit - def receive(): ByteArray - def end(): Unit -} + def push(i: Int, n: Int): Unit = { + val r = write(connection, writeBuffer, i, n) + if (r < n) { + push(i + r, n - r) + } + } + + def pull(): Int = { + read(connection, readBuffer, 0, readBuffer.size) match { + case 0 => pull() + case n => n + } + } -def server(host: String, port: Int, handler: () => Unit / Socket at {io, async, global}): Unit = { - val server = js::server(); - js::listen(server, port, host, box { socket => - println("New connection") - spawn(box { - try handler() - with Socket { - def send(message) = - resume(js::send(socket, message)) - def receive() = - resume(js::receive(socket)) - def end() = - resume(js::end(socket)) - } - }) - }) + try { + val r = stream() + push(0, writeBufferOffset) + return r + } with emit[Byte] { (byte) => + if (writeBufferOffset >= writeBuffer.size) { + push(0, writeBuffer.size) + writeBufferOffset = 0 + } + writeBuffer.unsafeSet(writeBufferOffset, byte) + writeBufferOffset = writeBufferOffset + 1 + resume(()) + } with read[Byte] { () => + if (readBufferOffset >= readBufferFilled) { + val n = pull() + readBufferOffset = 0 + readBufferFilled = n + } + val byte = unsafeGet(readBuffer, readBufferOffset) + readBufferOffset = readBufferOffset + 1 + resume { return byte } + } } -namespace examples { - def helloWorldApp(): Unit / Socket = { - val request = do receive().toString; - println("Received a request: " ++ request) +/// A TCP handle. Should not be inspected. +type Connection = Int - def respond(s: String): Unit / Socket = - do send(s.fromString) +/// Reads data from a TCP connection into a buffer at the given offset. +def read(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = + internal::checkResult(internal::read(handle, buffer, offset, size)) - if (request.startsWith("GET /")) { - respond("HTTP/1.1 200 OK\r\n\r\nHello from Effekt!") - } else { - respond("HTTP/1.1 400 Bad Request\r\n\r\n") - } - do end() - } +/// Writes data from a buffer at the given offset to a TCP connection. +def write(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] = + internal::checkResult(internal::write(handle, buffer, offset, size)) - // A server that just shows "Hello from Effekt!" on localhost:8080 - def main() = { - val port = 8080 - println("Starting server on http://localhost:" ++ port.show) +/// Establishes a TCP connection to the specified host and port. +def connect(host: String, port: Int): Connection / Exception[IOError] = + internal::checkResult(internal::connect(host, port)) - server("localhost", port, box { - helloWorldApp() - }) - } +/// Closes a TCP connection and releases associated resources. +def close(handle: Connection): Unit = + internal::close(handle) + +/// A TCP listener. Should not be inspected. +type Listener = Int + +/// Creates a TCP listener bound to the specified host and port. +def bind(host: String, port: Int): Listener / Exception[IOError] = + internal::checkResult(internal::bind(host, port)) + +/// Starts listening for incoming connections and handles them with the provided handler function. +/// Runs until `shutdown` is called on this `Listener`. +def listen(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = { + internal::checkResult(internal::listen(listener, handler)); () } + +/// Stops a TCP listener and releases associated resources. +def shutdown(listener: Listener): Unit = + internal::shutdown(listener) + +namespace internal { + + extern llvm """ + declare void @c_tcp_connect(%Pos, %Int, %Stack) + declare void @c_tcp_read(%Int, %Pos, %Int, %Int, %Stack) + declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack) + declare void @c_tcp_close(%Int, %Stack) + declare void @c_tcp_bind(%Pos, %Int, %Int, %Stack) + declare void @c_tcp_listen(%Int, %Pos, %Stack) + declare void @c_tcp_shutdown(%Int, %Stack) + """ + + extern def connect(host: String, port: Int) at async: Int = + llvm """ + call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack) + ret void + """ + + /// The buffer must be kept alive by using it after the call + extern def read(handle: Int, buffer: ByteArray, offset: Int, size: Int) at async: Int = + llvm """ + call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) + ret void + """ + + /// The buffer must be kept alive by using it after the call + extern def write(handle: Int, buffer: ByteArray, offset: Int, size: Int) at async: Int = + llvm """ + call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack) + ret void + """ + + extern def close(handle: Int) at async: Unit = + llvm """ + call void @c_tcp_close(%Int ${handle}, %Stack %stack) + ret void + """ + + extern def bind(host: String, port: Int) at io: Int = + llvm """ + %result = call %Int @c_tcp_bind(%Pos ${host}, %Int ${port}) + ret %Int %result + """ + + extern def listen(listener: Int, handler: Int => Unit at {io, async, global}) at async: Int = + llvm """ + call void @c_tcp_listen(%Int ${listener}, %Pos ${handler}, %Stack %stack) + ret void + """ + + extern def shutdown(handle: Int) at async: Unit = + llvm """ + call void @c_tcp_shutdown(%Int ${handle}, %Stack %stack) + ret void + """ + +} + + diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 57b3b22d5..b849b00ac 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -81,6 +81,7 @@ void c_fs_open(String path, int flags, Stack stack) { int result = uv_fs_open(uv_default_loop(), request, path_str, flags, 0777, c_resume_int_fs); if (result < 0) { + // TODO free path_str? uv_fs_req_cleanup(request); free(request); resume_Int(stack, result); @@ -178,6 +179,252 @@ void c_fs_mkdir(String path, Stack stack) { return; } +// Network +// ------- + +void c_tcp_connect_cb(uv_connect_t* request, int status) { + Stack stack = (Stack)request->data; + + if (status < 0) { + uv_close((uv_handle_t*)request->handle, (uv_close_cb)free); + free(request); + resume_Int(stack, status); + } else { + int64_t handle = (int64_t)request->handle; + free(request); + resume_Int(stack, handle); + } +} + +void c_tcp_connect(String host, Int port, Stack stack) { + char* host_str = c_bytearray_into_nullterminated_string(host); + erasePositive(host); + + uv_tcp_t* tcp_handle = malloc(sizeof(uv_tcp_t)); + int result = uv_tcp_init(uv_default_loop(), tcp_handle); + + if (result < 0) { + free(tcp_handle); + free(host_str); + resume_Int(stack, result); + return; + } + + uv_connect_t* connect_req = malloc(sizeof(uv_connect_t)); + connect_req->data = stack; + + struct sockaddr_in addr; + result = uv_ip4_addr(host_str, port, &addr); + free(host_str); + + if (result < 0) { + free(tcp_handle); + free(connect_req); + resume_Int(stack, result); + return; + } + + result = uv_tcp_connect(connect_req, tcp_handle, (const struct sockaddr*)&addr, c_tcp_connect_cb); + + if (result < 0) { + free(tcp_handle); + free(connect_req); + resume_Int(stack, result); + return; + } +} + +typedef struct { + Stack stack; + size_t size; + char* data; +} tcp_read_closure_t; + +void c_tcp_read_cb(uv_stream_t* stream, ssize_t bytes_read, const uv_buf_t* buf) { + (void)(buf); + tcp_read_closure_t* read_closure = (tcp_read_closure_t*)stream->data; + Stack stack = read_closure->stack; + + uv_read_stop(stream); + free(read_closure); + + resume_Int(stack, (int64_t)bytes_read); +} + +void c_tcp_read_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + (void)(suggested_size); + tcp_read_closure_t* read_closure = (tcp_read_closure_t*)handle->data; + buf->base = read_closure->data; + buf->len = read_closure->size; +} + +void c_tcp_read(Int handle, struct Pos buffer, Int offset, Int size, Stack stack) { + uv_stream_t* stream = (uv_stream_t*)handle; + + char* buffer_ptr = (char*)(c_bytearray_data(buffer) + offset); + erasePositive(buffer); + // TODO panic if this was the last reference + + tcp_read_closure_t* read_closure = malloc(sizeof(tcp_read_closure_t)); + read_closure->stack = stack; + read_closure->size = size; + read_closure->data = buffer_ptr; + stream->data = read_closure; + + int result = uv_read_start(stream, c_tcp_read_alloc_cb, c_tcp_read_cb); + + if (result < 0) { + free(read_closure); + stream->data = NULL; + resume_Int(stack, result); + } +} + +void c_tcp_write_cb(uv_write_t* request, int status) { + Stack stack = (Stack)request->data; + free(request); + resume_Int(stack, (int64_t)status); +} + +void c_tcp_write(Int handle, struct Pos buffer, Int offset, Int size, Stack stack) { + uv_stream_t* stream = (uv_stream_t*)handle; + + uv_buf_t buf = uv_buf_init((char*)(c_bytearray_data(buffer) + offset), size); + erasePositive(buffer); + // TODO panic if this was the last reference + + uv_write_t* request = malloc(sizeof(uv_write_t)); + request->data = stack; + + int result = uv_write(request, stream, &buf, 1, c_tcp_write_cb); + + if (result < 0) { + free(request); + resume_Int(stack, result); + } +} + +void c_tcp_close_cb(uv_handle_t* handle) { + Stack stack = (Stack)handle->data; + free(handle); + resume_Pos(stack, Unit); +} + +void c_tcp_close(Int handle, Stack stack) { + uv_handle_t* uv_handle = (uv_handle_t*)handle; + uv_handle->data = stack; + uv_close(uv_handle, c_tcp_close_cb); +} + +Int c_tcp_bind(String host, Int port) { + char* host_str = c_bytearray_into_nullterminated_string(host); + erasePositive(host); + + uv_tcp_t* tcp_handle = malloc(sizeof(uv_tcp_t)); + int result = uv_tcp_init(uv_default_loop(), tcp_handle); + + if (result < 0) { + free(tcp_handle); + free(host_str); + return result; + } + + struct sockaddr_in addr; + result = uv_ip4_addr(host_str, port, &addr); + free(host_str); + + if (result < 0) { + uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); + return result; + } + + result = uv_tcp_bind(tcp_handle, (const struct sockaddr*)&addr, 0); + if (result < 0) { + uv_close((uv_handle_t*)tcp_handle, (uv_close_cb)free); + return result; + } + + return (int64_t)tcp_handle; +} + +typedef struct { + Stack stack; + struct Pos handler; +} tcp_listen_closure_t; + +void c_tcp_listen_cb(uv_stream_t* server, int status) { + tcp_listen_closure_t* listen_closure = (tcp_listen_closure_t*)server->data; + Stack closure_stack = listen_closure->stack; + struct Pos closure_handler = listen_closure->handler; + + if (status < 0) { + server->data = NULL; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, status); + return; + } + + uv_tcp_t* client = malloc(sizeof(uv_tcp_t)); + int result = uv_tcp_init(uv_default_loop(), client); + + if (result < 0) { + free(client); + server->data = NULL; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, result); + return; + } + + result = uv_accept(server, (uv_stream_t*)client); + if (result < 0) { + uv_close((uv_handle_t*)client, (uv_close_cb)free); + server->data = NULL; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, result); + return; + } + + sharePositive(closure_handler); + run_Int(closure_handler, (int64_t)client); +} + +void c_tcp_listen(Int listener, struct Pos handler, Stack stack) { + uv_stream_t* server = (uv_stream_t*)listener; + + tcp_listen_closure_t* listen_closure = malloc(sizeof(tcp_listen_closure_t)); + listen_closure->stack = stack; + listen_closure->handler = handler; + server->data = listen_closure; + + int result = uv_listen(server, SOMAXCONN, c_tcp_listen_cb); + if (result < 0) { + free(listen_closure); + erasePositive(handler); + resume_Int(stack, result); + return; + } +} + +void c_tcp_shutdown(Int handle, Stack stack) { + uv_handle_t* uv_handle = (uv_handle_t*)handle; + tcp_listen_closure_t* listen_closure = (tcp_listen_closure_t*)uv_handle->data; + + uv_handle->data = stack; + uv_close(uv_handle, c_tcp_close_cb); + + if (listen_closure) { + Stack closure_stack = listen_closure->stack; + struct Pos closure_handler = listen_closure->handler; + free(listen_closure); + erasePositive(closure_handler); + resume_Int(closure_stack, 0); + } +} + + /** * Maps the libuv error code to a stable (platform independent) numeric value. * diff --git a/libraries/llvm/main.c b/libraries/llvm/main.c index 38c5f7133..d95f9a535 100644 --- a/libraries/llvm/main.c +++ b/libraries/llvm/main.c @@ -37,6 +37,5 @@ int main(int argc, char *argv[]) { effektMain(); uv_loop_t *loop = uv_default_loop(); uv_run(loop, UV_RUN_DEFAULT); - uv_loop_close(loop); - return 0; + return uv_loop_close(loop); } diff --git a/libraries/llvm/rts.ll b/libraries/llvm/rts.ll index 746ed5def..604d75a25 100644 --- a/libraries/llvm/rts.ll +++ b/libraries/llvm/rts.ll @@ -1,7 +1,5 @@ ; Run-Time System -%Evidence = type i64 - ; Basic types %Environment = type ptr @@ -708,7 +706,10 @@ define void @resume_Pos(%Stack %stack, %Pos %argument) { ret void } -define void @run(%Neg %f) { +define void @run(%Pos %boxed) { + ; unbox + %f = call %Neg @coercePosNeg(%Pos %boxed) + ; fresh stack %stack = call %Stack @withEmptyStack() @@ -723,7 +724,10 @@ define void @run(%Neg %f) { ret void } -define void @run_Int(%Neg %f, i64 %argument) { +define void @run_Int(%Pos %boxed, i64 %argument) { + ; unbox + %f = call %Neg @coercePosNeg(%Pos %boxed) + ; fresh stack %stack = call %Stack @withEmptyStack() @@ -734,11 +738,14 @@ define void @run_Int(%Neg %f, i64 %argument) { %functionPointer = load ptr, ptr %functionPointerPointer, !alias.scope !15, !noalias !25 ; call - tail call tailcc %Pos %functionPointer(%Object %object, %Evidence 0, i64 %argument, %Stack %stack) + tail call tailcc %Pos %functionPointer(%Object %object, i64 %argument, %Stack %stack) ret void } -define void @run_Pos(%Neg %f, %Pos %argument) { +define void @run_Pos(%Pos %boxed, %Pos %argument) { + ; unbox + %f = call %Neg @coercePosNeg(%Pos %boxed) + ; fresh stack %stack = call %Stack @withEmptyStack() @@ -749,7 +756,7 @@ define void @run_Pos(%Neg %f, %Pos %argument) { %functionPointer = load ptr, ptr %functionPointerPointer, !alias.scope !15, !noalias !25 ; call - tail call tailcc %Pos %functionPointer(%Object %object, %Evidence 0, %Pos %argument, %Stack %stack) + tail call tailcc %Pos %functionPointer(%Object %object, %Pos %argument, %Stack %stack) ret void } diff --git a/libraries/llvm/types.c b/libraries/llvm/types.c index c372bbbb6..83190cf4a 100644 --- a/libraries/llvm/types.c +++ b/libraries/llvm/types.c @@ -37,9 +37,9 @@ typedef struct StackValue* Stack; extern void resume_Int(Stack, Int); extern void resume_Pos(Stack, struct Pos); -extern void run(struct Neg); -extern void run_Int(struct Neg, Int); -extern void run_Pos(struct Neg, struct Pos); +extern void run(struct Pos); +extern void run_Int(struct Pos, Int); +extern void run_Pos(struct Pos, struct Pos); // Reference counting primitives defined in LLVM extern void eraseNegative(struct Neg);