@@ -30,6 +30,18 @@ effect stop(): Nothing
30
30
def for[A] { stream: () => Unit / emit[A] } { action: A => Unit }: Unit =
31
31
returning::for[A, Unit]{stream}{action}
32
32
33
+ /// Applies the given function on each emitted element and reemits the result.
34
+ ///
35
+ /// e.g. map[Int,String]{ x => x.show } { range(0, 5) }
36
+ def map[A, B] { function: A => B } { stream: () => Unit / emit[A] }: Unit / emit[B] =
37
+ returning::map[A, B, Unit]{function}{stream}
38
+
39
+ /// Runs the given action whenever the given reader reads an element.
40
+ ///
41
+ /// e.g. var i = -1; with tap { i = i + 1; i }; reader()
42
+ def tap[A] { action: () => A / stop } { reader: () => Unit / read[A] }: Unit =
43
+ returning::tap[A, Unit]{action}{reader}
44
+
33
45
/// Runs the stream as a message queue,
34
46
/// handling each of its elements that can produce new elements ("messages"),
35
47
/// until a fixed point is reached.
@@ -162,8 +174,8 @@ def index[A] { stream: () => Unit / emit[A] }: Unit / emit[Indexed[A]] =
162
174
163
175
164
176
/// If `number` is zero or negative it does nothing
165
- def limit[A](number: Int) { stream: () => Unit / emit[A] }: Unit / { emit[A], stop} =
166
- returning::limit[A, Unit](number){stream}
177
+ def limit[A](number: Int) { stream: () => Unit / emit[A] }: Unit / emit[A] =
178
+ boundary { returning::limit[A, Unit](number){stream} }
167
179
168
180
169
181
/// If `number` is zero or negative it does nothing
@@ -405,6 +417,12 @@ def encodeUTF8[R] { stream: () => R / emit[Char] }: R / emit[Byte] =
405
417
resume(encodeChar(char))
406
418
}
407
419
420
+ def writeFileUTF8[R](path: String) { stream: () => R / emit[Char] }: R / Exception[IOError] =
421
+ writeFile(path) { encodeUTF8 { stream() } }
422
+
423
+ def readFileUTF8[R](path: String) { reader: () => R / read[Char] }: R / Exception[IOError] =
424
+ readFile(path) { decodeUTF8 { reader() } }
425
+
408
426
def feed[R](string: String) { reader: () => R / read[Char] } =
409
427
feed(string.fromString) {
410
428
decodeUTF8 {
@@ -422,6 +440,12 @@ def each(string: String): Unit / emit[Char] =
422
440
def collectString { stream: () => Unit / emit[Char] }: String =
423
441
returning::collectString[Unit]{stream}.second
424
442
443
+ def writeLine { body: () => Unit / emit[Char] }: Unit / emit[Char] =
444
+ returning::writeLine[Unit]{body}
445
+
446
+ def readLine { body: () => Unit / read[Char] }: Unit / {read[Char], stop} =
447
+ returning::readLine[Unit]{body}
448
+
425
449
namespace internal {
426
450
effect snapshot(): Unit
427
451
}
@@ -515,6 +539,20 @@ def for[A, R] { stream: () => R / emit[A] } { action: A => Unit }: R =
515
539
resume(action(value))
516
540
}
517
541
542
+ def map[A, B, R] { function: A => B } { stream: () => R / emit[A] }: R / emit[B] =
543
+ try {
544
+ stream()
545
+ } with emit[A] { value =>
546
+ resume(do emit(function(value)))
547
+ }
548
+
549
+ def tap[A, R] { action: () => A / stop } { reader: () => R / read[A] }: R =
550
+ try {
551
+ reader()
552
+ } with read[A] { () =>
553
+ resume { action() }
554
+ }
555
+
518
556
def boundary[R] { program: () => R / stop }: Option[R] =
519
557
try {
520
558
Some(program())
@@ -542,17 +580,21 @@ def index[A, R] { stream: () => R / emit[A] }: R / emit[Indexed[A]] = {
542
580
543
581
/// If `number` is zero or negative it does nothing
544
582
def limit[A, R](number: Int) { stream: () => R / emit[A] }: R / { emit[A], stop } = {
545
- if (number <= 0) do stop();
546
- var i = 1;
547
- try {
548
- stream()
549
- } with emit[A] { v =>
550
- if (i < number) {
551
- i = i + 1;
552
- resume(do emit(v))
553
- } else {
554
- do stop()
583
+ if (number > 0) {
584
+ var i = number;
585
+ try {
586
+ stream()
587
+ } with emit[A] { v =>
588
+ do emit(v);
589
+ i = i - 1;
590
+ if (i > 0) {
591
+ resume(())
592
+ } else {
593
+ do stop()
594
+ }
555
595
}
596
+ } else {
597
+ do stop()
556
598
}
557
599
}
558
600
@@ -615,6 +657,28 @@ def collectString[R] { stream: () => R / emit[Char] }: (R, String) = {
615
657
(result, bytes.toString)
616
658
}
617
659
660
+ def writeLine[R] { body: () => R / emit[Char] }: R / emit[Char] = {
661
+ val result = body()
662
+ do emit('\n')
663
+ return result
664
+ }
665
+
666
+ def readLine[R] { body: () => R / read[Char] }: R / {read[Char], stop} = {
667
+ var stopped = false
668
+ try {
669
+ body()
670
+ } with read[Char] {
671
+ if(stopped){
672
+ resume { do stop() }
673
+ } else {
674
+ do read[Char] match {
675
+ case '\n' => stopped = true; resume { do stop() }
676
+ case char => resume { return char }
677
+ }
678
+ }
679
+ }
680
+ }
681
+
618
682
def source[A, R] { stream: () => Unit / emit[A] } { reader: () => R / read[A] }: R = {
619
683
var next = box { None() }
620
684
next = box {
0 commit comments