Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import bytearray
import list
import io
import io/error
import io/promise
import io/filesystem


Expand Down
1 change: 1 addition & 0 deletions examples/benchmarks/input_output/sender_receiver.check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
10
26 changes: 26 additions & 0 deletions examples/benchmarks/input_output/sender_receiver.effekt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import examples/benchmarks/runner

import io
import io/channel


def run(n: Int) = {
val (sender, receiver) = channel::allocate()
val closed = ref(false)

spawn(box {
each(0, n) { i =>
sender.send(i)
}
closed.set(true)
})

var s = 0
while (not(closed.get())) {
s = s + receiver.receive(())
}
return s
}

def main() = benchmark(5){run}

3 changes: 3 additions & 0 deletions examples/stdlib/acme.effekt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import effekt
import exception
import heap
import io
import io/channel
import io/console
import io/error
import io/filesystem
import io/network
import io/promise
import io/signal
import io/time
import json
import list
Expand Down
1 change: 1 addition & 0 deletions examples/stdlib/io/filesystem/async_file_io.effekt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import io/error
import io/filesystem
import io/promise

def program(path1: String, path2: String) = {
val f1 = promise(box {
Expand Down
1 change: 1 addition & 0 deletions examples/stdlib/io/promise.effekt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import io/promise

def main() = {

Expand Down
5 changes: 3 additions & 2 deletions examples/stdlib/io/time.effekt
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import io
import io/time
import io/promise

def main() = {

val p1 = promise(box {
println("Start p1");
wait(250)
sleep(250)
println("Stop p1");
1
})

val p2 = promise(box {
println("Start p2");
wait(150)
sleep(150)
println("Stop p2");
2
})
Expand Down
55 changes: 0 additions & 55 deletions libraries/common/io.effekt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module io

import ref
import queue

// Event Loop
// ----------
Expand Down Expand Up @@ -34,56 +32,3 @@ extern async def abort(): Nothing =
call void @eraseStack(%Stack %stack)
ret void
"""


// Promises
// --------

extern type Promise[T]
// = js "{resolve: ƒ, promise: Promise}"
// = llvm "{tag: 0, obj: Promise*}"

def promise[T](task: Task[T]): Promise[T] = {
val p = promise::make[T]();
spawn(box { p.resolve(task()) });
return p
}

extern llvm """
declare %Pos @c_promise_make()
declare void @c_promise_resolve(%Pos, %Pos)
declare void @c_promise_await(%Pos, %Neg)
"""

extern async def await[T](promise: Promise[T]): T =
js "$effekt.capture(k => ${promise}.promise.then(k))"
llvm """
call void @c_promise_await(%Pos ${promise}, %Stack %stack)
ret void
"""

extern async def resolve[T](promise: Promise[T], value: T): Unit =
js "$effekt.capture(k => { ${promise}.resolve(${value}); return k($effekt.unit) })"
llvm """
call void @c_promise_resolve(%Pos ${promise}, %Pos ${value}, %Stack %stack)
ret void
"""

namespace promise {
extern js """
function promise$make() {
let resolve;
const promise = new Promise((res, rej) => {
resolve = res;
});
return { resolve: resolve, promise: promise };
}
"""

extern io def make[T](): Promise[T] =
js "promise$make()"
llvm """
%promise = call %Pos @c_promise_make()
ret %Pos %promise
"""
}
49 changes: 49 additions & 0 deletions libraries/common/io/channel.effekt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
module io/channel

import io
import io/signal

record Node[A, B](this: Signal[A, B], rest: Ref[Option[Node[A, B]]])

type Sender[A, B] = Ref[Node[A, B]]
type Receiver[A, B] = Ref[Node[A, B]]

namespace channel {
def allocate[A, B](): (Sender[A, B], Receiver[A, B]) = {
val node = Node(signal::allocate(), ref(None()))
(ref(node), ref(node))
}
}

def send[A, B](sender: Sender[A, B], value: A): B = {
val Node(this, rest) = sender.get()
rest.get() match {
case None() =>
val node = Node(signal::allocate(), ref(None()))
rest.set(Some(node))
sender.set(node)
case Some(node) =>
sender.set(node)
}
// TODO avoid yield
yield()
this.unsafeNotifyA(value)
}

def receive[A, B](receiver: Receiver[A, B], value: B): A = {
val Node(this, rest) = receiver.get()
rest.get() match {
case None() =>
val node = Node(signal::allocate(), ref(None()))
rest.set(Some(node))
receiver.set(node)
case Some(node) =>
receiver.set(node)
}
// TODO avoid yield
yield()
this.unsafeNotifyB(value)
}

// TODO streaming

88 changes: 88 additions & 0 deletions libraries/common/io/promise.effekt
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
module io/promise

import io
import io/signal

// Promises
// --------

type State[T] {
Resolved(value: T)
Pending(signals: List[Signal[T, Unit]])
}

extern type Promise[T]
// = js "{resolve: ƒ, promise: Promise}"
// = llvm "Ref[State[T]]"

extern pure def toPromise[T](promise: Ref[State[T]]): Promise[T] =
llvm """
ret %Pos ${promise}
"""

extern pure def toRefState[T](promise: Promise[T]): Ref[State[T]] =
llvm """
ret %Pos ${promise}
"""

namespace promise {
extern global def make[T](): Promise[T] =
js "promise$make()"
llvm { toPromise(ref(Pending(Nil()))) }
}

def promise[T](task: Task[T]): Promise[T] = {
val p = promise::make[T]();
spawn(box { p.resolve(task()) });
return p
}

extern {async, global} def await[T](promise: Promise[T]): T =
js "$effekt.capture(k => ${promise}.promise.then(k))"
llvm {
val reference = toRefState(promise)
// TODO use reference.get
get(reference) match {
case Resolved(value) =>
return value
case Pending(signals) =>
val signal = signal::allocate()
// TODO use reference.set and signal.wait
set(reference, Pending(Cons(signal, signals)))
// TODO perhaps yield to avoid stack overflow
unsafeNotifyB(signal, ())
}
}

extern {io, async, global} def resolve[T](promise: Promise[T], value: T): Unit =
js "promise$resolve(${promise}, ${value})"
llvm {
val reference = toRefState(promise)
// TODO use reference.get
get(reference) match {
case Resolved(value) =>
panic("ERROR: Promise already resolved")
case Pending(signals) =>
// TODO use reference.set
set(reference, Resolved(value))
// TODO perhaps yield to avoid stack overflow
// TODO use signals.reverse.foreach
foreach(reverse(signals)) { signal => unsafeNotifyA(signal, value) }
}
}


extern js """
function promise$make() {
let resolve;
const promise = new Promise((res, rej) => {
resolve = res;
});
return { resolve: resolve, promise: promise };
}

function promise$resolve(promise, value) {
promise.resolve(value);
return $effekt.unit
}
"""
34 changes: 34 additions & 0 deletions libraries/common/io/signal.effekt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module io/signal

// Signals
// -------

/// Must be notified exactly once by A and exactly once by B.
/// Must yield before notifying or risking stack overflow.
extern type Signal[A, B]

namespace signal {
extern global def allocate[A, B](): Signal[A, B] =
llvm """
%signal = call %Pos @c_signal_make()
ret %Pos %signal
"""
}

extern async def unsafeNotifyA[A, B](signal: Signal[A, B], value: A): B =
llvm """
call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack)
ret void
"""

extern async def unsafeNotifyB[A, B](signal: Signal[A, B], value: B): A =
llvm """
call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack)
ret void
"""

extern llvm """
declare %Pos @c_signal_make()
declare void @c_signal_notify(%Pos, %Pos, %Stack)
"""

2 changes: 1 addition & 1 deletion libraries/common/io/time.effekt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern llvm """
declare void @c_timer_start(%Int, %Stack)
"""

extern async def wait(millis: Int): Unit =
extern async def sleep(millis: Int): Unit =
js "$effekt.capture(k => setTimeout(() => k($effekt.unit), ${millis}))"
llvm """
call void @c_timer_start(%Int ${millis}, %Stack %stack)
Expand Down
Loading
Loading