Skip to content

Commit bb47755

Browse files
committed
log failures which can be in spawned operations
1 parent e8e2e7a commit bb47755

10 files changed

+23
-15
lines changed

shared/src/main/scala/gopher/Gopher.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package gopher
22

33
import cps._
44
import scala.concurrent.duration.Duration
5-
import scala.util.Try
5+
import scala.util._
66

77
import java.util.logging.{Level => LogLevel}
88

@@ -33,6 +33,14 @@ trait Gopher[F[_]:CpsSchedulingMonad]:
3333
protected[gopher] def logImpossible(ex: Throwable): Unit =
3434
log(LogLevel.WARNING, "impossible", ex)
3535

36+
protected[gopher] def spawnAndLogFail[T](op: =>F[T]): F[Unit] =
37+
asyncMonad.mapTry(asyncMonad.spawn(op)){
38+
case Success(_) => ()
39+
case Failure(ex) =>
40+
log(LogLevel.WARNING, "exception in spawned process", ex)
41+
()
42+
}
43+
3644

3745
def makeChannel[A](bufSize:Int = 0,
3846
autoClose: Boolean = false)(using g:Gopher[?]):Channel[g.Monad,A,A] =
@@ -46,7 +54,7 @@ def select(using g:Gopher[?]):Select[g.Monad] =
4654

4755
def futureInput[F[_],A](f: F[A])(using g: Gopher[F]): ReadChannel[F,A] =
4856
val ch = g.makeOnceChannel[Try[A]]()
49-
g.asyncMonad.spawn{
57+
g.spawnAndLogFail{
5058
g.asyncMonad.flatMapTry(f)(r => ch.awrite(r))
5159
}
5260
ch.map(_.get)

shared/src/main/scala/gopher/ReadChannel.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ trait ReadChannel[F[_], A]:
149149
def zip[B](x: ReadChannel[F,B]): ReadChannel[F,(A,B)] =
150150
given CpsSchedulingMonad[F] = asyncMonad
151151
val retval = gopherApi.makeChannel[(A,B)]()
152-
asyncMonad.spawn(async[F]{
152+
gopherApi.spawnAndLogFail(async[F]{
153153
var done = false
154154
while(!done) {
155155
this.optRead() match
@@ -213,7 +213,7 @@ object ReadChannel:
213213
def fromIterable[F[_],A](c: IterableOnce[A])(using Gopher[F]): ReadChannel[F,A] =
214214
given asyncMonad: CpsSchedulingMonad[F] = summon[Gopher[F]].asyncMonad
215215
val retval = makeChannel[A]()
216-
asyncMonad.spawn(async{
216+
summon[Gopher[F]].spawnAndLogFail(async{
217217
val it = c.iterator
218218
while(it.hasNext) {
219219
val a = it.next()

shared/src/main/scala/gopher/Select.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class Select[F[_]](api: Gopher[F]):
6060
def mapAsync[A](step: SelectGroup[F,A] => F[A]): ReadChannel[F,A] =
6161
val r = makeChannel[A]()(using api)
6262
given CpsSchedulingMonad[F] = api.asyncMonad
63-
api.asyncMonad.spawn{
63+
api.spawnAndLogFail{
6464
async{
6565
var done = false
6666
while(!done)

shared/src/main/scala/gopher/SelectGroup.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class SelectGroup[F[_], S](api: Gopher[F]) extends SelectListeners[F,S,S]:
169169
if waitState.compareAndSet(0,1) then
170170
Some(v => {
171171
timeoutScheduled.foreach(_.cancel())
172-
m.spawn(
172+
api.spawnAndLogFail(
173173
m.mapTry(action(v))(x => call(x))
174174
)
175175
})
@@ -189,7 +189,7 @@ class SelectGroup[F[_], S](api: Gopher[F]) extends SelectListeners[F,S,S]:
189189
if waitState.compareAndSet(0,1) then
190190
Some((element, (v:Try[Unit]) => {
191191
timeoutScheduled.foreach(_.cancel())
192-
m.spawn(
192+
api.spawnAndLogFail(
193193
m.mapTry(action(v))(x=>call(x))
194194
)}
195195
))
@@ -204,7 +204,7 @@ class SelectGroup[F[_], S](api: Gopher[F]) extends SelectListeners[F,S,S]:
204204
def capture(): Option[Try[FiniteDuration] => Unit] =
205205
if (waitState.compareAndSet(0,1)) then
206206
Some((v:Try[FiniteDuration]) =>
207-
m.spawn(m.mapTry(action(v))(x => call(x)))
207+
api.spawnAndLogFail(m.mapTry(action(v))(x => call(x)))
208208
)
209209
else
210210
None

shared/src/main/scala/gopher/impl/ChFlatMappedReadChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ChFlatMappedReadChannel[F[_], A, B](prev: ReadChannel[F,A], f: A=>ReadChan
4040
bChannel.close()
4141
}
4242

43-
gopherApi.asyncMonad.spawn(run())
43+
gopherApi.spawnAndLogFail(run())
4444

4545

4646

shared/src/main/scala/gopher/impl/ChFlatMappedTryReadChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ChFlatMappedTryReadChannel[F[_], A, B](prev: ReadChannel[F,Try[A]], f: Try
4545
}
4646
}
4747

48-
gopherApi.asyncMonad.spawn(run())
48+
gopherApi.spawnAndLogFail(run())
4949

5050

5151
}

shared/src/main/scala/gopher/impl/DuppedInput.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ class DuppedInput[F[_],A](origin:ReadChannel[F,A], bufSize: Int=1)(using api:Gop
3232
val f2 = sink2.write(a)
3333
true
3434
}}.runAsync()
35-
api.asyncMonad.spawn(runner)
35+
api.spawnAndLogFail(runner)
3636

3737
}

shared/src/main/scala/gopher/impl/FilteredReadChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class FilteredAsyncReadChannel[F[_],A](internal: ReadChannel[F,A], p: A=>F[Boole
6262

6363
def wrappedFun(fun: (Try[A] => Unit) ): (Try[A] => Unit) = {
6464
case Success(a) =>
65-
gopherApi.asyncMonad.spawn(
65+
gopherApi.spawnAndLogFail(
6666
gopherApi.asyncMonad.mapTry(p(a)){
6767
case Success(v) =>
6868
if (v) {

shared/src/main/scala/gopher/impl/MappedReadChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class MappedAsyncReadChannel[F[_],A, B](internal: ReadChannel[F,A], f: A=> F[B])
5151
def wrappedFun(fun: (Try[B] => Unit) ): (Try[A] => Unit) = {
5252
case Success(a) =>
5353
try{
54-
asyncMonad.spawn(
54+
gopherApi.spawnAndLogFail(
5555
asyncMonad.mapTry(f(a))(fun)
5656
)
5757
}catch{

shared/src/main/scala/gopher/monads/ReadTryChannelCpsMonad.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ given ReadTryChannelCpsMonad[F[_]](using Gopher[F]): CpsAsyncMonad[ [A] =>> Read
3737
def error[A](e: Throwable): ReadChannel[F,Try[A]] =
3838
val r = makeChannel[Try[A]]()
3939
given fm: CpsSchedulingMonad[F] = summon[Gopher[F]].asyncMonad
40-
fm.spawn{ async[F] {
40+
summon[Gopher[F]].spawnAndLogFail{ async[F] {
4141
r.write(Failure(e))
4242
r.close()
4343
} }
@@ -48,7 +48,7 @@ given ReadTryChannelCpsMonad[F[_]](using Gopher[F]): CpsAsyncMonad[ [A] =>> Read
4848
val r = makeOnceChannel[Try[A]]()
4949
given fm: CpsSchedulingMonad[F] = summon[Gopher[F]].asyncMonad
5050
val fv = fm.adoptCallbackStyle(source)
51-
fm.spawn{
51+
summon[Gopher[F]].spawnAndLogFail{
5252
fm.flatMapTry( fv ){ tryV =>
5353
r.awrite(tryV)
5454
}

0 commit comments

Comments
 (0)