Skip to content

Commit d45974c

Browse files
committed
Start considering channels
1 parent 193b8bc commit d45974c

File tree

4 files changed

+65
-0
lines changed

4 files changed

+65
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
10
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import examples/benchmarks/runner
2+
3+
import io
4+
import io/channel
5+
6+
7+
def run(n: Int) = {
8+
val (sender, receiver) = channel::allocate()
9+
10+
spawn(box {
11+
each(0, n) { i =>
12+
sender.send(Some(i))
13+
}
14+
sender.send(None())
15+
})
16+
17+
var s = 0
18+
while (receiver.receive() is Some(v)) {
19+
s = s + v
20+
}
21+
return s
22+
}
23+
24+
def main() = benchmark(5){run}
25+

libraries/common/io/channel.effekt

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module io/channel
2+
3+
import io/signal
4+
5+
// Based on:
6+
// Dependent Session Protocols in Separation Logic from First Principles
7+
8+
record River[A](value: A, river: Signal[River[A]])
9+
10+
type Sender[A] = Ref[Signal[River[A]]]
11+
12+
type Receiver[A] = Ref[Signal[River[A]]]
13+
14+
namespace channel {
15+
def allocate[A](): (Sender[A], Receiver[A]) = {
16+
val river = signal::allocate()
17+
(ref(river), ref(river))
18+
}
19+
}
20+
21+
def send[A](sender: Sender[A], value: A): Unit = {
22+
val rest = signal::allocate()
23+
// TODO swap reference!
24+
val signal = sender.get()
25+
sender.set(rest)
26+
signal.unsafeSend(River(value, rest))
27+
}
28+
29+
def receive[A](receiver: Receiver[A]): A = {
30+
// TODO this can crash when multiple tasks call receive
31+
val signal = receiver.get()
32+
val River(value, rest) = signal.unsafeWait()
33+
receiver.set(rest)
34+
return value
35+
}
36+
37+
// TODO streaming
38+

libraries/llvm/io.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ void c_signal_send(struct Pos signal, struct Pos value) {
357357
Stack stack = f->payload.stack;
358358
f->state = AFTER;
359359
erasePositive(signal);
360+
// TODO this overflows the C stack
360361
resume_Pos(stack, value);
361362
break;
362363
}

0 commit comments

Comments
 (0)