Skip to content

Commit 74ee8c3

Browse files
committed
implemented timeoit for inputSelectorBuilder
1 parent 63ac590 commit 74ee8c3

File tree

4 files changed

+47
-8
lines changed

4 files changed

+47
-8
lines changed

src/main/scala/gopher/channels/InputSelectorBuilder.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import scala.reflect.api._
66
import gopher._
77
import gopher.util._
88
import scala.concurrent._
9+
import scala.concurrent.duration._
910
import scala.annotation.unchecked._
1011

1112

@@ -54,6 +55,16 @@ class InputSelectorBuilder[T](override val api: GopherAPI) extends SelectorBuild
5455
withIdle{ sk => Some(f(ec,sk.flowTermination) flatMap(x =>
5556
proxy.awrite(x)) map(Function.const(sk)) ) }
5657

58+
def timeout(t:FiniteDuration)(f: FiniteDuration => T): InputSelectorBuilder[T] =
59+
macro SelectorBuilder.timeoutImpl[T,InputSelectorBuilder[T]]
60+
61+
@inline
62+
def timeoutWithFlowTerminationAsync(t:FiniteDuration,
63+
f: (ExecutionContext, FlowTermination[T], FiniteDuration) => Future[T] ): this.type =
64+
withTimeout(t){ sk => Some(f(ec,sk.flowTermination,t) flatMap( x =>
65+
proxy.awrite(x)) map(Function.const(sk)) ) }
66+
67+
5768
def foreach(f:Any=>T):T =
5869
macro SelectorBuilder.foreachImpl[T]
5970

src/main/scala/gopher/channels/Selector.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
3232
idleWaiters add makeLocked(Skip(f,this))
3333
}
3434

35-
def addTimeoutSkip(f: Skip[A] => Option[Future[Continuated[A]]], timeout: FiniteDuration):Unit =
35+
def addTimeout(timeout:FiniteDuration, f: Skip[A] => Option[Future[Continuated[A]]]):Unit =
3636
{
3737
if (!timeoutRecord.isDefined) {
3838
timeoutRecord.lastNOperations = nOperations.get
@@ -46,6 +46,9 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
4646
def run:Future[A] =
4747
{
4848
sendWaits()
49+
if (timeoutRecord.isDefined) {
50+
scheduleTimeout()
51+
}
4952
api.idleDetector put this
5053
future
5154
}

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ import scala.reflect.api._
66
import gopher._
77
import gopher.util._
88
import scala.concurrent._
9+
import scala.concurrent.duration._
910
import scala.annotation.unchecked._
1011

1112
trait SelectorBuilder[A]
1213
{
1314

15+
type timeout = FiniteDuration
16+
1417
def api: GopherAPI
1518

1619
def onRead[E](ch:Input[E])(arg: ReadSelectorArgument[E,A]): this.type =
@@ -52,6 +55,12 @@ trait SelectorBuilder[A]
5255
this
5356
}
5457

58+
@inline
59+
def withTimeout(t:FiniteDuration)(f: Skip[A] => Option[Future[Continuated[A]]]):this.type =
60+
{
61+
selector.addTimeout(t,f)
62+
this
63+
}
5564

5665
def go: Future[A] = selector.run
5766

@@ -183,6 +192,23 @@ object SelectorBuilder
183192
})
184193
}
185194

195+
def timeoutImpl[T:c.WeakTypeTag,S](c:Context)(t:c.Expr[FiniteDuration])(f:c.Expr[FiniteDuration=>T]):c.Expr[S] =
196+
{
197+
import c.universe._
198+
f.tree match {
199+
case Function(valdefs, body) =>
200+
SelectorBuilder.buildAsyncCall[T,S](c)(valdefs,body,
201+
{ (nvaldefs, nbody) =>
202+
q"""${c.prefix}.timeoutWithFlowTerminationAsync(${t},
203+
${Function(nvaldefs,nbody)}
204+
)
205+
"""
206+
})
207+
case _ => c.abort(c.enclosingPosition,"second argument of timeout must have shape Function(x,y)")
208+
}
209+
}
210+
211+
186212
def foreachImpl[T](c:Context)(f:c.Expr[Any=>T]):c.Expr[T] =
187213
{
188214
import c.universe._
@@ -365,7 +391,6 @@ object SelectorBuilder
365391
atPos(caseDef.pat.pos)(q"implicitly[akka.util.Timeout]")
366392
}
367393
val timeout = q"${builderName}.timeout(${expression})(${param} => ${body} )"
368-
System.err.println(s"generate timeout statement: ${timeout}")
369394
atPos(caseDef.pat.pos)(timeout)
370395
case _ =>
371396
if (caseDef.guard.isEmpty) {

src/test/scala/gopher/channels/MacroSelectSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -336,20 +336,20 @@ class MacroSelectSuite extends FunSuite
336336
}
337337

338338
test("select with constant timeout") {
339-
pending
340-
/*
339+
//pending
341340
import gopherApi._
342341
val ch1 = makeChannel[Int](10)
343342
val r = select.amap {
344343
case x:ch1.read =>
345-
System.err.println(s"readed ${x}")
344+
//System.err.println(s"readed ${x}")
346345
x
347-
case x:select.timeout if (x==500.milliseconds) =>
348-
System.err.println(s"timeout 500 ms")
346+
case y:select.timeout if (y==500.milliseconds) =>
347+
//System.err.println(s"timeout ${y}")
349348
-1
350349
}
351350
val f1 = ch1.awrite(1)
352-
*/
351+
val x = Await.result(r.aread, 10 seconds)
352+
assert(x==1)
353353
}
354354

355355
lazy val gopherApi = CommonTestObjects.gopherApi

0 commit comments

Comments
 (0)