diff --git a/examples/benchmarks/input_output/interleave_promises.effekt b/examples/benchmarks/input_output/interleave_promises.effekt index e394870c5..0dcda21fb 100644 --- a/examples/benchmarks/input_output/interleave_promises.effekt +++ b/examples/benchmarks/input_output/interleave_promises.effekt @@ -4,6 +4,7 @@ import bytearray import list import io import io/error +import io/promise import io/filesystem diff --git a/examples/benchmarks/input_output/sender_receiver.check b/examples/benchmarks/input_output/sender_receiver.check new file mode 100644 index 000000000..9a037142a --- /dev/null +++ b/examples/benchmarks/input_output/sender_receiver.check @@ -0,0 +1 @@ +10 \ No newline at end of file diff --git a/examples/benchmarks/input_output/sender_receiver.effekt b/examples/benchmarks/input_output/sender_receiver.effekt new file mode 100644 index 000000000..913422e6b --- /dev/null +++ b/examples/benchmarks/input_output/sender_receiver.effekt @@ -0,0 +1,26 @@ +import examples/benchmarks/runner + +import io +import io/channel + + +def run(n: Int) = { + val (sender, receiver) = channel::allocate() + val closed = ref(false) + + spawn(box { + each(0, n) { i => + sender.send(i) + } + closed.set(true) + }) + + var s = 0 + while (not(closed.get())) { + s = s + receiver.receive(()) + } + return s +} + +def main() = benchmark(5){run} + diff --git a/examples/stdlib/acme.effekt b/examples/stdlib/acme.effekt index cffcd9178..6dd25d713 100644 --- a/examples/stdlib/acme.effekt +++ b/examples/stdlib/acme.effekt @@ -13,10 +13,13 @@ import effekt import exception import heap import io +import io/channel import io/console import io/error import io/filesystem import io/network +import io/promise +import io/signal import io/time import json import list diff --git a/examples/stdlib/io/filesystem/async_file_io.effekt b/examples/stdlib/io/filesystem/async_file_io.effekt index 411dcf78c..2e3e4c0d4 100644 --- a/examples/stdlib/io/filesystem/async_file_io.effekt +++ b/examples/stdlib/io/filesystem/async_file_io.effekt @@ -1,6 +1,7 @@ import io import io/error import io/filesystem +import io/promise def program(path1: String, path2: String) = { val f1 = promise(box { diff --git a/examples/stdlib/io/promise.effekt b/examples/stdlib/io/promise.effekt index 59f289ddd..465834bc7 100644 --- a/examples/stdlib/io/promise.effekt +++ b/examples/stdlib/io/promise.effekt @@ -1,4 +1,5 @@ import io +import io/promise def main() = { diff --git a/examples/stdlib/io/time.effekt b/examples/stdlib/io/time.effekt index 5b4186cec..808c996bd 100644 --- a/examples/stdlib/io/time.effekt +++ b/examples/stdlib/io/time.effekt @@ -1,18 +1,19 @@ import io import io/time +import io/promise def main() = { val p1 = promise(box { println("Start p1"); - wait(250) + sleep(250) println("Stop p1"); 1 }) val p2 = promise(box { println("Start p2"); - wait(150) + sleep(150) println("Stop p2"); 2 }) diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index 758117eb7..45f5db274 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -1,7 +1,5 @@ module io -import ref -import queue // Event Loop // ---------- @@ -34,56 +32,3 @@ extern async def abort(): Nothing = call void @eraseStack(%Stack %stack) ret void """ - - -// Promises -// -------- - -extern type Promise[T] - // = js "{resolve: ƒ, promise: Promise}" - // = llvm "{tag: 0, obj: Promise*}" - -def promise[T](task: Task[T]): Promise[T] = { - val p = promise::make[T](); - spawn(box { p.resolve(task()) }); - return p -} - -extern llvm """ - declare %Pos @c_promise_make() - declare void @c_promise_resolve(%Pos, %Pos) - declare void @c_promise_await(%Pos, %Neg) -""" - -extern async def await[T](promise: Promise[T]): T = - js "$effekt.capture(k => ${promise}.promise.then(k))" - llvm """ - call void @c_promise_await(%Pos ${promise}, %Stack %stack) - ret void - """ - -extern async def resolve[T](promise: Promise[T], value: T): Unit = - js "$effekt.capture(k => { ${promise}.resolve(${value}); return k($effekt.unit) })" - llvm """ - call void @c_promise_resolve(%Pos ${promise}, %Pos ${value}, %Stack %stack) - ret void - """ - -namespace promise { - extern js """ - function promise$make() { - let resolve; - const promise = new Promise((res, rej) => { - resolve = res; - }); - return { resolve: resolve, promise: promise }; - } - """ - - extern io def make[T](): Promise[T] = - js "promise$make()" - llvm """ - %promise = call %Pos @c_promise_make() - ret %Pos %promise - """ -} diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt new file mode 100644 index 000000000..b5015f26a --- /dev/null +++ b/libraries/common/io/channel.effekt @@ -0,0 +1,49 @@ +module io/channel + +import io +import io/signal + +record Node[A, B](this: Signal[A, B], rest: Ref[Option[Node[A, B]]]) + +type Sender[A, B] = Ref[Node[A, B]] +type Receiver[A, B] = Ref[Node[A, B]] + +namespace channel { + def allocate[A, B](): (Sender[A, B], Receiver[A, B]) = { + val node = Node(signal::allocate(), ref(None())) + (ref(node), ref(node)) + } +} + +def send[A, B](sender: Sender[A, B], value: A): B = { + val Node(this, rest) = sender.get() + rest.get() match { + case None() => + val node = Node(signal::allocate(), ref(None())) + rest.set(Some(node)) + sender.set(node) + case Some(node) => + sender.set(node) + } + // TODO avoid yield + yield() + this.unsafeNotifyA(value) +} + +def receive[A, B](receiver: Receiver[A, B], value: B): A = { + val Node(this, rest) = receiver.get() + rest.get() match { + case None() => + val node = Node(signal::allocate(), ref(None())) + rest.set(Some(node)) + receiver.set(node) + case Some(node) => + receiver.set(node) + } + // TODO avoid yield + yield() + this.unsafeNotifyB(value) +} + +// TODO streaming + diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt new file mode 100644 index 000000000..3917f2ac4 --- /dev/null +++ b/libraries/common/io/promise.effekt @@ -0,0 +1,88 @@ +module io/promise + +import io +import io/signal + +// Promises +// -------- + +type State[T] { + Resolved(value: T) + Pending(signals: List[Signal[T, Unit]]) +} + +extern type Promise[T] + // = js "{resolve: ƒ, promise: Promise}" + // = llvm "Ref[State[T]]" + +extern pure def toPromise[T](promise: Ref[State[T]]): Promise[T] = + llvm """ + ret %Pos ${promise} + """ + +extern pure def toRefState[T](promise: Promise[T]): Ref[State[T]] = + llvm """ + ret %Pos ${promise} + """ + +namespace promise { + extern global def make[T](): Promise[T] = + js "promise$make()" + llvm { toPromise(ref(Pending(Nil()))) } +} + +def promise[T](task: Task[T]): Promise[T] = { + val p = promise::make[T](); + spawn(box { p.resolve(task()) }); + return p +} + +extern {async, global} def await[T](promise: Promise[T]): T = + js "$effekt.capture(k => ${promise}.promise.then(k))" + llvm { + val reference = toRefState(promise) + // TODO use reference.get + get(reference) match { + case Resolved(value) => + return value + case Pending(signals) => + val signal = signal::allocate() + // TODO use reference.set and signal.wait + set(reference, Pending(Cons(signal, signals))) + // TODO perhaps yield to avoid stack overflow + unsafeNotifyB(signal, ()) + } + } + +extern {io, async, global} def resolve[T](promise: Promise[T], value: T): Unit = + js "promise$resolve(${promise}, ${value})" + llvm { + val reference = toRefState(promise) + // TODO use reference.get + get(reference) match { + case Resolved(value) => + panic("ERROR: Promise already resolved") + case Pending(signals) => + // TODO use reference.set + set(reference, Resolved(value)) + // TODO perhaps yield to avoid stack overflow + // TODO use signals.reverse.foreach + foreach(reverse(signals)) { signal => unsafeNotifyA(signal, value) } + } + } + + +extern js """ + function promise$make() { + let resolve; + const promise = new Promise((res, rej) => { + resolve = res; + }); + return { resolve: resolve, promise: promise }; + } + + function promise$resolve(promise, value) { + promise.resolve(value); + return $effekt.unit + } +""" diff --git a/libraries/common/io/signal.effekt b/libraries/common/io/signal.effekt new file mode 100644 index 000000000..ba96ec0b6 --- /dev/null +++ b/libraries/common/io/signal.effekt @@ -0,0 +1,34 @@ +module io/signal + +// Signals +// ------- + +/// Must be notified exactly once by A and exactly once by B. +/// Must yield before notifying or risking stack overflow. +extern type Signal[A, B] + +namespace signal { + extern global def allocate[A, B](): Signal[A, B] = + llvm """ + %signal = call %Pos @c_signal_make() + ret %Pos %signal + """ +} + +extern async def unsafeNotifyA[A, B](signal: Signal[A, B], value: A): B = + llvm """ + call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack) + ret void + """ + +extern async def unsafeNotifyB[A, B](signal: Signal[A, B], value: B): A = + llvm """ + call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack) + ret void + """ + +extern llvm """ + declare %Pos @c_signal_make() + declare void @c_signal_notify(%Pos, %Pos, %Stack) +""" + diff --git a/libraries/common/io/time.effekt b/libraries/common/io/time.effekt index 95472321b..3301e4249 100644 --- a/libraries/common/io/time.effekt +++ b/libraries/common/io/time.effekt @@ -4,7 +4,7 @@ extern llvm """ declare void @c_timer_start(%Int, %Stack) """ -extern async def wait(millis: Int): Unit = +extern async def sleep(millis: Int): Unit = js "$effekt.capture(k => setTimeout(() => k($effekt.unit), ${millis}))" llvm """ call void @c_timer_start(%Int ${millis}, %Stack %stack) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 57b3b22d5..9b67b3764 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -295,154 +295,59 @@ void c_yield(Stack stack) { c_timer_start(0, stack); } - -// Promises +// Signals // -------- -typedef enum { UNRESOLVED, RESOLVED } promise_state_t; - -typedef struct Listeners { - Stack head; - struct Listeners* tail; -} Listeners; +typedef enum { INITIAL, NOTIFIED } signal_state_t; typedef struct { uint64_t rc; void* eraser; - promise_state_t state; - // state of { - // case UNRESOLVED => Possibly empty (head is NULL) list of listeners - // case RESOLVED => Pos (the result) - // } - union { - struct Pos value; - Listeners listeners; - } payload; -} Promise; - -void c_promise_erase_listeners(void *envPtr) { - // envPtr points to a Promise _after_ the eraser, so let's adjust it to point to the promise. - Promise *promise = (Promise*) (envPtr - offsetof(Promise, state)); - promise_state_t state = promise->state; - - Stack head; - Listeners* tail; - Listeners* current; - - switch (state) { - case UNRESOLVED: - head = promise->payload.listeners.head; - tail = promise->payload.listeners.tail; - if (head != NULL) { - // Erase head - eraseStack(head); - // Erase tail - current = tail; - while (current != NULL) { - head = current->head; - tail = current->tail; - free(current); - eraseStack(head); - current = tail; - }; - }; - break; - case RESOLVED: - erasePositive(promise->payload.value); - break; - } -} - -void c_promise_resume_listeners(Listeners* listeners, struct Pos value) { - if (listeners != NULL) { - Stack head = listeners->head; - Listeners* tail = listeners->tail; - free(listeners); - c_promise_resume_listeners(tail, value); - sharePositive(value); - resume_Pos(head, value); - } + signal_state_t state; + struct Pos value; + Stack stack; +} Signal; + +void c_signal_erase(void *envPtr) { + // envPtr points to a Signal _after_ the eraser, so let's adjust it to point to the beginning. + Signal *signal = (Signal*) (envPtr - offsetof(Signal, state)); + erasePositive(signal->value); + if (signal->stack != NULL) { eraseStack(signal->stack); } } -void c_promise_resolve(struct Pos promise, struct Pos value, Stack stack) { - Promise* p = (Promise*)promise.obj; - - Stack head; - Listeners* tail; +struct Pos c_signal_make() { + Signal* f = (Signal*)malloc(sizeof(Signal)); - switch (p->state) { - case UNRESOLVED: - head = p->payload.listeners.head; - tail = p->payload.listeners.tail; + f->rc = 0; + f->eraser = c_signal_erase; + f->state = INITIAL; + f->value = (struct Pos) { .tag = 0, .obj = NULL, }; + f->stack = NULL; - p->state = RESOLVED; - p->payload.value = value; - resume_Pos(stack, Unit); - - if (head != NULL) { - // Execute tail - c_promise_resume_listeners(tail, value); - // Execute head - sharePositive(value); - resume_Pos(head, value); - }; - break; - case RESOLVED: - erasePositive(promise); - erasePositive(value); - eraseStack(stack); - fprintf(stderr, "ERROR: Promise already resolved\n"); - exit(1); - break; - } - // TODO stack overflow? - // We need to erase the promise now, since we consume it. - erasePositive(promise); + return (struct Pos) { .tag = 0, .obj = f, }; } -void c_promise_await(struct Pos promise, Stack stack) { - Promise* p = (Promise*)promise.obj; - - Stack head; - Listeners* tail; - Listeners* node; - struct Pos value; - - switch (p->state) { - case UNRESOLVED: - head = p->payload.listeners.head; - tail = p->payload.listeners.tail; - if (head != NULL) { - node = (Listeners*)malloc(sizeof(Listeners)); - node->head = head; - node->tail = tail; - p->payload.listeners.head = stack; - p->payload.listeners.tail = node; - } else { - p->payload.listeners.head = stack; - }; +void c_signal_notify(struct Pos signal, struct Pos value, Stack stack) { + Signal* f = (Signal*)signal.obj; + switch (f->state) { + case INITIAL: { + f->state = NOTIFIED; + f->value = value; + f->stack = stack; + erasePositive(signal); break; - case RESOLVED: - value = p->payload.value; - sharePositive(value); - resume_Pos(stack, value); + } + case NOTIFIED: { + struct Pos other_value = f->value; + f->value = (struct Pos) { .tag = 0, .obj = NULL, }; + Stack other_stack = f->stack; + f->stack = NULL; + erasePositive(signal); + resume_Pos(other_stack, value); + resume_Pos(stack, other_value); break; - }; - // TODO hmm, stack overflow? - erasePositive(promise); -} - -struct Pos c_promise_make() { - Promise* promise = (Promise*)malloc(sizeof(Promise)); - - promise->rc = 0; - promise->eraser = c_promise_erase_listeners; - promise->state = UNRESOLVED; - promise->payload.listeners.head = NULL; - promise->payload.listeners.tail = NULL; - - return (struct Pos) { .tag = 0, .obj = promise, }; + } + } } - #endif