Skip to content

Commit 9d1bad9

Browse files
committed
added SelectTimeout
1 parent 89bc545 commit 9d1bad9

File tree

4 files changed

+88
-33
lines changed

4 files changed

+88
-33
lines changed

src/main/scala/gopher/channels/ForeverSelectorBuilder.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package gopher.channels
22

33
import scala.language.experimental.macros
4-
import scala.reflect.macros.whitebox.Context
4+
import scala.reflect.macros.blackbox.Context
55
import scala.reflect.api._
66
import gopher._
77
import gopher.util._
88
import scala.concurrent._
9+
import scala.concurrent.duration._
910
import scala.annotation.unchecked._
1011

1112

@@ -36,6 +37,14 @@ trait ForeverSelectorBuilder extends SelectorBuilder[Unit]
3637
def writingWithFlowTerminationAsync[A](ch:Output[A], x: =>A, f: (ExecutionContext, FlowTermination[Unit], A) => Future[Unit] ): ForeverSelectorBuilder =
3738
withWriter[A](ch, { cw => Some(x,f(ec,cw.flowTermination, x) map Function.const(cw)) } )
3839

40+
def timeout(t:FiniteDuration)(f: FiniteDuration => Unit): ForeverSelectorBuilder =
41+
macro SelectorBuilder.timeoutImpl[Unit,ForeverSelectorBuilder]
42+
43+
@inline
44+
def timeoutWithFlowTerminationAsync(t:FiniteDuration,
45+
f: (ExecutionContext, FlowTermination[Unit], FiniteDuration) => Future[Unit] ): this.type =
46+
withTimeout(t){ sk => Some(f(ec,sk.flowTermination,t) map Function.const(sk)) }
47+
3948

4049
def idle(body:Unit): ForeverSelectorBuilder =
4150
macro SelectorBuilder.idleImpl[Unit,ForeverSelectorBuilder]

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ object SelectorBuilder
388388
val expression = if (!caseDef.guard.isEmpty) {
389389
parseGuardInSelectorCaseDef(c)(termName,caseDef.guard)
390390
} else {
391-
atPos(caseDef.pat.pos)(q"implicitly[akka.util.Timeout]")
391+
atPos(caseDef.pat.pos)(q"implicitly[akka.util.Timeout].duration")
392392
}
393393
val timeout = q"${builderName}.timeout(${expression})(${param} => ${body} )"
394394
atPos(caseDef.pat.pos)(timeout)

src/test/scala/gopher/channels/MacroSelectSuite.scala

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -335,37 +335,6 @@ class MacroSelectSuite extends FunSuite
335335
assert(x2.isInstanceOf[ChannelClosedException])
336336
}
337337

338-
test("select with constant timeout which not fire") {
339-
//pending
340-
import gopherApi._
341-
val ch1 = makeChannel[Int](10)
342-
val r = select.amap {
343-
case x:ch1.read =>
344-
//System.err.println(s"readed ${x}")
345-
x
346-
case y:select.timeout if (y==500.milliseconds) =>
347-
//System.err.println(s"timeout ${y}")
348-
-1
349-
}
350-
val f1 = ch1.awrite(1)
351-
val x = Await.result(r.aread, 10 seconds)
352-
assert(x==1)
353-
}
354-
355-
test("select with constant timeout which fire") {
356-
import gopherApi._
357-
val ch1 = makeChannel[Int](10)
358-
val r = select.amap {
359-
case x:ch1.read =>
360-
//System.err.println(s"readed ${x}")
361-
x
362-
case x:select.timeout if (x==500.milliseconds) =>
363-
//System.err.println(s"timeout ${x}")
364-
-1
365-
}
366-
val x = Await.result(r.aread, 10 seconds)
367-
assert(x == -1)
368-
}
369338

370339
lazy val gopherApi = CommonTestObjects.gopherApi
371340

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package gopher.channels
2+
3+
import gopher._
4+
import gopher.channels._
5+
import gopher.tags._
6+
7+
import org.scalatest._
8+
9+
import scala.language._
10+
import scala.concurrent._
11+
import scala.concurrent.duration._
12+
13+
import akka.util.Timeout
14+
15+
class SelectTimeoutSuite extends FunSuite
16+
{
17+
18+
import scala.concurrent.ExecutionContext.Implicits.global
19+
20+
21+
test("select with constant timeout which not fire") {
22+
//pending
23+
import gopherApi._
24+
val ch1 = makeChannel[Int](10)
25+
val r = select.amap {
26+
case x:ch1.read =>
27+
//System.err.println(s"readed ${x}")
28+
x
29+
case y:select.timeout if (y==500.milliseconds) =>
30+
//System.err.println(s"timeout ${y}")
31+
-1
32+
}
33+
val f1 = ch1.awrite(1)
34+
val x = Await.result(r.aread, 10 seconds)
35+
assert(x==1)
36+
}
37+
38+
test("select with constant timeout which fire") {
39+
import gopherApi._
40+
val ch1 = makeChannel[Int](10)
41+
val r = select.amap {
42+
case x:ch1.read =>
43+
//System.err.println(s"readed ${x}")
44+
x
45+
case x:select.timeout if (x==500.milliseconds) =>
46+
//System.err.println(s"timeout ${x}")
47+
-1
48+
}
49+
val x = Await.result(r.aread, 10 seconds)
50+
assert(x == -1)
51+
}
52+
53+
test("timeout in select.forever") {
54+
import gopherApi._
55+
val ch1 = makeChannel[Int](10)
56+
val ch2 = makeChannel[Int]()
57+
val chS = makeChannel[String](10)
58+
var s = 0
59+
implicit val timeout = Timeout(100 milliseconds)
60+
val f = select.forever{
61+
case x: ch1.read =>
62+
chS.write("1")
63+
case x: ch2.read =>
64+
chS.write("2")
65+
case x:select.timeout =>
66+
s += 1
67+
chS.write("t")
68+
if (s > 2) select.exit(())
69+
}
70+
val x = Await.result(f, 10 seconds)
71+
assert(s > 2)
72+
}
73+
74+
75+
lazy val gopherApi = CommonTestObjects.gopherApi
76+
77+
}

0 commit comments

Comments
 (0)