Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions effekt/jvm/src/test/scala/effekt/ChezSchemeTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ abstract class ChezSchemeTests extends EffektTests {
examplesDir / "benchmarks" / "input_output" / "small_files.effekt",
examplesDir / "benchmarks" / "input_output" / "interleave_promises.effekt",
examplesDir / "benchmarks" / "input_output" / "financial_format.effekt",
examplesDir / "benchmarks" / "input_output" / "server_client.effekt",

// unsafe continuations are not yet supported in our Chez backend
examplesDir / "pos" / "unsafe_cont.effekt",
Expand Down
4 changes: 3 additions & 1 deletion effekt/jvm/src/test/scala/effekt/JavaScriptTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class JavaScriptTests extends EffektTests {
// unsafe cont
examplesDir / "pos" / "propagators.effekt",
// stack overflow
examplesDir / "benchmarks" / "folklore_to_fact" / "cps_tak.effekt"
examplesDir / "benchmarks" / "folklore_to_fact" / "cps_tak.effekt",
// tcp server and client
examplesDir / "benchmarks" / "input_output" / "server_client.effekt",
)
}

Expand Down
1 change: 1 addition & 0 deletions examples/benchmarks/input_output/server_client.check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
55
43 changes: 43 additions & 0 deletions examples/benchmarks/input_output/server_client.effekt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import examples/benchmarks/runner

import io
import io/error
import io/network
import bytearray

def run(n: Int) = {
with on[IOError].panic

val listener = bind("127.0.0.1", 8080);
spawn(box {
with on[IOError].panic
listen(listener, box { connection =>
with on[IOError].panic
val message = "hello world"
var buffer = bytearray::fromString(message)
write(connection, buffer, 0, buffer.size())
close(connection)
})});

val results = array::build(n) { i =>
promise(box {
with on[IOError].result
val connection = connect("127.0.0.1", 8080)
var buffer = bytearray::allocate(4096)
val number = read(connection, buffer, 0, 4096)
close(connection)
number
})
};

var total = 0;
results.foreach { number =>
total = total + number.await.value
};

shutdown(listener)

return total
}

def main() = benchmark(5){run}
3 changes: 1 addition & 2 deletions libraries/common/io.effekt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ extern def spawn(task: Task[Unit]) at async: Unit =
js "$effekt.capture(k => { setTimeout(() => k($effekt.unit), 0); return $effekt.run(${task}) })"
llvm """
call void @c_yield(%Stack %stack)
%unboxed = call ccc %Neg @coercePosNeg(%Pos ${task})
call void @run(%Neg %unboxed)
call void @run(%Pos ${task})
ret void
"""

Expand Down
11 changes: 11 additions & 0 deletions libraries/common/io/error.effekt
Original file line number Diff line number Diff line change
Expand Up @@ -464,3 +464,14 @@ namespace internal {
"""

}

namespace internal {
def checkResult(result: Int): Int / Exception[IOError] =
if (result < 0) {
val ioError = fromNumber(internal::errorNumber(result));
do raise[IOError](ioError, message(ioError))
} else {
result
}
}

7 changes: 0 additions & 7 deletions libraries/common/io/filesystem.effekt
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,6 @@ namespace internal {
ret void
"""

def checkResult(result: Int): Int / Exception[IOError] =
if (result < 0) {
val ioError = fromNumber(internal::errorNumber(result));
do raise[IOError](ioError, message(ioError))
} else {
result
}
}

namespace examples {
Expand Down
199 changes: 135 additions & 64 deletions libraries/common/io/network.effekt
Original file line number Diff line number Diff line change
@@ -1,84 +1,155 @@
module io/network

import stream
import bytearray
import io

namespace js {
extern jsNode """
const net = require('node:net');
import io/error

function listen(server, port, host, listener) {
server.listen(port, host);
server.on('connection', listener);
}
"""

extern type JSServer // = net.Server
extern type JSSocket // = net.Socket
extern def server() at io: JSServer =
jsNode "net.createServer()"
extern def listen(server: JSServer, port: Int, host: String, listener: JSSocket => Unit at {io, async, global}) at io: Unit =
jsNode "listen(${server}, ${port}, ${host}, (socket) => $effekt.runToplevel((ks, k) => (${listener})(socket, ks, k)))"
def client[R](host: String, port: Int) { stream: () => R / {read[Byte], emit[Byte]} }: R / Exception[IOError] = {

extern def send(socket: JSSocket, data: ByteArray) at async: Unit =
jsNode "$effekt.capture(callback => ${socket}.write(${data}, callback))"
val connection = connect(host, port);
with on[IOError].finalize { close(connection) }

extern def receive(socket: JSSocket) at async: ByteArray =
jsNode "$effekt.capture(callback => ${socket}.once('data', callback))"
val writeBuffer = bytearray::allocate(4096)
var writeBufferOffset = 0

extern def end(socket: JSSocket) at async: Unit =
jsNode "$effekt.capture(k => ${socket}.end(k))"
}
val readBuffer = bytearray::allocate(4096)
var readBufferOffset = 0
var readBufferFilled = 0

interface Socket {
def send(message: ByteArray): Unit
def receive(): ByteArray
def end(): Unit
}
def push(i: Int, n: Int): Unit = {
val r = write(connection, writeBuffer, i, n)
if (r < n) {
push(i + r, n - r)
}
}

def pull(): Int = {
read(connection, readBuffer, 0, readBuffer.size) match {
case 0 => pull()
case n => n
}
}

def server(host: String, port: Int, handler: () => Unit / Socket at {io, async, global}): Unit = {
val server = js::server();
js::listen(server, port, host, box { socket =>
println("New connection")
spawn(box {
try handler()
with Socket {
def send(message) =
resume(js::send(socket, message))
def receive() =
resume(js::receive(socket))
def end() =
resume(js::end(socket))
}
})
})
try {
val r = stream()
push(0, writeBufferOffset)
return r
} with emit[Byte] { (byte) =>
if (writeBufferOffset >= writeBuffer.size) {
push(0, writeBuffer.size)
writeBufferOffset = 0
}
writeBuffer.unsafeSet(writeBufferOffset, byte)
writeBufferOffset = writeBufferOffset + 1
resume(())
} with read[Byte] { () =>
if (readBufferOffset >= readBufferFilled) {
val n = pull()
readBufferOffset = 0
readBufferFilled = n
}
val byte = unsafeGet(readBuffer, readBufferOffset)
readBufferOffset = readBufferOffset + 1
resume { return byte }
}
}


namespace examples {
def helloWorldApp(): Unit / Socket = {
val request = do receive().toString;

println("Received a request: " ++ request)
/// A TCP handle. Should not be inspected.
type Connection = Int
Comment on lines +61 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// A TCP handle. Should not be inspected.
type Connection = Int
/// A TCP handle. Should not be inspected.
extern type Connection
// = llvm "Int"

Could we make it an extern type so that it cannot be inspected?

(also applies to Listener below)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't like it (because then it is a Pos) but I guess I'll have to do it, because the representation in js is different.

Copy link
Contributor

@jiribenes jiribenes Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approaching the problem from the other side: does it have to be a Pos? We could let the user choose, no?

Couldn't we fix this "properly" by some = llvm "i64" annotation there instead? 🎣


def respond(s: String): Unit / Socket =
do send(s.fromString)
/// Reads data from a TCP connection into a buffer at the given offset.
def read(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] =
internal::checkResult(internal::read(handle, buffer, offset, size))

if (request.startsWith("GET /")) {
respond("HTTP/1.1 200 OK\r\n\r\nHello from Effekt!")
} else {
respond("HTTP/1.1 400 Bad Request\r\n\r\n")
}
do end()
}
/// Writes data from a buffer at the given offset to a TCP connection.
def write(handle: Connection, buffer: ByteArray, offset: Int, size: Int): Int / Exception[IOError] =
internal::checkResult(internal::write(handle, buffer, offset, size))

// A server that just shows "Hello from Effekt!" on localhost:8080
def main() = {
val port = 8080
println("Starting server on http://localhost:" ++ port.show)
/// Establishes a TCP connection to the specified host and port.
def connect(host: String, port: Int): Connection / Exception[IOError] =
internal::checkResult(internal::connect(host, port))

server("localhost", port, box {
helloWorldApp()
})
}
/// Closes a TCP connection and releases associated resources.
def close(handle: Connection): Unit =
internal::close(handle)

/// A TCP listener. Should not be inspected.
type Listener = Int

/// Creates a TCP listener bound to the specified host and port.
def bind(host: String, port: Int): Listener / Exception[IOError] =
internal::checkResult(internal::bind(host, port))

/// Starts listening for incoming connections and handles them with the provided handler function.
/// Runs until `shutdown` is called on this `Listener`.
def listen(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = {
internal::checkResult(internal::listen(listener, handler)); ()
}

/// Stops a TCP listener and releases associated resources.
def shutdown(listener: Listener): Unit =
internal::shutdown(listener)

namespace internal {

extern llvm """
declare void @c_tcp_connect(%Pos, %Int, %Stack)
declare void @c_tcp_read(%Int, %Pos, %Int, %Int, %Stack)
declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack)
declare void @c_tcp_close(%Int, %Stack)
declare void @c_tcp_bind(%Pos, %Int, %Int, %Stack)
declare void @c_tcp_listen(%Int, %Pos, %Stack)
declare void @c_tcp_shutdown(%Int, %Stack)
"""

extern def connect(host: String, port: Int) at async: Int =
llvm """
call void @c_tcp_connect(%Pos ${host}, %Int ${port}, %Stack %stack)
ret void
"""

/// The buffer must be kept alive by using it after the call
extern def read(handle: Int, buffer: ByteArray, offset: Int, size: Int) at async: Int =
llvm """
call void @c_tcp_read(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack)
ret void
"""

/// The buffer must be kept alive by using it after the call
extern def write(handle: Int, buffer: ByteArray, offset: Int, size: Int) at async: Int =
llvm """
call void @c_tcp_write(%Int ${handle}, %Pos ${buffer}, %Int ${offset}, %Int ${size}, %Stack %stack)
ret void
"""

extern def close(handle: Int) at async: Unit =
llvm """
call void @c_tcp_close(%Int ${handle}, %Stack %stack)
ret void
"""

extern def bind(host: String, port: Int) at io: Int =
llvm """
%result = call %Int @c_tcp_bind(%Pos ${host}, %Int ${port})
ret %Int %result
"""

extern def listen(listener: Int, handler: Int => Unit at {io, async, global}) at async: Int =
llvm """
call void @c_tcp_listen(%Int ${listener}, %Pos ${handler}, %Stack %stack)
ret void
"""

extern def shutdown(handle: Int) at async: Unit =
llvm """
call void @c_tcp_shutdown(%Int ${handle}, %Stack %stack)
ret void
"""

}


Loading
Loading