Skip to content

Commit 800013e

Browse files
committed
Make signals rendezvous
1 parent f5b4a7d commit 800013e

File tree

4 files changed

+39
-82
lines changed

4 files changed

+39
-82
lines changed

libraries/common/io/channel.effekt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
module io/channel
22

3+
import io
34
import io/signal
45

5-
record Rendezvous[A, B](value: A, other: Signal[B])
6-
7-
record Node[A, B](this: Signal[Rendezvous[A, B]], rest: Ref[Option[Node[A, B]]])
6+
record Node[A, B](this: Signal[A, B], rest: Ref[Option[Node[A, B]]])
87

98
type Sender[A, B] = Ref[Node[A, B]]
109
type Receiver[A, B] = Ref[Node[A, B]]
@@ -26,9 +25,9 @@ def send[A, B](sender: Sender[A, B], value: A): B = {
2625
case Some(node) =>
2726
sender.set(node)
2827
}
29-
val other = signal::allocate()
30-
this.unsafeSend(Rendezvous(value, other))
31-
other.unsafeWait()
28+
// TODO avoid yield
29+
yield()
30+
this.unsafeNotifyA(value)
3231
}
3332

3433
def receive[A, B](receiver: Receiver[A, B], value: B): A = {
@@ -41,9 +40,9 @@ def receive[A, B](receiver: Receiver[A, B], value: B): A = {
4140
case Some(node) =>
4241
receiver.set(node)
4342
}
44-
val Rendezvous(result, other) = this.unsafeWait()
45-
other.unsafeSend(value)
46-
return result
43+
// TODO avoid yield
44+
yield()
45+
this.unsafeNotifyB(value)
4746
}
4847

4948
// TODO streaming

libraries/common/io/promise.effekt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import io/signal
88

99
type State[T] {
1010
Resolved(value: T)
11-
Pending(signals: List[Signal[T]])
11+
Pending(signals: List[Signal[T, Unit]])
1212
}
1313

1414
extern type Promise[T]
@@ -49,11 +49,12 @@ extern {async, global} def await[T](promise: Promise[T]): T =
4949
val signal = signal::allocate()
5050
// TODO use reference.set and signal.wait
5151
set(reference, Pending(Cons(signal, signals)))
52-
unsafeWait(signal)
52+
// TODO perhaps yield to avoid stack overflow
53+
unsafeNotifyB(signal, ())
5354
}
5455
}
5556

56-
extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit =
57+
extern {io, async, global} def resolve[T](promise: Promise[T], value: T): Unit =
5758
js "promise$resolve(${promise}, ${value})"
5859
llvm {
5960
val reference = toRefState(promise)
@@ -64,8 +65,9 @@ extern {io, global} def resolve[T](promise: Promise[T], value: T): Unit =
6465
case Pending(signals) =>
6566
// TODO use reference.set
6667
set(reference, Resolved(value))
68+
// TODO perhaps yield to avoid stack overflow
6769
// TODO use signals.reverse.foreach
68-
foreach(reverse(signals)) { signal => unsafeSend(signal, value) }
70+
foreach(reverse(signals)) { signal => unsafeNotifyA(signal, value) }
6971
}
7072
}
7173

libraries/common/io/signal.effekt

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,32 @@ module io/signal
33
// Signals
44
// -------
55

6-
/// Must be sent exactly once and waited for exactly once
7-
extern type Signal[T]
6+
/// Must be notified exactly once by A and exactly once by B.
7+
/// Must yield before notifying or risking stack overflow.
8+
extern type Signal[A, B]
89

910
namespace signal {
10-
extern global def allocate[T](): Signal[T] =
11+
extern global def allocate[A, B](): Signal[A, B] =
1112
llvm """
1213
%signal = call %Pos @c_signal_make()
1314
ret %Pos %signal
1415
"""
1516
}
1617

17-
extern async def unsafeSend[T](signal: Signal[T], value: T): Unit =
18+
extern async def unsafeNotifyA[A, B](signal: Signal[A, B], value: A): B =
1819
llvm """
19-
call void @c_signal_send(%Pos ${signal}, %Pos ${value}, %Stack %stack)
20+
call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack)
2021
ret void
2122
"""
2223

23-
extern async def unsafeWait[T](signal: Signal[T]): T =
24+
extern async def unsafeNotifyB[A, B](signal: Signal[A, B], value: B): A =
2425
llvm """
25-
call void @c_signal_wait(%Pos ${signal}, %Stack %stack)
26+
call void @c_signal_notify(%Pos ${signal}, %Pos ${value}, %Stack %stack)
2627
ret void
2728
"""
2829

2930
extern llvm """
3031
declare %Pos @c_signal_make()
31-
declare void @c_signal_send(%Pos, %Pos)
32-
declare void @c_signal_wait(%Pos, %Stack)
32+
declare void @c_signal_notify(%Pos, %Pos, %Stack)
3333
"""
3434

35-
36-
37-

libraries/llvm/io.c

Lines changed: 16 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -298,30 +298,27 @@ void c_yield(Stack stack) {
298298
// Signals
299299
// --------
300300

301-
typedef enum { BEFORE, SENDING, WAITING, AFTER } signal_state_t;
301+
typedef enum { BEFORE, NOTIFIED, AFTER } signal_state_t;
302302

303303
typedef struct {
304304
uint64_t rc;
305305
void* eraser;
306306
signal_state_t state;
307-
union {
308-
struct Pos value;
309-
Stack stack;
310-
} payload;
307+
struct Pos value;
308+
Stack stack;
311309
} Signal;
312310

313311
void c_signal_erase(void *envPtr) {
314312
// envPtr points to a Signal _after_ the eraser, so let's adjust it to point to the beginning.
315313
Signal *signal = (Signal*) (envPtr - offsetof(Signal, state));
314+
// TODO exploit value and stack being NULL
316315
signal_state_t state = signal->state;
317316
switch (state) {
318317
case BEFORE:
319318
break;
320-
case SENDING:
321-
erasePositive(signal->payload.value);
322-
break;
323-
case WAITING:
324-
eraseStack(signal->payload.stack);
319+
case NOTIFIED:
320+
erasePositive(signal->value);
321+
eraseStack(signal->stack);
325322
break;
326323
case AFTER:
327324
break;
@@ -338,30 +335,23 @@ struct Pos c_signal_make() {
338335
return (struct Pos) { .tag = 0, .obj = signal, };
339336
}
340337

341-
void c_signal_send(struct Pos signal, struct Pos value, Stack stack) {
338+
void c_signal_notify(struct Pos signal, struct Pos value, Stack stack) {
342339
Signal* f = (Signal*)signal.obj;
343340
switch (f->state) {
344341
case BEFORE: {
345-
f->state = SENDING;
346-
f->payload.value = value;
342+
f->state = NOTIFIED;
343+
f->value = value;
344+
f->stack = stack;
347345
erasePositive(signal);
348-
// TODO avoid stack overflow without yielding
349-
c_yield(stack);
350346
break;
351347
}
352-
case SENDING: {
353-
// TODO more graceful panic
354-
fprintf(stderr, "ERROR: Signal already used for sending\n");
355-
exit(1);
356-
break;
357-
}
358-
case WAITING: {
359-
Stack other = f->payload.stack;
348+
case NOTIFIED: {
349+
struct Pos other_value = f->value;
350+
Stack other_stack = f->stack;
360351
f->state = AFTER;
361352
erasePositive(signal);
362-
// TODO avoid stack overflow without yielding
363-
c_yield(stack);
364-
resume_Pos(other, value);
353+
resume_Pos(other_stack, value);
354+
resume_Pos(stack, other_value);
365355
break;
366356
}
367357
case AFTER: {
@@ -373,35 +363,4 @@ void c_signal_send(struct Pos signal, struct Pos value, Stack stack) {
373363
}
374364
}
375365

376-
void c_signal_wait(struct Pos signal, Stack stack) {
377-
Signal* f = (Signal*)signal.obj;
378-
switch (f->state) {
379-
case BEFORE: {
380-
f->state = WAITING;
381-
f->payload.stack = stack;
382-
erasePositive(signal);
383-
break;
384-
}
385-
case SENDING: {
386-
struct Pos value = f->payload.value;
387-
f->state = AFTER;
388-
erasePositive(signal);
389-
resume_Pos(stack, value);
390-
break;
391-
}
392-
case WAITING: {
393-
// TODO more graceful panic
394-
fprintf(stderr, "ERROR: Signal already used for waiting\n");
395-
exit(1);
396-
break;
397-
}
398-
case AFTER: {
399-
// TODO more graceful panic
400-
fprintf(stderr, "ERROR: Waiting after signal happened\n");
401-
exit(1);
402-
break;
403-
}
404-
}
405-
}
406-
407366
#endif

0 commit comments

Comments
 (0)