diff --git a/effekt/js/src/main/scala/effekt/EffektConfig.scala b/effekt/js/src/main/scala/effekt/EffektConfig.scala index 7ffeccdc8..903c158a7 100644 --- a/effekt/js/src/main/scala/effekt/EffektConfig.scala +++ b/effekt/js/src/main/scala/effekt/EffektConfig.scala @@ -20,11 +20,13 @@ trait EffektConfig { def prelude(): List[String] = List( "effekt", "option", + "stream", "list", "result", "exception", "array", "char", + "bytearray", "string", "ref" ) diff --git a/effekt/jvm/src/main/scala/effekt/Runner.scala b/effekt/jvm/src/main/scala/effekt/Runner.scala index c5e94f5a5..560033a5d 100644 --- a/effekt/jvm/src/main/scala/effekt/Runner.scala +++ b/effekt/jvm/src/main/scala/effekt/Runner.scala @@ -43,7 +43,7 @@ trait Runner[Executable] { * if module A depends on module B, then B should come before A. * - Furthermore, each module mentioned here must import the `effekt` module as its first import. */ - def prelude: List[String] = List("effekt", "option", "list", "result", "exception", "array", "char", "string", "ref") + def prelude: List[String] = List("effekt", "option", "stream", "list", "result", "exception", "array", "char", "bytearray", "string", "ref") /** * Creates a OS-specific script file that will execute the command when executed, diff --git a/effekt/jvm/src/test/scala/effekt/LSPTests.scala b/effekt/jvm/src/test/scala/effekt/LSPTests.scala index 70db6e775..784c2e55d 100644 --- a/effekt/jvm/src/test/scala/effekt/LSPTests.scala +++ b/effekt/jvm/src/test/scala/effekt/LSPTests.scala @@ -1380,7 +1380,7 @@ class LSPTests extends FunSuite { val expectedIRContents = raw"""ModuleDecl( | test, - | List(effekt, option, list, result, exception, array, char, string, ref), + | List(effekt, option, stream, list, result, exception, array, char, bytearray, string, ref), | Nil, | Nil, | List( diff --git a/examples/benchmarks/input_output/dyck_one.effekt b/examples/benchmarks/input_output/dyck_one.effekt index 50c77ce9d..77d194a21 100644 --- a/examples/benchmarks/input_output/dyck_one.effekt +++ b/examples/benchmarks/input_output/dyck_one.effekt @@ -28,7 +28,7 @@ type Tree { def readTree(): Tree / { Scan[Byte], stop } = { readIf[Byte] { b => b.toInt == 40 } - val children = collectList[Tree] { many { readTree() } } + val children = list::collect[Tree] { many { readTree() } } skipIf[Byte] { b => b.toInt == 41 } Node(children) } diff --git a/examples/stdlib/array/map.effekt b/examples/stdlib/array/map.effekt index ffc00d34b..4c7a45d55 100644 --- a/examples/stdlib/array/map.effekt +++ b/examples/stdlib/array/map.effekt @@ -3,7 +3,7 @@ def main() = { arr.map { x => x * 2 } println(arr) - val arr1 = allocate(0) + val arr1 = array::allocate(0) arr1.map { x => x * 2 } println(arr1) } \ No newline at end of file diff --git a/examples/stdlib/list/collect.effekt b/examples/stdlib/list/collect.effekt index e86bd9131..6ef17a14d 100644 --- a/examples/stdlib/list/collect.effekt +++ b/examples/stdlib/list/collect.effekt @@ -2,11 +2,11 @@ module examples/pos/list/collect def main() = { val empty = Nil[Int]() - empty.collect { x => Some(x) }.foreach { x => println(x) } + empty.collectSome { x => Some(x) }.foreach { x => println(x) } val l = [1, 2, 3, 4] - l.collect { x => if (x > 2) { Some(x * 10) } else { None() } }.foreach { x => println(x) } + l.collectSome { x => if (x > 2) { Some(x * 10) } else { None() } }.foreach { x => println(x) } val optList = [Some(1), None(), Some(2), None(), Some(3), Some(4), None()] - optList.collect { x => x }.foreach { x => println(x) } + optList.collectSome { x => x }.foreach { x => println(x) } } diff --git a/examples/stdlib/stream/characters.effekt b/examples/stdlib/stream/characters.effekt index 7bf66205b..b1bd244c4 100644 --- a/examples/stdlib/stream/characters.effekt +++ b/examples/stdlib/stream/characters.effekt @@ -5,7 +5,7 @@ def main() = { println(show(c) ++ " (" ++ show(c.toInt) ++ ")") } - val list = collectList[Char] { each("Hello") } + val list = list::collect[Char] { each("Hello") } println(list.map { c => c.show }) } diff --git a/examples/stdlib/stream/fastexp.effekt b/examples/stdlib/stream/fastexp.effekt index 11dbe589c..a8261bb67 100644 --- a/examples/stdlib/stream/fastexp.effekt +++ b/examples/stdlib/stream/fastexp.effekt @@ -62,8 +62,8 @@ def main() = { def testBits(n: Int) = { println(n) - println(collectList[Bool] {n.eachLSB}.prettyBits) - println(collectList[Bool] {n.eachMSB}.prettyBits) + println(list::collect[Bool] {n.eachLSB}.prettyBits) + println(list::collect[Bool] {n.eachMSB}.prettyBits) } testBits(0) diff --git a/examples/stdlib/stream/fibonacci.effekt b/examples/stdlib/stream/fibonacci.effekt index 7b3e3a4b2..c62da349a 100644 --- a/examples/stdlib/stream/fibonacci.effekt +++ b/examples/stdlib/stream/fibonacci.effekt @@ -1,9 +1,8 @@ -import stream def main() = { val max = 10 - val fibs = collectList[Int] { + val fibs = list::collect[Int] { var a = 0 var b = 1 diff --git a/examples/stdlib/stream/fuse_newlines.effekt b/examples/stdlib/stream/fuse_newlines.effekt index 3c050c347..28ecbb802 100644 --- a/examples/stdlib/stream/fuse_newlines.effekt +++ b/examples/stdlib/stream/fuse_newlines.effekt @@ -23,7 +23,7 @@ def skipNewlines(): Nothing / {read[Char], emit[Char], stop} = { def main() = { with feed("ab\n\nc\nde") - println(collectString { + println(string::collect { with exhaustively fuseNewlines() }) diff --git a/examples/stdlib/stream/map.effekt b/examples/stdlib/stream/map.effekt index 4f1a7efb5..cc4d4ea2e 100644 --- a/examples/stdlib/stream/map.effekt +++ b/examples/stdlib/stream/map.effekt @@ -8,10 +8,10 @@ def main() = { println(show(k) ++ ": " ++ show(v) ++ " (" ++ show(v.toInt) ++ ")") } - val newMap = collectMap[Int, Char](compareInt) { each(m) } + val newMap = map::collect[Int, Char](compareInt) { each(m) } println(map::internal::prettyPairs(newMap.toList) { n => show(n) } { c => show(c) }) - val hello: String = collectString { eachValue(m) } + val hello: String = string::collect { eachValue(m) } println(hello) } diff --git a/examples/stdlib/stream/taylor.effekt b/examples/stdlib/stream/taylor.effekt index 7ac8ec7de..a924ec4d4 100644 --- a/examples/stdlib/stream/taylor.effekt +++ b/examples/stdlib/stream/taylor.effekt @@ -8,7 +8,7 @@ def ints(): Unit / emit[Int] = { /// Take the `n` first elements of a `stream`, put them in a list. def take[A](n: Int) { stream: () => Unit / emit[A] }: List[A] = { - with collectList[A] + with collect[A] with boundary with limit[A](n) stream() diff --git a/examples/stdlib/stream/tee.effekt b/examples/stdlib/stream/tee.effekt index 234131093..fcf1abfb7 100644 --- a/examples/stdlib/stream/tee.effekt +++ b/examples/stdlib/stream/tee.effekt @@ -11,11 +11,11 @@ def main() = { } } def a{ b: => Unit / emit[Int] }: Unit = { - println(collectList[Int]{boundary{limit[Int](4){b}}}) + println(list::collect[Int]{boundary{limit[Int](4){b}}}) println("a done") } def b{ b: => Unit / emit[Int] }: Unit = { - println(collectList[Int]{boundary{limit[Int](9){b}}}) + println(list::collect[Int]{boundary{limit[Int](9){b}}}) println("b done") } println("a{ ... }") diff --git a/libraries/common/array.effekt b/libraries/common/array.effekt index 83a3d7c3a..57c621a19 100644 --- a/libraries/common/array.effekt +++ b/libraries/common/array.effekt @@ -4,6 +4,7 @@ import effekt import exception import list import option +import stream /// A mutable 0-indexed fixed-sized array. extern type Array[T] @@ -641,3 +642,55 @@ def println(l: Array[Int]): Unit = println(show(l)) def println(l: Array[Double]): Unit = println(show(l)) def println(l: Array[Bool]): Unit = println(show(l)) def println(l: Array[String]): Unit = println(show(l)) + + +// Streaming +// --------- + +/// Turns an `array` into a producer of a push stream +/// by emitting each contained value from 0 to length - 1. +def each[T](array: Array[T]): Unit / emit[T] = { + val n = array.size + def go(i: Int): Unit = { + if (i < n) { + do emit(array.unsafeGet(i)) + go(i + 1) + } + } + go(0) +} + +def feed[T, R](array: Array[T]) { reader: () => R / read[T] }: R = { + var i = 0 + try { + reader() + } with read[T] { + resume { + if (i < array.size) { + val c = i + i = c + 1 + array.unsafeGet(c) + } else { + do stop() + } + } + } +} + +def collect[A] { stream: () => Unit / emit[A] }: Array[A] = + returning::collect[A, Unit]{stream}.second + +namespace returning { + def collect[A, R] { stream: () => R / emit[A] }: (R, Array[A]) = { + var i = 0 + var a = allocate(1) + try { + (stream(), a.resize(i)) + } with emit[A] { (v) => + if (i >= a.size) { a = a.resize(2 * a.size) } + a.unsafeSet(i, v) + i = i + 1 + resume(()) + } + } +} diff --git a/libraries/common/bytearray.effekt b/libraries/common/bytearray.effekt index ee7f10778..fdd3dcdb5 100644 --- a/libraries/common/bytearray.effekt +++ b/libraries/common/bytearray.effekt @@ -1,5 +1,8 @@ module bytearray +import effekt +import stream + /** * A memory managed, mutable, fixed-length array of bytes. * @@ -200,3 +203,55 @@ def compareStringBytes(left: String, right: String): Ordering = { val r = right.fromString compareByteArray(l, r) } + + +// Streaming +// --------- + +/// Turns `bytes` into a producer of a push stream +/// by emitting each contained value from 0 to length - 1. +def each(bytes: ByteArray): Unit / emit[Byte] = { + val n = bytes.size + def go(i: Int): Unit = { + if (i < n) { + do emit(bytes.unsafeGet(i)) + go(i + 1) + } + } + go(0) +} + +def feed[R](bytes: ByteArray) { reader: () => R / read[Byte] }: R = { + var i = 0 + try { + reader() + } with read[Byte] { + resume { + if (i < bytes.size) { + val c = i + i = c + 1 + bytes.unsafeGet(c) + } else { + do stop() + } + } + } +} + +def collect { stream: () => Unit / emit[Byte] }: ByteArray = + returning::collect[Unit]{stream}.second + +namespace returning { + def collect[R] { stream: () => R / emit[Byte] }: (R, ByteArray) = { + var i = 0 + var a = allocate(1) + try { + (stream(), a.resize(i)) + } with emit[Byte] { (v) => + if (i >= a.size) { a = a.resize(2 * a.size) } + a.unsafeSet(i, v) + i = i + 1 + resume(()) + } + } +} diff --git a/libraries/common/char.effekt b/libraries/common/char.effekt index f7a51f6a3..fc8188a08 100644 --- a/libraries/common/char.effekt +++ b/libraries/common/char.effekt @@ -5,6 +5,7 @@ import effekt import exception import option import result +import stream /// Checks if the given character is an ASCII whitespace @@ -175,3 +176,89 @@ def utf16UnitCount(codepoint: Char): Int = codepoint match { extern def charWidth(c: Char) at {}: Int = // JavaScript strings are UTF-16 where every unicode character after 0xffff takes two units js "(${c} > 0xffff) ? 2 : 1" + + +def writeLine { body: () => Unit / emit[Char] }: Unit / emit[Char] = + returning::writeLine[Unit]{body} + +def readLine { body: () => Unit / read[Char] }: Unit / {read[Char], stop} = + returning::readLine[Unit]{body} + +namespace returning { + def writeLine[R] { body: () => R / emit[Char] }: R / emit[Char] = { + val result = body() + do emit('\n') + return result + } + + def readLine[R] { body: () => R / read[Char] }: R / {read[Char], stop} = { + var stopped = false + try { + body() + } with read[Char] { + if(stopped){ + resume { do stop() } + } else { + do read[Char] match { + case '\n' => stopped = true; resume { do stop() } + case char => resume { return char } + } + } + } + } +} + +def decodeChar(): Char / {read[Byte], stop} = { + val b = do read().toInt + if (b < 128) { + b.toChar + } else if (b < 224) { + val part1 = bitwiseShl(bitwiseAnd(b, 31), 6) + val part2 = bitwiseAnd(do read().toInt, 63) + bitwiseOr(part1, part2).toChar + } else if (b < 240) { + val part1 = bitwiseShl(bitwiseAnd(b, 15), 12) + val part2 = bitwiseShl(bitwiseAnd(do read().toInt, 63), 6) + val part3 = bitwiseAnd(do read().toInt, 63) + bitwiseOr(bitwiseOr(part1, part2), part3).toChar + } else { + val part1 = bitwiseShl(bitwiseAnd(b, 7), 18) + val part2 = bitwiseShl(bitwiseAnd(do read().toInt, 63), 12) + val part3 = bitwiseShl(bitwiseAnd(do read().toInt, 63), 6) + val part4 = bitwiseAnd(do read().toInt, 63) + bitwiseOr(bitwiseOr(bitwiseOr(part1, part2), part3), part4).toChar + } +} + +def encodeChar(char: Char): Unit / emit[Byte] = { + val code = char.toInt + if (code < 128) { + do emit(code.toByte) + } else if (code < 2048) { + do emit(bitwiseOr(192, bitwiseShr(code, 6)).toByte) + do emit(bitwiseOr(128, bitwiseAnd(code, 63)).toByte) + } else if (code < 65536) { + do emit(bitwiseOr(224, bitwiseShr(code, 12)).toByte) + do emit(bitwiseOr(128, bitwiseAnd(bitwiseShr(code, 6), 63)).toByte) + do emit(bitwiseOr(128, bitwiseAnd(code, 63)).toByte) + } else { + do emit(bitwiseOr(240, bitwiseShr(code, 18)).toByte) + do emit(bitwiseOr(128, bitwiseAnd(bitwiseShr(code, 12), 63)).toByte) + do emit(bitwiseOr(128, bitwiseAnd(bitwiseShr(code, 6), 63)).toByte) + do emit(bitwiseOr(128, bitwiseAnd(code, 63)).toByte) + } +} + +def decodeUTF8[R] { reader: () => R / read[Char] }: R / read[Byte] = + try { + reader() + } with read[Char] { + resume { decodeChar() } + } + +def encodeUTF8[R] { stream: () => R / emit[Char] }: R / emit[Byte] = + try { + stream() + } with emit[Char] { char => + resume(encodeChar(char)) + } diff --git a/libraries/common/io/filesystem.effekt b/libraries/common/io/filesystem.effekt index efe0bd0a2..0d847d344 100644 --- a/libraries/common/io/filesystem.effekt +++ b/libraries/common/io/filesystem.effekt @@ -71,6 +71,77 @@ def appendFile(path: String, contents: String): Unit / Exception[IOError] = { go() } +def writeFileUTF8[R](path: String) { stream: () => R / emit[Char] }: R / Exception[IOError] = + writeFile(path) { encodeUTF8 { stream() } } + +def readFileUTF8[R](path: String) { reader: () => R / read[Char] }: R / Exception[IOError] = + readFile(path) { decodeUTF8 { reader() } } + +def writeFile[R](path: String) { stream: () => R / emit[Byte] }: R / Exception[IOError] = { + + val file = openForWriting(path); + with on[IOError].finalize { close(file) } + + val chunkSize = 1048576 // 1MB + val buffer = bytearray::allocate(chunkSize) + var offset = 0 + + def push(i: Int, n: Int): Unit = { + val r = write(file, buffer, i, n, -1) + if (r < n) { + push(i + r, n - r) + } + } + + try { + val r = stream() + push(0, offset) + return r + } with emit[Byte] { (byte) => + if (offset >= buffer.size) { + push(0, buffer.size) + offset = 0 + } + buffer.unsafeSet(offset, byte) + offset = offset + 1 + resume(()) + } +} + +def readFile[R](path: String) { reader: () => R / read[Byte] }: R / Exception[IOError] = { + + val file = openForReading(path); + with on[IOError].finalize { close(file) } + + val chunkSize = 1048576 // 1MB + val buffer = bytearray::allocate(chunkSize) + var offset = 0 + var length = 0 + + def pull(): Unit / stop = { + read(file, buffer, 0, chunkSize, -1) match { + case 0 => + do stop() + case n => + length = n + } + } + + try { + reader() + } with read[Byte] { + resume { + if (offset >= length) { + pull() + offset = 0 + } + val byte = buffer.unsafeGet(offset) + offset = offset + 1 + return byte + } + } +} + /// An abstract interface applications can program against. /// /// Can be interpreted with the `filesystem` handler, or virtualized etc. diff --git a/libraries/common/json.effekt b/libraries/common/json.effekt index 2906ef1e9..f6bb773f3 100644 --- a/libraries/common/json.effekt +++ b/libraries/common/json.effekt @@ -165,7 +165,7 @@ def decodeJson(): Unit / {Scan[Char], JsonBuilder, Exception[WrongFormat]} = { case 'f' => expectString("false"); do bool(false) case '-' => do number(readDouble()) case d and d.isDigit => do number(readDouble()) - case '"' => do string(collectString { readQuotedString() }) + case '"' => do string(collect { readQuotedString() }) case '{' => do dict{ decodeJsonObject() } case '[' => do list{ decodeJsonList() } case _ => println("Unexpected " ++ c.toString); <> @@ -179,7 +179,7 @@ def decodeJsonObject(): Unit / {Scan[Char], JsonObjectBuilder, Exception[WrongFo skipWhitespace() while(do peek[Char]() is c and (c != '}')) { if (not(first)) { expectString(",") } - val k: String = collectString { readQuotedString() } + val k = string::collect { readQuotedString() } skipWhitespace() expectString(":") do field(k){ @@ -277,7 +277,7 @@ def build[R](){ body: => R / JsonBuilder }: (R, JsonValue) = { } (x, r) } -def buildList[R](){ body: => R / JsonBuilder }: (R, List[JsonValue]) = returning::collectList[JsonValue, R] { +def buildList[R](){ body: => R / JsonBuilder }: (R, List[JsonValue]) = list::returning::collect[JsonValue, R] { try body() with JsonBuilder { def number(n) = { do emit(Number(n)); resume(()) } def bool(b) = { do emit(Bool(b)); resume(()) } @@ -295,7 +295,7 @@ def buildList[R](){ body: => R / JsonBuilder }: (R, List[JsonValue]) = returning } } } -def buildDict[R](){ body: => R / JsonObjectBuilder }: (R, List[(String, JsonValue)]) = returning::collectList[(String, JsonValue), R] { +def buildDict[R](){ body: => R / JsonObjectBuilder }: (R, List[(String, JsonValue)]) = list::returning::collect[(String, JsonValue), R] { try body() with JsonObjectBuilder { def field(k) = resume { {v} => val x = build{v} @@ -315,7 +315,7 @@ namespace test { // Read quoted string feed("\"\ta\n\ra\"") { with scanner[Char] - println(collectString { readQuotedString() }) + println(string::collect { readQuotedString() }) } println("") diff --git a/libraries/common/list.effekt b/libraries/common/list.effekt index 21b8e306e..991d9f6f8 100644 --- a/libraries/common/list.effekt +++ b/libraries/common/list.effekt @@ -3,6 +3,7 @@ module list import effekt import option import exception +import stream /// Immutable linked list for finite sequences of elements. type List[A] { @@ -178,7 +179,7 @@ def map[A, B](l: List[A]) { f: A => B } : List[B] = { /// discarding the elements for which the function returned `None()`. /// /// O(N) -def collect[A, B](l: List[A]) { f : A => Option[B] }: List[B] = { +def collectSome[A, B](l: List[A]) { f : A => Option[B] }: List[B] = { var acc = Nil[B]() l.foreach { a => val optB = f(a) @@ -836,3 +837,46 @@ def println(l: List[Int]): Unit = println(show(l)) def println(l: List[Double]): Unit = println(show(l)) def println(l: List[Bool]): Unit = println(show(l)) def println(l: List[String]): Unit = println(show(l)) + + +// Streaming +// --------- + +/// Turns a `list` into a producer of a push stream +/// by emitting each contained value left-to-right. +def each[A](list: List[A]): Unit / emit[A] = + list match { + case Nil() => () + case Cons(head, tail) => + do emit(head) + each(tail) + } + +def feed[T, R](list: List[T]) { reader: () => R / read[T] }: R = { + var l = list + try { + reader() + } with read[T] { + resume { + l match { + case Nil() => do stop() + case Cons(value, rest) => + l = rest + return value + } + } + } +} + +def collect[A] { stream: () => Unit / emit[A] }: List[A] = + returning::collect[A, Unit]{stream}.second + +namespace returning { + def collect[A, R] { stream: () => R / emit[A] }: (R, List[A]) = + try { + (stream(), Nil()) + } with emit[A] { (v) => + val (r, vs) = resume(()); + (r, Cons(v, vs)) + } +} diff --git a/libraries/common/map.effekt b/libraries/common/map.effekt index 9010fb4e3..7faf0297d 100644 --- a/libraries/common/map.effekt +++ b/libraries/common/map.effekt @@ -1,5 +1,7 @@ module map +import stream + /// Ordered finite immutable map, backed by balanced binary trees of logarithmic depth. record Map[K, V](tree: internal::Tree[K, V], compare: (K, K) => Ordering at {}) @@ -989,4 +991,36 @@ namespace internal { "[" ++ res ++ "]" } -} \ No newline at end of file +} + + +// Streaming +// --------- + +/// Turns a `map` into a producer of a push stream +/// of `(key, value)` pairs by emitting each contained *in order*. +def each[K, V](map: Map[K, V]): Unit / emit[(K, V)] = + map.foreach { (k, v) => do emit((k, v)) } + +/// Turns a `map` into a producer of a push stream +/// of its keys by emitting each contained *in order*. +def eachKey[K, V](map: Map[K, V]): Unit / emit[K] = + map.foreach { (k, _v) => do emit(k) } + +/// Turns a `map` into a producer of a push stream +/// of its values by emitting each contained. +def eachValue[K, V](map: Map[K, V]): Unit / emit[V] = + map.foreach { (_k, v) => do emit(v) } + +def collect[K, V](compare: (K, K) => Ordering at {}) { stream: () => Unit / emit[(K, V)] }: Map[K, V] = + returning::collect[K, V, Unit](compare){stream}.second + +namespace returning { + def collect[K, V, R](compare: (K, K) => Ordering at {}) { stream: () => R / emit[(K, V)] }: (R, Map[K, V]) = + try { + (stream(), empty(compare)) + } with emit[(K, V)] { case (k, v) => + val (r, map) = resume(()); + (r, map.put(k, v)) + } +} diff --git a/libraries/common/random.effekt b/libraries/common/random.effekt index b1998aec7..647b0d3e0 100644 --- a/libraries/common/random.effekt +++ b/libraries/common/random.effekt @@ -1,6 +1,7 @@ module random import stream +import io/filesystem import io/error /// Infinite pull stream of random bytes. diff --git a/libraries/common/set.effekt b/libraries/common/set.effekt index c7a158126..4c67f2869 100644 --- a/libraries/common/set.effekt +++ b/libraries/common/set.effekt @@ -1,6 +1,7 @@ module set import map +import stream /// Ordered finite immutable set, backed by balanced binary trees of logarithmic depth. record Set[A](tree: internal::Tree[A, Unit], compare: (A, A) => Ordering at {}) @@ -231,4 +232,26 @@ def intersection[A](s1: Set[A], s2: Set[A], compare: (A, A) => Ordering at {}): /// /// O(???) def intersection[A](s1: Set[A], s2: Set[A]): Set[A] = - s1.intersection(s2, s1.compare) \ No newline at end of file + s1.intersection(s2, s1.compare) + + +// Streaming +// --------- + +/// Turns a `set` into a producer of a push stream +/// by emitting each contained *in order*. +def each[A](set: Set[A]): Unit / emit[A] = + set.foreach { x => do emit(x) } + +def collect[A](compare: (A, A) => Ordering at {}) { stream: () => Unit / emit[A] }: Set[A] = + returning::collect[A, Unit](compare){stream}.second + +namespace returning { + def collect[A, R](compare: (A, A) => Ordering at {}) { stream: () => R / emit[A] }: (R, Set[A]) = + try { + (stream(), empty(compare)) + } with emit[A] { (v) => + val (r, set) = resume(()); + (r, set.insert(v)) + } +} diff --git a/libraries/common/stream.effekt b/libraries/common/stream.effekt index 7ff3e80ed..9b802c936 100644 --- a/libraries/common/stream.effekt +++ b/libraries/common/stream.effekt @@ -1,12 +1,7 @@ module stream -import array -import bytearray -import map -import set - -import io/filesystem -import io/error +import effekt +import option // Effects // ------- @@ -53,68 +48,6 @@ def fix[T] { one: T => Unit / emit[T] } { stream: => Unit / emit[T] }: Unit = resume(()) } -/// Turns a `list` into a producer of a push stream -/// by emitting each contained value left-to-right. -def each[A](list: List[A]): Unit / emit[A] = - list match { - case Nil() => () - case Cons(head, tail) => - do emit(head) - each(tail) - } - -/// Turns an `array` into a producer of a push stream -/// by emitting each contained value from 0 to length - 1. -def each[T](array: Array[T]): Unit / emit[T] = { - val n = array.size - def go(i: Int): Unit = { - if (i < n) { - do emit(array.unsafeGet(i)) - go(i + 1) - } - } - go(0) -} - -/// Turns `bytes` into a producer of a push stream -/// by emitting each contained value from 0 to length - 1. -def each(bytes: ByteArray): Unit / emit[Byte] = { - val n = bytes.size - def go(i: Int): Unit = { - if (i < n) { - do emit(bytes.unsafeGet(i)) - go(i + 1) - } - } - go(0) -} - -/// Turns a `map` into a producer of a push stream -/// of `(key, value)` pairs by emitting each contained *in order*. -def each[K, V](map: Map[K, V]): Unit / emit[(K, V)] = - map.foreach { (k, v) => do emit((k, v)) } - -/// Turns a `map` into a producer of a push stream -/// of its keys by emitting each contained *in order*. -def eachKey[K, V](map: Map[K, V]): Unit / emit[K] = - map.foreach { (k, _v) => do emit(k) } - -/// Turns a `map` into a producer of a push stream -/// of its values by emitting each contained. -def eachValue[K, V](map: Map[K, V]): Unit / emit[V] = - map.foreach { (_k, v) => do emit(v) } - -/// Turns a `set` into a producer of a push stream -/// by emitting each contained *in order*. -def each[A](set: Set[A]): Unit / emit[A] = - set.foreach { x => do emit(x) } - -// not option -// not dequeue -// not queue -// not result -// not seq - def boundary { program: () => Unit / stop }: Unit = { returning::boundary[Unit]{program} @@ -202,87 +135,6 @@ def sum { stream: () => Unit / emit[Int] }: Int = def product { stream: () => Unit / emit[Int] }: Int = returning::product[Unit]{stream}.second -def collectList[A] { stream: () => Unit / emit[A] }: List[A] = - returning::collectList[A, Unit]{stream}.second - -def collectArray[A] { stream: () => Unit / emit[A] }: Array[A] = - returning::collectArray[A, Unit]{stream}.second - -def collectBytes { stream: () => Unit / emit[Byte] }: ByteArray = - returning::collectBytes[Unit]{stream}.second - -def feed[T, R](list: List[T]) { reader: () => R / read[T] }: R = { - var l = list - try { - reader() - } with read[T] { - resume { - l match { - case Nil() => do stop() - case Cons(value, rest) => - l = rest - return value - } - } - } -} - -def collectMap[K, V, R](compare: (K, K) => Ordering at {}) { stream: () => R / emit[(K, V)] }: (R, Map[K, V]) = - try { - (stream(), map::empty(compare)) - } with emit[(K, V)] { case (k, v) => - val (r, map) = resume(()); - (r, map.put(k, v)) - } - -def collectMap[K, V](compare: (K, K) => Ordering at {}) { stream: () => Any / emit[(K, V)] }: Map[K, V] = - collectMap[K, V, Any](compare){stream}.second - -def collectSet[A, R](compare: (A, A) => Ordering at {}) { stream: () => R / emit[A] }: (R, Set[A]) = - try { - (stream(), set::empty(compare)) - } with emit[A] { (v) => - val (r, set) = resume(()); - (r, set.insert(v)) - } - -def collectSet[A](compare: (A, A) => Ordering at {}) { stream: () => Any / emit[A] }: Set[A] = - collectSet[A, Any](compare){stream}.second - -def feed[T, R](array: Array[T]) { reader: () => R / read[T] }: R = { - var i = 0 - try { - reader() - } with read[T] { - resume { - if (i < array.size) { - val c = i - i = c + 1 - array.unsafeGet(c) - } else { - do stop() - } - } - } -} - -def feed[R](bytes: ByteArray) { reader: () => R / read[Byte] }: R = { - var i = 0 - try { - reader() - } with read[Byte] { - resume { - if (i < bytes.size) { - val c = i - i = c + 1 - bytes.unsafeGet(c) - } else { - do stop() - } - } - } -} - def source[A] { stream: () => Unit / emit[A] } { reader: () => Unit / read[A] }: Unit = returning::source[A, Unit]{stream}{reader} @@ -319,155 +171,6 @@ def zipLongest[A, B] { stream1: () => Unit / emit[A] } { stream2: () => Unit / e } } -def writeFile[R](path: String) { stream: () => R / emit[Byte] }: R / Exception[IOError] = { - - val file = openForWriting(path); - with on[IOError].finalize { close(file) } - - val chunkSize = 1048576 // 1MB - val buffer = bytearray::allocate(chunkSize) - var offset = 0 - - def push(i: Int, n: Int): Unit = { - val r = write(file, buffer, i, n, -1) - if (r < n) { - push(i + r, n - r) - } - } - - try { - val r = stream() - push(0, offset) - return r - } with emit[Byte] { (byte) => - if (offset >= buffer.size) { - push(0, buffer.size) - offset = 0 - } - buffer.unsafeSet(offset, byte) - offset = offset + 1 - resume(()) - } -} - -def readFile[R](path: String) { reader: () => R / read[Byte] }: R / Exception[IOError] = { - - val file = openForReading(path); - with on[IOError].finalize { close(file) } - - val chunkSize = 1048576 // 1MB - val buffer = bytearray::allocate(chunkSize) - var offset = 0 - var length = 0 - - def pull(): Unit / stop = { - read(file, buffer, 0, chunkSize, -1) match { - case 0 => - do stop() - case n => - length = n - } - } - - try { - reader() - } with read[Byte] { - resume { - if (offset >= length) { - pull() - offset = 0 - } - val byte = buffer.unsafeGet(offset) - offset = offset + 1 - return byte - } - } -} - -def decodeChar(): Char / {read[Byte], stop} = { - val b = do read().toInt - if (b < 128) { - b.toChar - } else if (b < 224) { - val part1 = bitwiseShl(bitwiseAnd(b, 31), 6) - val part2 = bitwiseAnd(do read().toInt, 63) - bitwiseOr(part1, part2).toChar - } else if (b < 240) { - val part1 = bitwiseShl(bitwiseAnd(b, 15), 12) - val part2 = bitwiseShl(bitwiseAnd(do read().toInt, 63), 6) - val part3 = bitwiseAnd(do read().toInt, 63) - bitwiseOr(bitwiseOr(part1, part2), part3).toChar - } else { - val part1 = bitwiseShl(bitwiseAnd(b, 7), 18) - val part2 = bitwiseShl(bitwiseAnd(do read().toInt, 63), 12) - val part3 = bitwiseShl(bitwiseAnd(do read().toInt, 63), 6) - val part4 = bitwiseAnd(do read().toInt, 63) - bitwiseOr(bitwiseOr(bitwiseOr(part1, part2), part3), part4).toChar - } -} - -def encodeChar(char: Char): Unit / emit[Byte] = { - val code = char.toInt - if (code < 128) { - do emit(code.toByte) - } else if (code < 2048) { - do emit(bitwiseOr(192, bitwiseShr(code, 6)).toByte) - do emit(bitwiseOr(128, bitwiseAnd(code, 63)).toByte) - } else if (code < 65536) { - do emit(bitwiseOr(224, bitwiseShr(code, 12)).toByte) - do emit(bitwiseOr(128, bitwiseAnd(bitwiseShr(code, 6), 63)).toByte) - do emit(bitwiseOr(128, bitwiseAnd(code, 63)).toByte) - } else { - do emit(bitwiseOr(240, bitwiseShr(code, 18)).toByte) - do emit(bitwiseOr(128, bitwiseAnd(bitwiseShr(code, 12), 63)).toByte) - do emit(bitwiseOr(128, bitwiseAnd(bitwiseShr(code, 6), 63)).toByte) - do emit(bitwiseOr(128, bitwiseAnd(code, 63)).toByte) - } -} - -def decodeUTF8[R] { reader: () => R / read[Char] }: R / read[Byte] = - try { - reader() - } with read[Char] { - resume { decodeChar() } - } - -def encodeUTF8[R] { stream: () => R / emit[Char] }: R / emit[Byte] = - try { - stream() - } with emit[Char] { char => - resume(encodeChar(char)) - } - -def writeFileUTF8[R](path: String) { stream: () => R / emit[Char] }: R / Exception[IOError] = - writeFile(path) { encodeUTF8 { stream() } } - -def readFileUTF8[R](path: String) { reader: () => R / read[Char] }: R / Exception[IOError] = - readFile(path) { decodeUTF8 { reader() } } - -def feed[R](string: String) { reader: () => R / read[Char] } = - feed(string.fromString) { - decodeUTF8 { - reader() - } - } - -def each(string: String): Unit / emit[Char] = - feed(string) { - exhaustively { - do emit[Char](do read[Char]()) - } - } - -def collectString { stream: () => Unit / emit[Char] }: String = - returning::collectString[Unit]{stream}.second - -def writeLine { body: () => Unit / emit[Char] }: Unit / emit[Char] = - returning::writeLine[Unit]{body} - -def readLine { body: () => Unit / read[Char] }: Unit / {read[Char], stop} = - returning::readLine[Unit]{body} - namespace internal { effect snapshot(): Unit } @@ -640,67 +343,6 @@ def product[R] { stream: () => R / emit[Int] }: (R, Int) = { } } -def collectList[A, R] { stream: () => R / emit[A] }: (R, List[A]) = - try { - (stream(), Nil()) - } with emit[A] { (v) => - val (r, vs) = resume(()); - (r, Cons(v, vs)) - } - -def collectArray[A, R] { stream: () => R / emit[A] }: (R, Array[A]) = { - var i = 0 - var a = array::allocate(1) - try { - (stream(), a.resize(i)) - } with emit[A] { (v) => - if (i >= a.size) { a = a.resize(2 * a.size) } - a.unsafeSet(i, v) - i = i + 1 - resume(()) - } -} - -def collectBytes[R] { stream: () => R / emit[Byte] }: (R, ByteArray) = { - var i = 0 - var a = bytearray::allocate(1) - try { - (stream(), a.resize(i)) - } with emit[Byte] { (v) => - if (i >= a.size) { a = a.resize(2 * a.size) } - a.unsafeSet(i, v) - i = i + 1 - resume(()) - } -} - -def collectString[R] { stream: () => R / emit[Char] }: (R, String) = { - val (result, bytes) = collectBytes[R] { encodeUTF8 { stream } } - (result, bytes.toString) -} - -def writeLine[R] { body: () => R / emit[Char] }: R / emit[Char] = { - val result = body() - do emit('\n') - return result -} - -def readLine[R] { body: () => R / read[Char] }: R / {read[Char], stop} = { - var stopped = false - try { - body() - } with read[Char] { - if(stopped){ - resume { do stop() } - } else { - do read[Char] match { - case '\n' => stopped = true; resume { do stop() } - case char => resume { return char } - } - } - } -} - def source[A, R] { stream: () => Unit / emit[A] } { reader: () => R / read[A] }: R = { var next = box { None() } next = box { diff --git a/libraries/common/string.effekt b/libraries/common/string.effekt index 3277811a9..12963cdf5 100644 --- a/libraries/common/string.effekt +++ b/libraries/common/string.effekt @@ -2,10 +2,12 @@ module string import effekt import option +import stream import list import exception import result import char +import bytearray // TODO // - [ ] handle unicode codepoints (that can span two indices) correctly @@ -229,3 +231,31 @@ extern def unsafeCharAt(str: String, n: Int) at {}: Char = ret %Int %x """ vm "string::unsafeCharAt(String, Int)" + + +// Streaming +// --------- + +def feed[R](string: String) { reader: () => R / read[Char] } = + feed(string.fromString) { + decodeUTF8 { + reader() + } + } + +def each(string: String): Unit / emit[Char] = + feed(string) { + exhaustively { + do emit[Char](do read[Char]()) + } + } + +def collect { stream: () => Unit / emit[Char] }: String = + returning::collect[Unit]{stream}.second + +namespace returning { + def collect[R] { stream: () => R / emit[Char] }: (R, String) = { + val (result, bytes) = bytearray::returning::collect[R] { encodeUTF8 { stream } } + (result, bytes.toString) + } +}