Skip to content

Commit c9e6579

Browse files
committed
Fix multiple receives
1 parent 05a0c9c commit c9e6579

File tree

2 files changed

+34
-22
lines changed

2 files changed

+34
-22
lines changed

examples/benchmarks/input_output/sender_receiver.effekt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def run(n: Int) = {
1515
})
1616

1717
var s = 0
18-
while (receiver.receive() is Some(v)) {
18+
while (receiver.receive(()) is Some(v)) {
1919
s = s + v
2020
}
2121
return s

libraries/common/io/channel.effekt

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,48 @@ module io/channel
22

33
import io/signal
44

5-
// Based on:
6-
// Dependent Session Protocols in Separation Logic from First Principles
5+
record Rendezvous[A, B](value: A, other: Signal[B])
76

8-
record River[A](value: A, river: Signal[River[A]])
7+
record Node[A, B](this: Signal[Rendezvous[A, B]], rest: Ref[Option[Node[A, B]]])
98

10-
type Sender[A] = Ref[Signal[River[A]]]
11-
12-
type Receiver[A] = Ref[Signal[River[A]]]
9+
type Sender[A, B] = Ref[Node[A, B]]
10+
type Receiver[A, B] = Ref[Node[A, B]]
1311

1412
namespace channel {
15-
def allocate[A](): (Sender[A], Receiver[A]) = {
16-
val river = signal::allocate()
17-
(ref(river), ref(river))
13+
def allocate[A, B](): (Sender[A, B], Receiver[A, B]) = {
14+
val node = Node(signal::allocate(), ref(None()))
15+
(ref(node), ref(node))
1816
}
1917
}
2018

21-
def send[A](sender: Sender[A], value: A): Unit = {
22-
val rest = signal::allocate()
23-
// TODO swap reference!
24-
val signal = sender.get()
25-
sender.set(rest)
26-
signal.unsafeSend(River(value, rest))
19+
def send[A, B](sender: Sender[A, B], value: A): B = {
20+
val Node(this, rest) = sender.get()
21+
rest.get() match {
22+
case None() =>
23+
val node = Node(signal::allocate(), ref(None()))
24+
rest.set(Some(node))
25+
sender.set(node)
26+
case Some(node) =>
27+
sender.set(node)
28+
}
29+
val other = signal::allocate()
30+
this.unsafeSend(Rendezvous(value, other))
31+
other.unsafeWait()
2732
}
2833

29-
def receive[A](receiver: Receiver[A]): A = {
30-
// TODO this can crash when multiple tasks call receive
31-
val signal = receiver.get()
32-
val River(value, rest) = signal.unsafeWait()
33-
receiver.set(rest)
34-
return value
34+
def receive[A, B](receiver: Receiver[A, B], value: B): A = {
35+
val Node(this, rest) = receiver.get()
36+
rest.get() match {
37+
case None() =>
38+
val node = Node(signal::allocate(), ref(None()))
39+
rest.set(Some(node))
40+
receiver.set(node)
41+
case Some(node) =>
42+
receiver.set(node)
43+
}
44+
val Rendezvous(result, other) = this.unsafeWait()
45+
other.unsafeSend(value)
46+
return result
3547
}
3648

3749
// TODO streaming

0 commit comments

Comments
 (0)