Skip to content

Commit 0d03a87

Browse files
Stdlib: tee for emit-streams (#922)
This adds an implementation of `tee`-like functions for `emit` streams. There is: - `teeing` which is controlled by the consumer on the outside (i.e. if the outside consumer stops, it stops the inside one too) - `tee` which runs two consumers with one producer, stopping once both are done - `manyTee` which allows to implement `tee` for more than 2 consumers easily <del>Note that my initial implementation of this depended on #919. I don't think this one does, but would probably still want to hold off on merging this for either solving it or someone else being sufficiently certain it doesn't.</del>
1 parent a09e3b0 commit 0d03a87

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

examples/stdlib/stream/tee.check

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
a{ ... }
2+
1
3+
2
4+
3
5+
4
6+
5
7+
Cons(1, Cons(2, Cons(3, Cons(4, Nil()))))
8+
a done
9+
b{ ... }
10+
1
11+
2
12+
3
13+
4
14+
5
15+
6
16+
7
17+
8
18+
9
19+
10
20+
Cons(1, Cons(2, Cons(3, Cons(4, Cons(5, Cons(6, Cons(7, Cons(8, Cons(9, Nil())))))))))
21+
b done
22+
tee{a}{b}{ ... }
23+
1
24+
2
25+
3
26+
4
27+
5
28+
Cons(1, Cons(2, Cons(3, Cons(4, Nil()))))
29+
a done
30+
6
31+
7
32+
8
33+
9
34+
10
35+
Cons(1, Cons(2, Cons(3, Cons(4, Cons(5, Cons(6, Cons(7, Cons(8, Cons(9, Nil())))))))))
36+
b done
37+
tee{b}{a}{ ... }
38+
1
39+
2
40+
3
41+
4
42+
5
43+
Cons(1, Cons(2, Cons(3, Cons(4, Nil()))))
44+
a done
45+
6
46+
7
47+
8
48+
9
49+
10
50+
Cons(1, Cons(2, Cons(3, Cons(4, Cons(5, Cons(6, Cons(7, Cons(8, Cons(9, Nil())))))))))
51+
b done
52+
a{ teeing{b}{ ... } }
53+
1
54+
2
55+
3
56+
4
57+
5
58+
Cons(1, Cons(2, Cons(3, Cons(4, Nil()))))
59+
a done
60+
b{ teeing{a}{ ... } }
61+
1
62+
2
63+
3
64+
4
65+
5
66+
Cons(1, Cons(2, Cons(3, Cons(4, Nil()))))
67+
a done
68+
6
69+
7
70+
8
71+
9
72+
10
73+
Cons(1, Cons(2, Cons(3, Cons(4, Cons(5, Cons(6, Cons(7, Cons(8, Cons(9, Nil())))))))))
74+
b done

examples/stdlib/stream/tee.effekt

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import stream
2+
3+
def main() = {
4+
def stream() = {
5+
// equivalent: with teeing[Int]{ {b} => for[Int]{b}{ v => println(v) } }
6+
for[Int]{
7+
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15].each
8+
}{ v =>
9+
println(v)
10+
do emit(v)
11+
}
12+
}
13+
def a{ b: => Unit / emit[Int] }: Unit = {
14+
println(collectList[Int]{boundary{limit[Int](5){b}}})
15+
println("a done")
16+
}
17+
def b{ b: => Unit / emit[Int] }: Unit = {
18+
println(collectList[Int]{boundary{limit[Int](10){b}}})
19+
println("b done")
20+
}
21+
println("a{ ... }")
22+
a{stream}
23+
println("b{ ... }")
24+
b{stream}
25+
println("tee{a}{b}{ ... }")
26+
tee[Int]{a}{b}{ stream() }
27+
println("tee{b}{a}{ ... }")
28+
tee[Int]{b}{a}{ stream() }
29+
println("a{ teeing{b}{ ... } }")
30+
a{
31+
with teeing[Int]{b}
32+
stream()
33+
}
34+
println("b{ teeing{a}{ ... } }")
35+
b{
36+
with teeing[Int]{a}
37+
stream()
38+
}
39+
}

libraries/common/stream.effekt

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,88 @@ def each(string: String): Unit / emit[Char] =
409409
def collectString { stream: () => Unit / emit[Char] }: String =
410410
returning::collectString[Unit]{stream}.second
411411

412+
namespace internal {
413+
effect snapshot(): Unit
414+
}
415+
416+
/// Use cons to handle prod, but also emit to the outside.
417+
/// The outside controls iteration, i.e., if cons aborts, the rest of prod will still be emitted.
418+
///
419+
/// Starts by executing cons. Will stop cons during execution if the outside stops consuming.
420+
///
421+
/// Example, printing all values consumed by the outside:
422+
///
423+
/// with teeing{ {s} => for{s}{ e => println(e) } }
424+
/// prod() // some producer
425+
///
426+
/// Laws-ish (hopefully):
427+
/// - hnd{ prd() } === hnd{ teeing{t}{ prd() } } for all hnd and t that calls its argument at most once (and has no other captures)
428+
def teeing[A]{ cons: { => Unit / emit[A] } => Unit }{ prod: => Unit / emit[A] }: Unit / emit[A] = region r {
429+
var consDone = false
430+
var k in r = box { prod() } // what still needs to run of prod after cons exits
431+
try {
432+
cons{
433+
try {
434+
prod()
435+
k = box { () }
436+
} with emit[A] { v =>
437+
do internal::snapshot() // remember that we need to continue here if cons aborts
438+
if(not(consDone)) do emit(v)
439+
outer.emit(v)
440+
resume(())
441+
}
442+
if(consDone) do stop() // cons already exited, so skip continuing there
443+
} // end cons
444+
consDone = true
445+
}
446+
with stop { () => () }
447+
with outer: emit[A]{ v => resume(do emit(v)) }
448+
with internal::snapshot{ () =>
449+
k = box { resume(()) }
450+
resume(())
451+
}
452+
k()
453+
}
454+
455+
/// Binds a `teeing`-like function in the body, stopping the inner producer once all consumers in tees are done consuming.
456+
///
457+
/// Example of use, equivalent to `tee[A]{cns1}{cns2}{prd}`:
458+
///
459+
/// manyTee[A] { {tee} =>
460+
/// with tee{cns1}
461+
/// with tee{cns2}
462+
/// prd()
463+
/// }
464+
///
465+
def manyTee[A]{ body: { { { => Unit / emit[A] } => Unit }{ => Unit / emit[A] } => Unit / emit[A] } => Unit / emit[A] }: Unit = {
466+
var running = 0
467+
try {
468+
body{ {hnd}{prod} =>
469+
running = running + 1
470+
teeing[A]{ {b} => hnd{b}; running = running - 1 }{prod}
471+
}
472+
} with emit[A] { _ =>
473+
if(running > 0) resume(())
474+
}
475+
}
476+
477+
/// Streams prod to both cons1 and cons2. Stops once both cons1 and cons2 stopped.
478+
/// Only runs prod once, resuming at most once at each emit.
479+
///
480+
/// var sumRes = 0
481+
/// var productRes = 0
482+
/// tee{ s => sumRes = sum{s} }{ s => productRes = product{s} }{ range(0, 10) }
483+
/// assertEquals(sum{ range(0, 10) }, sumRes)
484+
/// assertEquals(product{ range(0, 10) }, productRes)
485+
///
486+
def tee[A]{ cons1: { => Unit / emit[A] } => Unit }{ cons2: { => Unit / emit[A] } => Unit }{ prod: => Unit / emit[A] }: Unit = {
487+
manyTee[A]{ {tee} =>
488+
with tee{cons1}
489+
with tee{cons2}
490+
prod()
491+
}
492+
}
493+
412494
namespace returning {
413495

414496
/// Canonical handler of push streams that performs `action` for every

0 commit comments

Comments
 (0)