Skip to content

Commit 3fba32a

Browse files
committed
added ReadChannel.unfold
fiexe situation, where exception in map is not propagated to reader
1 parent bb27fe7 commit 3fba32a

File tree

4 files changed

+113
-12
lines changed

4 files changed

+113
-12
lines changed

shared/src/main/scala/gopher/ReadChannel.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import gopher.impl._
55
import scala.util.Try
66
import scala.util.Success
77
import scala.util.Failure
8+
import scala.util.control.NonFatal
89
import scala.concurrent.duration.Duration
910

1011
import java.util.logging.{Level => LogLevel}
@@ -231,7 +232,35 @@ object ReadChannel:
231232
def fromValues[F[_],A](values: A*)(using Gopher[F]): ReadChannel[F,A] =
232233
fromIterable(values)
233234

234-
235+
def unfold[S,F[_],A](s:S)(f:S => Option[(A,S)])(using Gopher[F]): ReadChannel[F,A] =
236+
unfoldAsync[S,F,A](s)( state => summon[Gopher[F]].asyncMonad.tryPure(f(state)) )
237+
238+
def unfoldAsync[S,F[_],A](s:S)(f:S => F[Option[(A,S)]])(using Gopher[F]): ReadChannel[F,A]=
239+
given asyncMonad: CpsSchedulingMonad[F] = summon[Gopher[F]].asyncMonad
240+
val retval = makeChannel[Try[A]]()
241+
summon[Gopher[F]].spawnAndLogFail(async{
242+
var done = false
243+
var state = s
244+
try
245+
while(!done) {
246+
await(f(state)) match
247+
case Some((a,next)) =>
248+
retval.write(Success(a))
249+
state = next
250+
case None =>
251+
done = true
252+
}
253+
catch
254+
case NonFatal(ex) =>
255+
retval.write(Failure(ex))
256+
finally
257+
retval.close();
258+
})
259+
retval.map{
260+
case Success(x) => x
261+
case Failure(ex) =>
262+
throw ex
263+
}
235264

236265
end ReadChannel
237266

shared/src/main/scala/gopher/impl/FilteredReadChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class FilteredAsyncReadChannel[F[_],A](internal: ReadChannel[F,A], p: A=>F[Boole
6868
if (v) {
6969
if (markedUsed.get()) {
7070
nested.markUsed()
71-
}
71+
}
7272
fun(Success(a))
7373
} else {
7474
nested.markFree()

shared/src/main/scala/gopher/impl/MappedReadChannel.scala

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ class MappedReadChannel[F[_],A, B](internal: ReadChannel[F,A], f: A=> B) extends
1111

1212
def wrappedFun(fun: (Try[B] => Unit) ): (Try[A] => Unit) = {
1313
case Success(a) =>
14-
val b = f(a)
15-
fun(Success(b))
14+
try
15+
val b = f(a)
16+
fun(Success(b))
17+
catch
18+
case NonFatal(ex) =>
19+
fun(Failure(ex))
1620
case Failure(ex) =>
1721
fun(Failure(ex))
1822
}
@@ -50,14 +54,14 @@ class MappedAsyncReadChannel[F[_],A, B](internal: ReadChannel[F,A], f: A=> F[B])
5054

5155
def wrappedFun(fun: (Try[B] => Unit) ): (Try[A] => Unit) = {
5256
case Success(a) =>
53-
try{
54-
gopherApi.spawnAndLogFail(
55-
asyncMonad.mapTry(f(a))(fun)
56-
)
57-
}catch{
58-
case NonFatal(ex) =>
59-
fun(Failure(ex))
60-
}
57+
gopherApi.spawnAndLogFail(
58+
try
59+
asyncMonad.mapTry(f(a))(fun)
60+
catch
61+
case NonFatal(ex) =>
62+
fun(Failure(ex))
63+
asyncMonad.pure(())
64+
)
6165
case Failure(ex) =>
6266
fun(Failure(ex))
6367
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package gopher.channels
2+
3+
import gopher._
4+
import cps._
5+
import munit._
6+
7+
import scala.concurrent.ExecutionContext.Implicits.global
8+
import scala.concurrent.{Channel=>_,_}
9+
import scala.concurrent.duration._
10+
11+
import cps.monads.FutureAsyncMonad
12+
13+
class ReadChannelFactoryTest extends FunSuite {
14+
15+
given Gopher[Future] = Gopher[Future]()
16+
17+
18+
test("unfoldAsync produce stream simple") {
19+
val ch = ReadChannel.unfoldAsync(0){
20+
(x: Int) =>
21+
if (x > 10) then
22+
Future successful None
23+
else
24+
Future successful Some(x,x+1)
25+
}
26+
27+
ch.atake(20).map{ values =>
28+
assert(values(0) == 0)
29+
assert(values(1) == 1)
30+
assert(values(2) == 2)
31+
assert(values.size == 11)
32+
}
33+
34+
}
35+
36+
37+
test("unfoldAsync prodce stream with error") {
38+
val ch = ReadChannel.unfoldAsync(0){
39+
(x: Int) =>
40+
if (x > 3) then
41+
Future failed new RuntimeException("state is too big")
42+
else
43+
Future successful Some(x,x+1)
44+
}
45+
46+
async {
47+
val r0 = ch.read()
48+
assert(r0 == 0)
49+
val r1 = ch.read()
50+
assert(r1 == 1)
51+
val r2 = ch.read()
52+
assert(r2 == 2)
53+
val r3 = ch.read()
54+
assert(r3 == 3)
55+
var wasTooBig = false
56+
try {
57+
val r4 = ch.read()
58+
}catch{
59+
case e: RuntimeException =>
60+
wasTooBig = true
61+
}
62+
assert(wasTooBig)
63+
}
64+
65+
66+
}
67+
68+
}

0 commit comments

Comments
 (0)