From 79256460566f7bc46dd5ce56806bb36ec7f7efd6 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Mon, 3 Feb 2025 18:11:15 +0100 Subject: [PATCH 01/22] Futures --- libraries/common/io.effekt | 28 +++++++++++++ libraries/llvm/io.c | 84 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index 758117eb7..6987f90cf 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -35,6 +35,34 @@ extern async def abort(): Nothing = ret void """ +// Futures +// ------- + +extern type Future[T] + +extern global def allocate[T](): Future[T] = + llvm """ + %future = call %Pos @c_future_make() + ret %Pos %future + """ + +extern global def fill[T](future: Future[T], value: T): Unit = + llvm """ + call void @c_future_fill(%Pos ${future}, %Pos ${value}) + ret %Pos zeroinitializer + """ + +extern async def wait[T](future: Future[T]): T = + llvm """ + call void @c_future_wait(%Pos ${future}, %Stack %stack) + ret void + """ + +extern llvm """ + declare %Pos @c_future_make() + declare void @c_future_fill(%Pos, %Pos) + declare void @c_future_wait(%Pos, %Stack) +""" // Promises // -------- diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 57b3b22d5..979114ce6 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -295,6 +295,90 @@ void c_yield(Stack stack) { c_timer_start(0, stack); } +// Futures +// ------- + +typedef enum { EMPTY, FILLED, WAITED } future_state_t; + +typedef struct { + uint64_t rc; + void* eraser; + future_state_t state; + union { + struct Pos value; + Stack stack; + } payload; +} Future; + +void c_future_erase(void *envPtr) { + // envPtr points to a Future _after_ the eraser, so let's adjust it to point to the beginning. + Future *future = (Future*) (envPtr - offsetof(Future, state)); + future_state_t state = future->state; + switch (state) { + case EMPTY: + break; + case FILLED: + erasePositive(future->payload.value); + break; + case WAITED: + eraseStack(future->payload.stack); + break; + } +} + +struct Pos c_future_make() { + Future* future = (Future*)malloc(sizeof(Future)); + + future->rc = 0; + future->eraser = c_future_erase; + future->state = EMPTY; + + return (struct Pos) { .tag = 0, .obj = future, }; +} + +void c_future_fill(struct Pos future, struct Pos value) { + Future* f = (Future*)future.obj; + switch (f->state) { + case EMPTY: + f->state = FILLED; + f->payload.value = value; + break; + case FILLED: + erasePositive(future); + erasePositive(value); + // TODO more graceful panic + fprintf(stderr, "ERROR: Future already filled\n"); + exit(1); + break; + case WAITED: + Stack stack = f->payload.stack; + free(f); + resume_Pos(stack, value); + break; + } +} + +void c_future_wait(struct Pos future, Stack stack) { + Future* f = (Future*)future.obj; + switch (f->state) { + case EMPTY: + f->state = WAITED; + f->payload.stack = stack; + break; + case FILLED: + struct Pos value = f->payload.value; + free(f); + resume_Pos(stack, value); + break; + case WAITED: + erasePositive(future); + eraseStack(stack); + // TODO more graceful panic + fprintf(stderr, "ERROR: Future already waited\n"); + exit(1); + break; + } +} // Promises // -------- From c8f5133ec76998f51d88c66c7958c6d11ed01ddc Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 7 Feb 2025 15:41:16 +0100 Subject: [PATCH 02/22] Promises in user space --- examples/stdlib/io/promise.effekt | 1 + libraries/common/io.effekt | 82 -------------------- libraries/common/io/promise.effekt | 115 +++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 82 deletions(-) create mode 100644 libraries/common/io/promise.effekt diff --git a/examples/stdlib/io/promise.effekt b/examples/stdlib/io/promise.effekt index 59f289ddd..465834bc7 100644 --- a/examples/stdlib/io/promise.effekt +++ b/examples/stdlib/io/promise.effekt @@ -1,4 +1,5 @@ import io +import io/promise def main() = { diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index 6987f90cf..460e0b1d8 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -1,7 +1,6 @@ module io import ref -import queue // Event Loop // ---------- @@ -34,84 +33,3 @@ extern async def abort(): Nothing = call void @eraseStack(%Stack %stack) ret void """ - -// Futures -// ------- - -extern type Future[T] - -extern global def allocate[T](): Future[T] = - llvm """ - %future = call %Pos @c_future_make() - ret %Pos %future - """ - -extern global def fill[T](future: Future[T], value: T): Unit = - llvm """ - call void @c_future_fill(%Pos ${future}, %Pos ${value}) - ret %Pos zeroinitializer - """ - -extern async def wait[T](future: Future[T]): T = - llvm """ - call void @c_future_wait(%Pos ${future}, %Stack %stack) - ret void - """ - -extern llvm """ - declare %Pos @c_future_make() - declare void @c_future_fill(%Pos, %Pos) - declare void @c_future_wait(%Pos, %Stack) -""" - -// Promises -// -------- - -extern type Promise[T] - // = js "{resolve: ƒ, promise: Promise}" - // = llvm "{tag: 0, obj: Promise*}" - -def promise[T](task: Task[T]): Promise[T] = { - val p = promise::make[T](); - spawn(box { p.resolve(task()) }); - return p -} - -extern llvm """ - declare %Pos @c_promise_make() - declare void @c_promise_resolve(%Pos, %Pos) - declare void @c_promise_await(%Pos, %Neg) -""" - -extern async def await[T](promise: Promise[T]): T = - js "$effekt.capture(k => ${promise}.promise.then(k))" - llvm """ - call void @c_promise_await(%Pos ${promise}, %Stack %stack) - ret void - """ - -extern async def resolve[T](promise: Promise[T], value: T): Unit = - js "$effekt.capture(k => { ${promise}.resolve(${value}); return k($effekt.unit) })" - llvm """ - call void @c_promise_resolve(%Pos ${promise}, %Pos ${value}, %Stack %stack) - ret void - """ - -namespace promise { - extern js """ - function promise$make() { - let resolve; - const promise = new Promise((res, rej) => { - resolve = res; - }); - return { resolve: resolve, promise: promise }; - } - """ - - extern io def make[T](): Promise[T] = - js "promise$make()" - llvm """ - %promise = call %Pos @c_promise_make() - ret %Pos %promise - """ -} diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt new file mode 100644 index 000000000..5c161009e --- /dev/null +++ b/libraries/common/io/promise.effekt @@ -0,0 +1,115 @@ +module io/promise + +import io + +// Futures +// ------- + +extern type Future[T] + +namespace future { + extern global def allocate[T](): Future[T] = + llvm """ + %future = call %Pos @c_future_make() + ret %Pos %future + """ +} + +extern global def fill[T](future: Future[T], value: T): Unit = + llvm """ + call void @c_future_fill(%Pos ${future}, %Pos ${value}) + ret %Pos zeroinitializer + """ + +extern async def wait[T](future: Future[T]): T = + llvm """ + call void @c_future_wait(%Pos ${future}, %Stack %stack) + ret void + """ + +extern llvm """ + declare %Pos @c_future_make() + declare void @c_future_fill(%Pos, %Pos) + declare void @c_future_wait(%Pos, %Stack) +""" + +// Promises +// -------- + +type State[T] { + Resolved(value: T) + Pending(futures: List[Future[T]]) +} + +extern type Promise[T] + // = js "{resolve: ƒ, promise: Promise}" + // = llvm "Ref[State[T]]" + +extern pure def toPromise[T](promise: Ref[State[T]]): Promise[T] = + llvm """ + ret %Pos ${promise} + """ + +extern pure def toRefState[T](promise: Promise[T]): Ref[State[T]] = + llvm """ + ret %Pos ${promise} + """ + +namespace promise { + extern global def make[T](): Promise[T] = + js "promise$make()" + llvm { toPromise(ref(Pending(Nil()))) } +} + +def promise[T](task: Task[T]): Promise[T] = { + val p = promise::make[T](); + spawn(box { p.resolve(task()) }); + return p +} + +extern {async, global} def await[T](promise: Promise[T]): T = + js "$effekt.capture(k => ${promise}.promise.then(k))" + llvm { + val reference = toRefState(promise) + // TODO use reference.get + get(reference) match { + case Resolved(value) => + return value + case Pending(futures) => + val future = future::allocate() + // TODO user reference.set and future.wait + set(reference, Pending(Cons(future, futures))) + wait(future) + } + } + +extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = + js "promise$resolve(${promise}, ${value})" + llvm { + val reference = toRefState(promise) + // TODO use reference.get + get(reference) match { + case Resolved(value) => + panic("ERROR: Promise already resolved") + case Pending(futures) => + // TODO use reference.set + set(reference, Resolved(value)) + futures.reverse.foreach { future => future.fill(value) } + } + } + + +extern js """ + function promise$make() { + let resolve; + const promise = new Promise((res, rej) => { + resolve = res; + }); + return { resolve: resolve, promise: promise }; + } + + function promise$resolve(promise, value) { + promise.resolve(value); + return $effekt.unit + } +""" From 8905db566c57a0308df60abb1fc07108f972b1e0 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 14:15:25 +0200 Subject: [PATCH 03/22] Fix warnings --- libraries/llvm/io.c | 70 ++++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 979114ce6..39a2bccfd 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -339,44 +339,50 @@ struct Pos c_future_make() { void c_future_fill(struct Pos future, struct Pos value) { Future* f = (Future*)future.obj; switch (f->state) { - case EMPTY: - f->state = FILLED; - f->payload.value = value; - break; - case FILLED: - erasePositive(future); - erasePositive(value); - // TODO more graceful panic - fprintf(stderr, "ERROR: Future already filled\n"); - exit(1); - break; - case WAITED: - Stack stack = f->payload.stack; - free(f); - resume_Pos(stack, value); - break; + case EMPTY: { + f->state = FILLED; + f->payload.value = value; + break; + } + case FILLED: { + erasePositive(future); + erasePositive(value); + // TODO more graceful panic + fprintf(stderr, "ERROR: Future already filled\n"); + exit(1); + break; + } + case WAITED: { + Stack stack = f->payload.stack; + free(f); + resume_Pos(stack, value); + break; + } } } void c_future_wait(struct Pos future, Stack stack) { Future* f = (Future*)future.obj; switch (f->state) { - case EMPTY: - f->state = WAITED; - f->payload.stack = stack; - break; - case FILLED: - struct Pos value = f->payload.value; - free(f); - resume_Pos(stack, value); - break; - case WAITED: - erasePositive(future); - eraseStack(stack); - // TODO more graceful panic - fprintf(stderr, "ERROR: Future already waited\n"); - exit(1); - break; + case EMPTY: { + f->state = WAITED; + f->payload.stack = stack; + break; + } + case FILLED: { + struct Pos value = f->payload.value; + free(f); + resume_Pos(stack, value); + break; + } + case WAITED: { + erasePositive(future); + eraseStack(stack); + // TODO more graceful panic + fprintf(stderr, "ERROR: Future already waited\n"); + exit(1); + break; + } } } From 14bf25d3d4775214a2b7074e7002777173181a51 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 14:24:06 +0200 Subject: [PATCH 04/22] Fix test --- examples/benchmarks/input_output/interleave_promises.effekt | 1 + libraries/common/io/promise.effekt | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/benchmarks/input_output/interleave_promises.effekt b/examples/benchmarks/input_output/interleave_promises.effekt index e394870c5..0dcda21fb 100644 --- a/examples/benchmarks/input_output/interleave_promises.effekt +++ b/examples/benchmarks/input_output/interleave_promises.effekt @@ -4,6 +4,7 @@ import bytearray import list import io import io/error +import io/promise import io/filesystem diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index 5c161009e..bd7ca7418 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -94,7 +94,8 @@ extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = case Pending(futures) => // TODO use reference.set set(reference, Resolved(value)) - futures.reverse.foreach { future => future.fill(value) } + // TODO use futures.reverse.foreach + foreach(reverse(futures)) { future => fill(future, value) } } } From 39bd49abab655c5c0d30571f2f0fc28e2e1156ae Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 14:24:20 +0200 Subject: [PATCH 05/22] Rename wait to sleep --- libraries/common/io/time.effekt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/common/io/time.effekt b/libraries/common/io/time.effekt index 95472321b..3301e4249 100644 --- a/libraries/common/io/time.effekt +++ b/libraries/common/io/time.effekt @@ -4,7 +4,7 @@ extern llvm """ declare void @c_timer_start(%Int, %Stack) """ -extern async def wait(millis: Int): Unit = +extern async def sleep(millis: Int): Unit = js "$effekt.capture(k => setTimeout(() => k($effekt.unit), ${millis}))" llvm """ call void @c_timer_start(%Int ${millis}, %Stack %stack) From 0db64a9a783f69f69182ae494a21de68dad04a3d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 14:31:33 +0200 Subject: [PATCH 06/22] Delete promises --- libraries/llvm/io.c | 149 -------------------------------------------- 1 file changed, 149 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 39a2bccfd..e049a85c4 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -386,153 +386,4 @@ void c_future_wait(struct Pos future, Stack stack) { } } -// Promises -// -------- - -typedef enum { UNRESOLVED, RESOLVED } promise_state_t; - -typedef struct Listeners { - Stack head; - struct Listeners* tail; -} Listeners; - -typedef struct { - uint64_t rc; - void* eraser; - promise_state_t state; - // state of { - // case UNRESOLVED => Possibly empty (head is NULL) list of listeners - // case RESOLVED => Pos (the result) - // } - union { - struct Pos value; - Listeners listeners; - } payload; -} Promise; - -void c_promise_erase_listeners(void *envPtr) { - // envPtr points to a Promise _after_ the eraser, so let's adjust it to point to the promise. - Promise *promise = (Promise*) (envPtr - offsetof(Promise, state)); - promise_state_t state = promise->state; - - Stack head; - Listeners* tail; - Listeners* current; - - switch (state) { - case UNRESOLVED: - head = promise->payload.listeners.head; - tail = promise->payload.listeners.tail; - if (head != NULL) { - // Erase head - eraseStack(head); - // Erase tail - current = tail; - while (current != NULL) { - head = current->head; - tail = current->tail; - free(current); - eraseStack(head); - current = tail; - }; - }; - break; - case RESOLVED: - erasePositive(promise->payload.value); - break; - } -} - -void c_promise_resume_listeners(Listeners* listeners, struct Pos value) { - if (listeners != NULL) { - Stack head = listeners->head; - Listeners* tail = listeners->tail; - free(listeners); - c_promise_resume_listeners(tail, value); - sharePositive(value); - resume_Pos(head, value); - } -} - -void c_promise_resolve(struct Pos promise, struct Pos value, Stack stack) { - Promise* p = (Promise*)promise.obj; - - Stack head; - Listeners* tail; - - switch (p->state) { - case UNRESOLVED: - head = p->payload.listeners.head; - tail = p->payload.listeners.tail; - - p->state = RESOLVED; - p->payload.value = value; - resume_Pos(stack, Unit); - - if (head != NULL) { - // Execute tail - c_promise_resume_listeners(tail, value); - // Execute head - sharePositive(value); - resume_Pos(head, value); - }; - break; - case RESOLVED: - erasePositive(promise); - erasePositive(value); - eraseStack(stack); - fprintf(stderr, "ERROR: Promise already resolved\n"); - exit(1); - break; - } - // TODO stack overflow? - // We need to erase the promise now, since we consume it. - erasePositive(promise); -} - -void c_promise_await(struct Pos promise, Stack stack) { - Promise* p = (Promise*)promise.obj; - - Stack head; - Listeners* tail; - Listeners* node; - struct Pos value; - - switch (p->state) { - case UNRESOLVED: - head = p->payload.listeners.head; - tail = p->payload.listeners.tail; - if (head != NULL) { - node = (Listeners*)malloc(sizeof(Listeners)); - node->head = head; - node->tail = tail; - p->payload.listeners.head = stack; - p->payload.listeners.tail = node; - } else { - p->payload.listeners.head = stack; - }; - break; - case RESOLVED: - value = p->payload.value; - sharePositive(value); - resume_Pos(stack, value); - break; - }; - // TODO hmm, stack overflow? - erasePositive(promise); -} - -struct Pos c_promise_make() { - Promise* promise = (Promise*)malloc(sizeof(Promise)); - - promise->rc = 0; - promise->eraser = c_promise_erase_listeners; - promise->state = UNRESOLVED; - promise->payload.listeners.head = NULL; - promise->payload.listeners.tail = NULL; - - return (struct Pos) { .tag = 0, .obj = promise, }; -} - - #endif From f4c9eb3524bf318a04e941f7b3443fdad90a0d4d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 16:00:24 +0200 Subject: [PATCH 07/22] Fix future reference count --- libraries/common/io/promise.effekt | 1 + libraries/llvm/io.c | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index bd7ca7418..92ff7864d 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -5,6 +5,7 @@ import io // Futures // ------- +/// Must be filled exactly once and waited exactly once extern type Future[T] namespace future { diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index e049a85c4..ce69f5619 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -342,6 +342,7 @@ void c_future_fill(struct Pos future, struct Pos value) { case EMPTY: { f->state = FILLED; f->payload.value = value; + erasePositive(future); break; } case FILLED: { @@ -354,7 +355,8 @@ void c_future_fill(struct Pos future, struct Pos value) { } case WAITED: { Stack stack = f->payload.stack; - free(f); + f->state = EMPTY; + erasePositive(future); resume_Pos(stack, value); break; } @@ -367,11 +369,13 @@ void c_future_wait(struct Pos future, Stack stack) { case EMPTY: { f->state = WAITED; f->payload.stack = stack; + erasePositive(future); break; } case FILLED: { struct Pos value = f->payload.value; - free(f); + f->state = EMPTY; + erasePositive(future); resume_Pos(stack, value); break; } From 708a816177f74bebfe03771e9d19fd8f14b983c8 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 16:01:55 +0200 Subject: [PATCH 08/22] Fix other tests --- examples/stdlib/io/filesystem/async_file_io.effekt | 1 + examples/stdlib/io/time.effekt | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/stdlib/io/filesystem/async_file_io.effekt b/examples/stdlib/io/filesystem/async_file_io.effekt index 411dcf78c..2e3e4c0d4 100644 --- a/examples/stdlib/io/filesystem/async_file_io.effekt +++ b/examples/stdlib/io/filesystem/async_file_io.effekt @@ -1,6 +1,7 @@ import io import io/error import io/filesystem +import io/promise def program(path1: String, path2: String) = { val f1 = promise(box { diff --git a/examples/stdlib/io/time.effekt b/examples/stdlib/io/time.effekt index 5b4186cec..808c996bd 100644 --- a/examples/stdlib/io/time.effekt +++ b/examples/stdlib/io/time.effekt @@ -1,18 +1,19 @@ import io import io/time +import io/promise def main() = { val p1 = promise(box { println("Start p1"); - wait(250) + sleep(250) println("Stop p1"); 1 }) val p2 = promise(box { println("Start p2"); - wait(150) + sleep(150) println("Stop p2"); 2 }) From bd4b21f39b10b5c890e21fd41fa30c01d76ecb7b Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 16:07:25 +0200 Subject: [PATCH 09/22] Add promises to acme --- examples/stdlib/acme.effekt | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/stdlib/acme.effekt b/examples/stdlib/acme.effekt index cffcd9178..838760ecb 100644 --- a/examples/stdlib/acme.effekt +++ b/examples/stdlib/acme.effekt @@ -17,6 +17,7 @@ import io/console import io/error import io/filesystem import io/network +import io/promise import io/time import json import list From 34202cceac0e7b09cb367fdd97f702c13db738b8 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Thu, 12 Jun 2025 16:13:38 +0200 Subject: [PATCH 10/22] Fix typo --- libraries/common/io/promise.effekt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index 92ff7864d..ec8edf9d5 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -78,7 +78,7 @@ extern {async, global} def await[T](promise: Promise[T]): T = return value case Pending(futures) => val future = future::allocate() - // TODO user reference.set and future.wait + // TODO use reference.set and future.wait set(reference, Pending(Cons(future, futures))) wait(future) } From 7e7d7a1a6717a5cd9b9bc00686ab5398566a1b48 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 13 Jun 2025 10:50:01 +0200 Subject: [PATCH 11/22] Rename and move futures to channels --- examples/stdlib/acme.effekt | 1 + libraries/common/io.effekt | 1 - libraries/common/io/channel.effekt | 37 +++++++++++++++++ libraries/common/io/promise.effekt | 51 +++++------------------ libraries/llvm/io.c | 66 +++++++++++++++--------------- 5 files changed, 81 insertions(+), 75 deletions(-) create mode 100644 libraries/common/io/channel.effekt diff --git a/examples/stdlib/acme.effekt b/examples/stdlib/acme.effekt index 838760ecb..10d37d338 100644 --- a/examples/stdlib/acme.effekt +++ b/examples/stdlib/acme.effekt @@ -13,6 +13,7 @@ import effekt import exception import heap import io +import io/channel import io/console import io/error import io/filesystem diff --git a/libraries/common/io.effekt b/libraries/common/io.effekt index 460e0b1d8..45f5db274 100644 --- a/libraries/common/io.effekt +++ b/libraries/common/io.effekt @@ -1,6 +1,5 @@ module io -import ref // Event Loop // ---------- diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt new file mode 100644 index 000000000..7598c2542 --- /dev/null +++ b/libraries/common/io/channel.effekt @@ -0,0 +1,37 @@ +module io/channel + +// Channels +// -------- + +/// Must be sended exactly once and waited exactly once +extern type Channel[T] + +namespace channel { + extern global def allocate[T](): Channel[T] = + llvm """ + %channel = call %Pos @c_channel_make() + ret %Pos %channel + """ +} + +extern global def send[T](channel: Channel[T], value: T): Unit = + llvm """ + call void @c_channel_send(%Pos ${channel}, %Pos ${value}) + ret %Pos zeroinitializer + """ + +extern async def wait[T](channel: Channel[T]): T = + llvm """ + call void @c_channel_wait(%Pos ${channel}, %Stack %stack) + ret void + """ + +extern llvm """ + declare %Pos @c_channel_make() + declare void @c_channel_send(%Pos, %Pos) + declare void @c_channel_wait(%Pos, %Stack) +""" + + + + diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index ec8edf9d5..de19368b8 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -1,45 +1,14 @@ module io/promise import io - -// Futures -// ------- - -/// Must be filled exactly once and waited exactly once -extern type Future[T] - -namespace future { - extern global def allocate[T](): Future[T] = - llvm """ - %future = call %Pos @c_future_make() - ret %Pos %future - """ -} - -extern global def fill[T](future: Future[T], value: T): Unit = - llvm """ - call void @c_future_fill(%Pos ${future}, %Pos ${value}) - ret %Pos zeroinitializer - """ - -extern async def wait[T](future: Future[T]): T = - llvm """ - call void @c_future_wait(%Pos ${future}, %Stack %stack) - ret void - """ - -extern llvm """ - declare %Pos @c_future_make() - declare void @c_future_fill(%Pos, %Pos) - declare void @c_future_wait(%Pos, %Stack) -""" +import io/channel // Promises // -------- type State[T] { Resolved(value: T) - Pending(futures: List[Future[T]]) + Pending(channels: List[Channel[T]]) } extern type Promise[T] @@ -76,11 +45,11 @@ extern {async, global} def await[T](promise: Promise[T]): T = get(reference) match { case Resolved(value) => return value - case Pending(futures) => - val future = future::allocate() - // TODO use reference.set and future.wait - set(reference, Pending(Cons(future, futures))) - wait(future) + case Pending(channels) => + val channel = channel::allocate() + // TODO use reference.set and channel.wait + set(reference, Pending(Cons(channel, channels))) + wait(channel) } } @@ -92,11 +61,11 @@ extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = get(reference) match { case Resolved(value) => panic("ERROR: Promise already resolved") - case Pending(futures) => + case Pending(channels) => // TODO use reference.set set(reference, Resolved(value)) - // TODO use futures.reverse.foreach - foreach(reverse(futures)) { future => fill(future, value) } + // TODO use channels.reverse.foreach + foreach(reverse(channels)) { channel => send(channel, value) } } } diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index ce69f5619..34abac850 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -295,95 +295,95 @@ void c_yield(Stack stack) { c_timer_start(0, stack); } -// Futures -// ------- +// Channels +// -------- -typedef enum { EMPTY, FILLED, WAITED } future_state_t; +typedef enum { EMPTY, SENDED, WAITED } channel_state_t; typedef struct { uint64_t rc; void* eraser; - future_state_t state; + channel_state_t state; union { struct Pos value; Stack stack; } payload; -} Future; +} Channel; -void c_future_erase(void *envPtr) { - // envPtr points to a Future _after_ the eraser, so let's adjust it to point to the beginning. - Future *future = (Future*) (envPtr - offsetof(Future, state)); - future_state_t state = future->state; +void c_channel_erase(void *envPtr) { + // envPtr points to a Channel _after_ the eraser, so let's adjust it to point to the beginning. + Channel *channel = (Channel*) (envPtr - offsetof(Channel, state)); + channel_state_t state = channel->state; switch (state) { case EMPTY: break; - case FILLED: - erasePositive(future->payload.value); + case SENDED: + erasePositive(channel->payload.value); break; case WAITED: - eraseStack(future->payload.stack); + eraseStack(channel->payload.stack); break; } } -struct Pos c_future_make() { - Future* future = (Future*)malloc(sizeof(Future)); +struct Pos c_channel_make() { + Channel* channel = (Channel*)malloc(sizeof(Channel)); - future->rc = 0; - future->eraser = c_future_erase; - future->state = EMPTY; + channel->rc = 0; + channel->eraser = c_channel_erase; + channel->state = EMPTY; - return (struct Pos) { .tag = 0, .obj = future, }; + return (struct Pos) { .tag = 0, .obj = channel, }; } -void c_future_fill(struct Pos future, struct Pos value) { - Future* f = (Future*)future.obj; +void c_channel_send(struct Pos channel, struct Pos value) { + Channel* f = (Channel*)channel.obj; switch (f->state) { case EMPTY: { - f->state = FILLED; + f->state = SENDED; f->payload.value = value; - erasePositive(future); + erasePositive(channel); break; } - case FILLED: { - erasePositive(future); + case SENDED: { + erasePositive(channel); erasePositive(value); // TODO more graceful panic - fprintf(stderr, "ERROR: Future already filled\n"); + fprintf(stderr, "ERROR: Channel already sended\n"); exit(1); break; } case WAITED: { Stack stack = f->payload.stack; f->state = EMPTY; - erasePositive(future); + erasePositive(channel); resume_Pos(stack, value); break; } } } -void c_future_wait(struct Pos future, Stack stack) { - Future* f = (Future*)future.obj; +void c_channel_wait(struct Pos channel, Stack stack) { + Channel* f = (Channel*)channel.obj; switch (f->state) { case EMPTY: { f->state = WAITED; f->payload.stack = stack; - erasePositive(future); + erasePositive(channel); break; } - case FILLED: { + case SENDED: { struct Pos value = f->payload.value; f->state = EMPTY; - erasePositive(future); + erasePositive(channel); resume_Pos(stack, value); break; } case WAITED: { - erasePositive(future); + erasePositive(channel); eraseStack(stack); // TODO more graceful panic - fprintf(stderr, "ERROR: Future already waited\n"); + fprintf(stderr, "ERROR: Channel already waited\n"); exit(1); break; } From 637b0abdf4f74a135e698f4c9a94a991c52221cf Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 17 Jun 2025 09:12:31 +0200 Subject: [PATCH 12/22] Fix grammar --- libraries/common/io/channel.effekt | 2 +- libraries/llvm/io.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt index 7598c2542..c87cc841a 100644 --- a/libraries/common/io/channel.effekt +++ b/libraries/common/io/channel.effekt @@ -3,7 +3,7 @@ module io/channel // Channels // -------- -/// Must be sended exactly once and waited exactly once +/// Must sent to exactly once and waited for exactly once extern type Channel[T] namespace channel { diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 34abac850..60def11c1 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -349,7 +349,7 @@ void c_channel_send(struct Pos channel, struct Pos value) { erasePositive(channel); erasePositive(value); // TODO more graceful panic - fprintf(stderr, "ERROR: Channel already sended\n"); + fprintf(stderr, "ERROR: Channel already used for sending\n"); exit(1); break; } @@ -383,7 +383,7 @@ void c_channel_wait(struct Pos channel, Stack stack) { erasePositive(channel); eraseStack(stack); // TODO more graceful panic - fprintf(stderr, "ERROR: Channel already waited\n"); + fprintf(stderr, "ERROR: Channel already used for waiting\n"); exit(1); break; } From 5aa21969ee538d6855ce7dcd5e9ad8755724f3ea Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Tue, 29 Jul 2025 11:42:19 +0200 Subject: [PATCH 13/22] Rename Channel to Signal --- examples/stdlib/acme.effekt | 2 +- libraries/common/io/channel.effekt | 37 -------------------- libraries/common/io/promise.effekt | 20 +++++------ libraries/common/io/signal.effekt | 37 ++++++++++++++++++++ libraries/llvm/io.c | 56 +++++++++++++++--------------- 5 files changed, 76 insertions(+), 76 deletions(-) delete mode 100644 libraries/common/io/channel.effekt create mode 100644 libraries/common/io/signal.effekt diff --git a/examples/stdlib/acme.effekt b/examples/stdlib/acme.effekt index 10d37d338..f3208add3 100644 --- a/examples/stdlib/acme.effekt +++ b/examples/stdlib/acme.effekt @@ -13,12 +13,12 @@ import effekt import exception import heap import io -import io/channel import io/console import io/error import io/filesystem import io/network import io/promise +import io/signal import io/time import json import list diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt deleted file mode 100644 index c87cc841a..000000000 --- a/libraries/common/io/channel.effekt +++ /dev/null @@ -1,37 +0,0 @@ -module io/channel - -// Channels -// -------- - -/// Must sent to exactly once and waited for exactly once -extern type Channel[T] - -namespace channel { - extern global def allocate[T](): Channel[T] = - llvm """ - %channel = call %Pos @c_channel_make() - ret %Pos %channel - """ -} - -extern global def send[T](channel: Channel[T], value: T): Unit = - llvm """ - call void @c_channel_send(%Pos ${channel}, %Pos ${value}) - ret %Pos zeroinitializer - """ - -extern async def wait[T](channel: Channel[T]): T = - llvm """ - call void @c_channel_wait(%Pos ${channel}, %Stack %stack) - ret void - """ - -extern llvm """ - declare %Pos @c_channel_make() - declare void @c_channel_send(%Pos, %Pos) - declare void @c_channel_wait(%Pos, %Stack) -""" - - - - diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index de19368b8..0887382b2 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -1,14 +1,14 @@ module io/promise import io -import io/channel +import io/signal // Promises // -------- type State[T] { Resolved(value: T) - Pending(channels: List[Channel[T]]) + Pending(signals: List[Signal[T]]) } extern type Promise[T] @@ -45,11 +45,11 @@ extern {async, global} def await[T](promise: Promise[T]): T = get(reference) match { case Resolved(value) => return value - case Pending(channels) => - val channel = channel::allocate() - // TODO use reference.set and channel.wait - set(reference, Pending(Cons(channel, channels))) - wait(channel) + case Pending(signals) => + val signal = signal::allocate() + // TODO use reference.set and signal.wait + set(reference, Pending(Cons(signal, signals))) + wait(signal) } } @@ -61,11 +61,11 @@ extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = get(reference) match { case Resolved(value) => panic("ERROR: Promise already resolved") - case Pending(channels) => + case Pending(signals) => // TODO use reference.set set(reference, Resolved(value)) - // TODO use channels.reverse.foreach - foreach(reverse(channels)) { channel => send(channel, value) } + // TODO use signals.reverse.foreach + foreach(reverse(signals)) { signal => send(signal, value) } } } diff --git a/libraries/common/io/signal.effekt b/libraries/common/io/signal.effekt new file mode 100644 index 000000000..4f8d20693 --- /dev/null +++ b/libraries/common/io/signal.effekt @@ -0,0 +1,37 @@ +module io/signal + +// Signals +// ------- + +/// Must be sent exactly once and waited for exactly once +extern type Signal[T] + +namespace signal { + extern global def allocate[T](): Signal[T] = + llvm """ + %signal = call %Pos @c_signal_make() + ret %Pos %signal + """ +} + +extern global def send[T](signal: Signal[T], value: T): Unit = + llvm """ + call void @c_signal_send(%Pos ${signal}, %Pos ${value}) + ret %Pos zeroinitializer + """ + +extern async def wait[T](signal: Signal[T]): T = + llvm """ + call void @c_signal_wait(%Pos ${signal}, %Stack %stack) + ret void + """ + +extern llvm """ + declare %Pos @c_signal_make() + declare void @c_signal_send(%Pos, %Pos) + declare void @c_signal_wait(%Pos, %Stack) +""" + + + + diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 60def11c1..0c77587fc 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -295,95 +295,95 @@ void c_yield(Stack stack) { c_timer_start(0, stack); } -// Channels +// Signals // -------- -typedef enum { EMPTY, SENDED, WAITED } channel_state_t; +typedef enum { EMPTY, SENDED, WAITED } signal_state_t; typedef struct { uint64_t rc; void* eraser; - channel_state_t state; + signal_state_t state; union { struct Pos value; Stack stack; } payload; -} Channel; +} Signal; -void c_channel_erase(void *envPtr) { - // envPtr points to a Channel _after_ the eraser, so let's adjust it to point to the beginning. - Channel *channel = (Channel*) (envPtr - offsetof(Channel, state)); - channel_state_t state = channel->state; +void c_signal_erase(void *envPtr) { + // envPtr points to a Signal _after_ the eraser, so let's adjust it to point to the beginning. + Signal *signal = (Signal*) (envPtr - offsetof(Signal, state)); + signal_state_t state = signal->state; switch (state) { case EMPTY: break; case SENDED: - erasePositive(channel->payload.value); + erasePositive(signal->payload.value); break; case WAITED: - eraseStack(channel->payload.stack); + eraseStack(signal->payload.stack); break; } } -struct Pos c_channel_make() { - Channel* channel = (Channel*)malloc(sizeof(Channel)); +struct Pos c_signal_make() { + Signal* signal = (Signal*)malloc(sizeof(Signal)); - channel->rc = 0; - channel->eraser = c_channel_erase; - channel->state = EMPTY; + signal->rc = 0; + signal->eraser = c_signal_erase; + signal->state = EMPTY; - return (struct Pos) { .tag = 0, .obj = channel, }; + return (struct Pos) { .tag = 0, .obj = signal, }; } -void c_channel_send(struct Pos channel, struct Pos value) { - Channel* f = (Channel*)channel.obj; +void c_signal_send(struct Pos signal, struct Pos value) { + Signal* f = (Signal*)signal.obj; switch (f->state) { case EMPTY: { f->state = SENDED; f->payload.value = value; - erasePositive(channel); + erasePositive(signal); break; } case SENDED: { - erasePositive(channel); + erasePositive(signal); erasePositive(value); // TODO more graceful panic - fprintf(stderr, "ERROR: Channel already used for sending\n"); + fprintf(stderr, "ERROR: Signal already used for sending\n"); exit(1); break; } case WAITED: { Stack stack = f->payload.stack; f->state = EMPTY; - erasePositive(channel); + erasePositive(signal); resume_Pos(stack, value); break; } } } -void c_channel_wait(struct Pos channel, Stack stack) { - Channel* f = (Channel*)channel.obj; +void c_signal_wait(struct Pos signal, Stack stack) { + Signal* f = (Signal*)signal.obj; switch (f->state) { case EMPTY: { f->state = WAITED; f->payload.stack = stack; - erasePositive(channel); + erasePositive(signal); break; } case SENDED: { struct Pos value = f->payload.value; f->state = EMPTY; - erasePositive(channel); + erasePositive(signal); resume_Pos(stack, value); break; } case WAITED: { - erasePositive(channel); + erasePositive(signal); eraseStack(stack); // TODO more graceful panic - fprintf(stderr, "ERROR: Channel already used for waiting\n"); + fprintf(stderr, "ERROR: Signal already used for waiting\n"); exit(1); break; } From ce117fb3cb5a023ae5c42dcd82d33a905034ad0e Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Mon, 4 Aug 2025 12:03:16 +0200 Subject: [PATCH 14/22] State after signal happened --- libraries/llvm/io.c | 48 +++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 0c77587fc..2b51bc97d 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -298,7 +298,7 @@ void c_yield(Stack stack) { // Signals // -------- -typedef enum { EMPTY, SENDED, WAITED } signal_state_t; +typedef enum { BEFORE, SENDING, WAITING, AFTER } signal_state_t; typedef struct { uint64_t rc; @@ -315,14 +315,16 @@ void c_signal_erase(void *envPtr) { Signal *signal = (Signal*) (envPtr - offsetof(Signal, state)); signal_state_t state = signal->state; switch (state) { - case EMPTY: + case BEFORE: break; - case SENDED: + case SENDING: erasePositive(signal->payload.value); break; - case WAITED: + case WAITING: eraseStack(signal->payload.stack); break; + case AFTER: + break; } } @@ -331,7 +333,7 @@ struct Pos c_signal_make() { signal->rc = 0; signal->eraser = c_signal_erase; - signal->state = EMPTY; + signal->state = BEFORE; return (struct Pos) { .tag = 0, .obj = signal, }; } @@ -339,54 +341,62 @@ struct Pos c_signal_make() { void c_signal_send(struct Pos signal, struct Pos value) { Signal* f = (Signal*)signal.obj; switch (f->state) { - case EMPTY: { - f->state = SENDED; + case BEFORE: { + f->state = SENDING; f->payload.value = value; erasePositive(signal); break; } - case SENDED: { - erasePositive(signal); - erasePositive(value); + case SENDING: { // TODO more graceful panic fprintf(stderr, "ERROR: Signal already used for sending\n"); exit(1); break; } - case WAITED: { + case WAITING: { Stack stack = f->payload.stack; - f->state = EMPTY; + f->state = AFTER; erasePositive(signal); resume_Pos(stack, value); break; } + case AFTER: { + // TODO more graceful panic + fprintf(stderr, "ERROR: Sending after signal happened\n"); + exit(1); + break; + } } } void c_signal_wait(struct Pos signal, Stack stack) { Signal* f = (Signal*)signal.obj; switch (f->state) { - case EMPTY: { - f->state = WAITED; + case BEFORE: { + f->state = WAITING; f->payload.stack = stack; erasePositive(signal); break; } - case SENDED: { + case SENDING: { struct Pos value = f->payload.value; - f->state = EMPTY; + f->state = AFTER; erasePositive(signal); resume_Pos(stack, value); break; } - case WAITED: { - erasePositive(signal); - eraseStack(stack); + case WAITING: { // TODO more graceful panic fprintf(stderr, "ERROR: Signal already used for waiting\n"); exit(1); break; } + case AFTER: { + // TODO more graceful panic + fprintf(stderr, "ERROR: Waiting after signal happened\n"); + exit(1); + break; + } } } From 4951300e217f21747e8f414ea03d61b664f2b4f9 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Sat, 16 Aug 2025 11:38:14 +0200 Subject: [PATCH 15/22] Mark low-level primitives as unsafe --- libraries/common/io/promise.effekt | 4 ++-- libraries/common/io/signal.effekt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index 0887382b2..0080680e1 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -49,7 +49,7 @@ extern {async, global} def await[T](promise: Promise[T]): T = val signal = signal::allocate() // TODO use reference.set and signal.wait set(reference, Pending(Cons(signal, signals))) - wait(signal) + unsafeWait(signal) } } @@ -65,7 +65,7 @@ extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = // TODO use reference.set set(reference, Resolved(value)) // TODO use signals.reverse.foreach - foreach(reverse(signals)) { signal => send(signal, value) } + foreach(reverse(signals)) { signal => unsafeSend(signal, value) } } } diff --git a/libraries/common/io/signal.effekt b/libraries/common/io/signal.effekt index 4f8d20693..91ffe79fd 100644 --- a/libraries/common/io/signal.effekt +++ b/libraries/common/io/signal.effekt @@ -14,13 +14,13 @@ namespace signal { """ } -extern global def send[T](signal: Signal[T], value: T): Unit = +extern global def unsafeSend[T](signal: Signal[T], value: T): Unit = llvm """ call void @c_signal_send(%Pos ${signal}, %Pos ${value}) ret %Pos zeroinitializer """ -extern async def wait[T](signal: Signal[T]): T = +extern async def unsafeWait[T](signal: Signal[T]): T = llvm """ call void @c_signal_wait(%Pos ${signal}, %Stack %stack) ret void From 05a0c9c8d3fbf10d4ebda8082ce88acd527104fd Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Sat, 16 Aug 2025 12:58:06 +0200 Subject: [PATCH 16/22] Start considering channels --- .../input_output/sender_receiver.check | 1 + .../input_output/sender_receiver.effekt | 25 ++++++++++++ libraries/common/io/channel.effekt | 38 +++++++++++++++++++ libraries/llvm/io.c | 1 + 4 files changed, 65 insertions(+) create mode 100644 examples/benchmarks/input_output/sender_receiver.check create mode 100644 examples/benchmarks/input_output/sender_receiver.effekt create mode 100644 libraries/common/io/channel.effekt diff --git a/examples/benchmarks/input_output/sender_receiver.check b/examples/benchmarks/input_output/sender_receiver.check new file mode 100644 index 000000000..9a037142a --- /dev/null +++ b/examples/benchmarks/input_output/sender_receiver.check @@ -0,0 +1 @@ +10 \ No newline at end of file diff --git a/examples/benchmarks/input_output/sender_receiver.effekt b/examples/benchmarks/input_output/sender_receiver.effekt new file mode 100644 index 000000000..01d70d43a --- /dev/null +++ b/examples/benchmarks/input_output/sender_receiver.effekt @@ -0,0 +1,25 @@ +import examples/benchmarks/runner + +import io +import io/channel + + +def run(n: Int) = { + val (sender, receiver) = channel::allocate() + + spawn(box { + each(0, n) { i => + sender.send(Some(i)) + } + sender.send(None()) + }) + + var s = 0 + while (receiver.receive() is Some(v)) { + s = s + v + } + return s +} + +def main() = benchmark(5){run} + diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt new file mode 100644 index 000000000..f835b7f44 --- /dev/null +++ b/libraries/common/io/channel.effekt @@ -0,0 +1,38 @@ +module io/channel + +import io/signal + +// Based on: +// Dependent Session Protocols in Separation Logic from First Principles + +record River[A](value: A, river: Signal[River[A]]) + +type Sender[A] = Ref[Signal[River[A]]] + +type Receiver[A] = Ref[Signal[River[A]]] + +namespace channel { + def allocate[A](): (Sender[A], Receiver[A]) = { + val river = signal::allocate() + (ref(river), ref(river)) + } +} + +def send[A](sender: Sender[A], value: A): Unit = { + val rest = signal::allocate() + // TODO swap reference! + val signal = sender.get() + sender.set(rest) + signal.unsafeSend(River(value, rest)) +} + +def receive[A](receiver: Receiver[A]): A = { + // TODO this can crash when multiple tasks call receive + val signal = receiver.get() + val River(value, rest) = signal.unsafeWait() + receiver.set(rest) + return value +} + +// TODO streaming + diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 2b51bc97d..0650d30b9 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -357,6 +357,7 @@ void c_signal_send(struct Pos signal, struct Pos value) { Stack stack = f->payload.stack; f->state = AFTER; erasePositive(signal); + // TODO this overflows the C stack resume_Pos(stack, value); break; } From c9e657903e4e5e54c397759e3a5574ce458b24fd Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Sat, 16 Aug 2025 17:45:26 +0200 Subject: [PATCH 17/22] Fix multiple receives --- .../input_output/sender_receiver.effekt | 2 +- libraries/common/io/channel.effekt | 54 +++++++++++-------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/examples/benchmarks/input_output/sender_receiver.effekt b/examples/benchmarks/input_output/sender_receiver.effekt index 01d70d43a..7148c383e 100644 --- a/examples/benchmarks/input_output/sender_receiver.effekt +++ b/examples/benchmarks/input_output/sender_receiver.effekt @@ -15,7 +15,7 @@ def run(n: Int) = { }) var s = 0 - while (receiver.receive() is Some(v)) { + while (receiver.receive(()) is Some(v)) { s = s + v } return s diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt index f835b7f44..9a0ca2931 100644 --- a/libraries/common/io/channel.effekt +++ b/libraries/common/io/channel.effekt @@ -2,36 +2,48 @@ module io/channel import io/signal -// Based on: -// Dependent Session Protocols in Separation Logic from First Principles +record Rendezvous[A, B](value: A, other: Signal[B]) -record River[A](value: A, river: Signal[River[A]]) +record Node[A, B](this: Signal[Rendezvous[A, B]], rest: Ref[Option[Node[A, B]]]) -type Sender[A] = Ref[Signal[River[A]]] - -type Receiver[A] = Ref[Signal[River[A]]] +type Sender[A, B] = Ref[Node[A, B]] +type Receiver[A, B] = Ref[Node[A, B]] namespace channel { - def allocate[A](): (Sender[A], Receiver[A]) = { - val river = signal::allocate() - (ref(river), ref(river)) + def allocate[A, B](): (Sender[A, B], Receiver[A, B]) = { + val node = Node(signal::allocate(), ref(None())) + (ref(node), ref(node)) } } -def send[A](sender: Sender[A], value: A): Unit = { - val rest = signal::allocate() - // TODO swap reference! - val signal = sender.get() - sender.set(rest) - signal.unsafeSend(River(value, rest)) +def send[A, B](sender: Sender[A, B], value: A): B = { + val Node(this, rest) = sender.get() + rest.get() match { + case None() => + val node = Node(signal::allocate(), ref(None())) + rest.set(Some(node)) + sender.set(node) + case Some(node) => + sender.set(node) + } + val other = signal::allocate() + this.unsafeSend(Rendezvous(value, other)) + other.unsafeWait() } -def receive[A](receiver: Receiver[A]): A = { - // TODO this can crash when multiple tasks call receive - val signal = receiver.get() - val River(value, rest) = signal.unsafeWait() - receiver.set(rest) - return value +def receive[A, B](receiver: Receiver[A, B], value: B): A = { + val Node(this, rest) = receiver.get() + rest.get() match { + case None() => + val node = Node(signal::allocate(), ref(None())) + rest.set(Some(node)) + receiver.set(node) + case Some(node) => + receiver.set(node) + } + val Rendezvous(result, other) = this.unsafeWait() + other.unsafeSend(value) + return result } // TODO streaming From 751f4b059c1dbd580553bff65f39e84b733703f2 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Sat, 16 Aug 2025 17:54:25 +0200 Subject: [PATCH 18/22] Avoid stack overflow by yielding to trampoline --- libraries/common/io/signal.effekt | 6 +++--- libraries/llvm/io.c | 11 +++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/libraries/common/io/signal.effekt b/libraries/common/io/signal.effekt index 91ffe79fd..2da67e2d0 100644 --- a/libraries/common/io/signal.effekt +++ b/libraries/common/io/signal.effekt @@ -14,10 +14,10 @@ namespace signal { """ } -extern global def unsafeSend[T](signal: Signal[T], value: T): Unit = +extern async def unsafeSend[T](signal: Signal[T], value: T): Unit = llvm """ - call void @c_signal_send(%Pos ${signal}, %Pos ${value}) - ret %Pos zeroinitializer + call void @c_signal_send(%Pos ${signal}, %Pos ${value}, %Stack %stack) + ret void """ extern async def unsafeWait[T](signal: Signal[T]): T = diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 0650d30b9..a0f728ca6 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -338,13 +338,15 @@ struct Pos c_signal_make() { return (struct Pos) { .tag = 0, .obj = signal, }; } -void c_signal_send(struct Pos signal, struct Pos value) { +void c_signal_send(struct Pos signal, struct Pos value, Stack stack) { Signal* f = (Signal*)signal.obj; switch (f->state) { case BEFORE: { f->state = SENDING; f->payload.value = value; erasePositive(signal); + // TODO avoid stack overflow without yielding + c_yield(stack); break; } case SENDING: { @@ -354,11 +356,12 @@ void c_signal_send(struct Pos signal, struct Pos value) { break; } case WAITING: { - Stack stack = f->payload.stack; + Stack other = f->payload.stack; f->state = AFTER; erasePositive(signal); - // TODO this overflows the C stack - resume_Pos(stack, value); + // TODO avoid stack overflow without yielding + c_yield(stack); + resume_Pos(other, value); break; } case AFTER: { From f5b4a7d3d78373cb58651971ad0548cc53bd2484 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Sat, 16 Aug 2025 18:00:13 +0200 Subject: [PATCH 19/22] Add to acme --- examples/stdlib/acme.effekt | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/stdlib/acme.effekt b/examples/stdlib/acme.effekt index f3208add3..6dd25d713 100644 --- a/examples/stdlib/acme.effekt +++ b/examples/stdlib/acme.effekt @@ -13,6 +13,7 @@ import effekt import exception import heap import io +import io/channel import io/console import io/error import io/filesystem From 800013efcd1e15d64dbec6f157d7a96befb10b2d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 22 Aug 2025 21:05:48 +0200 Subject: [PATCH 20/22] Make signals rendezvous --- libraries/common/io/channel.effekt | 17 ++++--- libraries/common/io/promise.effekt | 10 ++-- libraries/common/io/signal.effekt | 21 ++++----- libraries/llvm/io.c | 73 +++++++----------------------- 4 files changed, 39 insertions(+), 82 deletions(-) diff --git a/libraries/common/io/channel.effekt b/libraries/common/io/channel.effekt index 9a0ca2931..b5015f26a 100644 --- a/libraries/common/io/channel.effekt +++ b/libraries/common/io/channel.effekt @@ -1,10 +1,9 @@ module io/channel +import io import io/signal -record Rendezvous[A, B](value: A, other: Signal[B]) - -record Node[A, B](this: Signal[Rendezvous[A, B]], rest: Ref[Option[Node[A, B]]]) +record Node[A, B](this: Signal[A, B], rest: Ref[Option[Node[A, B]]]) type Sender[A, B] = Ref[Node[A, B]] type Receiver[A, B] = Ref[Node[A, B]] @@ -26,9 +25,9 @@ def send[A, B](sender: Sender[A, B], value: A): B = { case Some(node) => sender.set(node) } - val other = signal::allocate() - this.unsafeSend(Rendezvous(value, other)) - other.unsafeWait() + // TODO avoid yield + yield() + this.unsafeNotifyA(value) } def receive[A, B](receiver: Receiver[A, B], value: B): A = { @@ -41,9 +40,9 @@ def receive[A, B](receiver: Receiver[A, B], value: B): A = { case Some(node) => receiver.set(node) } - val Rendezvous(result, other) = this.unsafeWait() - other.unsafeSend(value) - return result + // TODO avoid yield + yield() + this.unsafeNotifyB(value) } // TODO streaming diff --git a/libraries/common/io/promise.effekt b/libraries/common/io/promise.effekt index 0080680e1..3917f2ac4 100644 --- a/libraries/common/io/promise.effekt +++ b/libraries/common/io/promise.effekt @@ -8,7 +8,7 @@ import io/signal type State[T] { Resolved(value: T) - Pending(signals: List[Signal[T]]) + Pending(signals: List[Signal[T, Unit]]) } extern type Promise[T] @@ -49,11 +49,12 @@ extern {async, global} def await[T](promise: Promise[T]): T = val signal = signal::allocate() // TODO use reference.set and signal.wait set(reference, Pending(Cons(signal, signals))) - unsafeWait(signal) + // TODO perhaps yield to avoid stack overflow + unsafeNotifyB(signal, ()) } } -extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = +extern {io, async, global} def resolve[T](promise: Promise[T], value: T): Unit = js "promise$resolve(${promise}, ${value})" llvm { val reference = toRefState(promise) @@ -64,8 +65,9 @@ extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit = case Pending(signals) => // TODO use reference.set set(reference, Resolved(value)) + // TODO perhaps yield to avoid stack overflow // TODO use signals.reverse.foreach - foreach(reverse(signals)) { signal => unsafeSend(signal, value) } + foreach(reverse(signals)) { signal => unsafeNotifyA(signal, value) } } } diff --git a/libraries/common/io/signal.effekt b/libraries/common/io/signal.effekt index 2da67e2d0..ba96ec0b6 100644 --- a/libraries/common/io/signal.effekt +++ b/libraries/common/io/signal.effekt @@ -3,35 +3,32 @@ module io/signal // Signals // ------- -/// Must be sent exactly once and waited for exactly once -extern type Signal[T] +/// Must be notified exactly once by A and exactly once by B. +/// Must yield before notifying or risking stack overflow. +extern type Signal[A, B] namespace signal { - extern global def allocate[T](): Signal[T] = + extern global def allocate[A, B](): Signal[A, B] = llvm """ %signal = call %Pos @c_signal_make() ret %Pos %signal """ } -extern async def unsafeSend[T](signal: Signal[T], value: T): Unit = +extern async def unsafeNotifyA[A, B](signal: Signal[A, B], value: A): B = llvm """ - call void @c_signal_send(%Pos ${signal}, %Pos ${value}, %Stack %stack) + call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack) ret void """ -extern async def unsafeWait[T](signal: Signal[T]): T = +extern async def unsafeNotifyB[A, B](signal: Signal[A, B], value: B): A = llvm """ - call void @c_signal_wait(%Pos ${signal}, %Stack %stack) + call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack) ret void """ extern llvm """ declare %Pos @c_signal_make() - declare void @c_signal_send(%Pos, %Pos) - declare void @c_signal_wait(%Pos, %Stack) + declare void @c_signal_notify(%Pos, %Pos, %Stack) """ - - - diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index a0f728ca6..89807df7c 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -298,30 +298,27 @@ void c_yield(Stack stack) { // Signals // -------- -typedef enum { BEFORE, SENDING, WAITING, AFTER } signal_state_t; +typedef enum { BEFORE, NOTIFIED, AFTER } signal_state_t; typedef struct { uint64_t rc; void* eraser; signal_state_t state; - union { - struct Pos value; - Stack stack; - } payload; + struct Pos value; + Stack stack; } Signal; void c_signal_erase(void *envPtr) { // envPtr points to a Signal _after_ the eraser, so let's adjust it to point to the beginning. Signal *signal = (Signal*) (envPtr - offsetof(Signal, state)); + // TODO exploit value and stack being NULL signal_state_t state = signal->state; switch (state) { case BEFORE: break; - case SENDING: - erasePositive(signal->payload.value); - break; - case WAITING: - eraseStack(signal->payload.stack); + case NOTIFIED: + erasePositive(signal->value); + eraseStack(signal->stack); break; case AFTER: break; @@ -338,30 +335,23 @@ struct Pos c_signal_make() { return (struct Pos) { .tag = 0, .obj = signal, }; } -void c_signal_send(struct Pos signal, struct Pos value, Stack stack) { +void c_signal_notify(struct Pos signal, struct Pos value, Stack stack) { Signal* f = (Signal*)signal.obj; switch (f->state) { case BEFORE: { - f->state = SENDING; - f->payload.value = value; + f->state = NOTIFIED; + f->value = value; + f->stack = stack; erasePositive(signal); - // TODO avoid stack overflow without yielding - c_yield(stack); break; } - case SENDING: { - // TODO more graceful panic - fprintf(stderr, "ERROR: Signal already used for sending\n"); - exit(1); - break; - } - case WAITING: { - Stack other = f->payload.stack; + case NOTIFIED: { + struct Pos other_value = f->value; + Stack other_stack = f->stack; f->state = AFTER; erasePositive(signal); - // TODO avoid stack overflow without yielding - c_yield(stack); - resume_Pos(other, value); + resume_Pos(other_stack, value); + resume_Pos(stack, other_value); break; } case AFTER: { @@ -373,35 +363,4 @@ void c_signal_send(struct Pos signal, struct Pos value, Stack stack) { } } -void c_signal_wait(struct Pos signal, Stack stack) { - Signal* f = (Signal*)signal.obj; - switch (f->state) { - case BEFORE: { - f->state = WAITING; - f->payload.stack = stack; - erasePositive(signal); - break; - } - case SENDING: { - struct Pos value = f->payload.value; - f->state = AFTER; - erasePositive(signal); - resume_Pos(stack, value); - break; - } - case WAITING: { - // TODO more graceful panic - fprintf(stderr, "ERROR: Signal already used for waiting\n"); - exit(1); - break; - } - case AFTER: { - // TODO more graceful panic - fprintf(stderr, "ERROR: Waiting after signal happened\n"); - exit(1); - break; - } - } -} - #endif From eb9578453d9dfdb31a4da30bf053e79b72d31f8d Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Fri, 22 Aug 2025 21:48:12 +0200 Subject: [PATCH 21/22] Further simplify signal code --- libraries/llvm/io.c | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/libraries/llvm/io.c b/libraries/llvm/io.c index 89807df7c..9b67b3764 100644 --- a/libraries/llvm/io.c +++ b/libraries/llvm/io.c @@ -298,7 +298,7 @@ void c_yield(Stack stack) { // Signals // -------- -typedef enum { BEFORE, NOTIFIED, AFTER } signal_state_t; +typedef enum { INITIAL, NOTIFIED } signal_state_t; typedef struct { uint64_t rc; @@ -311,34 +311,26 @@ typedef struct { void c_signal_erase(void *envPtr) { // envPtr points to a Signal _after_ the eraser, so let's adjust it to point to the beginning. Signal *signal = (Signal*) (envPtr - offsetof(Signal, state)); - // TODO exploit value and stack being NULL - signal_state_t state = signal->state; - switch (state) { - case BEFORE: - break; - case NOTIFIED: - erasePositive(signal->value); - eraseStack(signal->stack); - break; - case AFTER: - break; - } + erasePositive(signal->value); + if (signal->stack != NULL) { eraseStack(signal->stack); } } struct Pos c_signal_make() { - Signal* signal = (Signal*)malloc(sizeof(Signal)); + Signal* f = (Signal*)malloc(sizeof(Signal)); - signal->rc = 0; - signal->eraser = c_signal_erase; - signal->state = BEFORE; + f->rc = 0; + f->eraser = c_signal_erase; + f->state = INITIAL; + f->value = (struct Pos) { .tag = 0, .obj = NULL, }; + f->stack = NULL; - return (struct Pos) { .tag = 0, .obj = signal, }; + return (struct Pos) { .tag = 0, .obj = f, }; } void c_signal_notify(struct Pos signal, struct Pos value, Stack stack) { Signal* f = (Signal*)signal.obj; switch (f->state) { - case BEFORE: { + case INITIAL: { f->state = NOTIFIED; f->value = value; f->stack = stack; @@ -347,19 +339,14 @@ void c_signal_notify(struct Pos signal, struct Pos value, Stack stack) { } case NOTIFIED: { struct Pos other_value = f->value; + f->value = (struct Pos) { .tag = 0, .obj = NULL, }; Stack other_stack = f->stack; - f->state = AFTER; + f->stack = NULL; erasePositive(signal); resume_Pos(other_stack, value); resume_Pos(stack, other_value); break; } - case AFTER: { - // TODO more graceful panic - fprintf(stderr, "ERROR: Sending after signal happened\n"); - exit(1); - break; - } } } From a8ef69b6844048969a3e61bd0b379bfa99f51f82 Mon Sep 17 00:00:00 2001 From: Philipp Schuster Date: Mon, 25 Aug 2025 09:04:55 +0200 Subject: [PATCH 22/22] Signal closing by state --- examples/benchmarks/input_output/sender_receiver.effekt | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/benchmarks/input_output/sender_receiver.effekt b/examples/benchmarks/input_output/sender_receiver.effekt index 7148c383e..913422e6b 100644 --- a/examples/benchmarks/input_output/sender_receiver.effekt +++ b/examples/benchmarks/input_output/sender_receiver.effekt @@ -6,17 +6,18 @@ import io/channel def run(n: Int) = { val (sender, receiver) = channel::allocate() + val closed = ref(false) spawn(box { each(0, n) { i => - sender.send(Some(i)) + sender.send(i) } - sender.send(None()) + closed.set(true) }) var s = 0 - while (receiver.receive(()) is Some(v)) { - s = s + v + while (not(closed.get())) { + s = s + receiver.receive(()) } return s }