Skip to content

Commit e58c429

Browse files
committed
implemented select timeout in select.once
1 parent 9d1bad9 commit e58c429

File tree

5 files changed

+62
-2
lines changed

5 files changed

+62
-2
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ scalacOptions ++= Seq("-unchecked","-deprecation", "-feature" /* , "-Ymacro-debu
1313

1414
libraryDependencies <+= scalaVersion( "org.scala-lang" % "scala-reflect" % _ )
1515

16-
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.5"
16+
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"
1717

1818
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
1919

src/main/scala/gopher/channels/FoldSelectorBuilder.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import scala.reflect.api._
66
import gopher._
77
import gopher.util._
88
import scala.concurrent._
9+
import scala.concurrent.duration._
910
import scala.annotation.unchecked._
1011

1112

@@ -33,6 +34,17 @@ trait FoldSelectorBuilder[T] extends SelectorBuilder[T]
3334
def writingWithFlowTerminationAsync[A](ch:Output[A], x: =>A, f: (ExecutionContext, FlowTermination[T], A) => Future[T] ): this.type =
3435
withWriter[A](ch, { cw => Some(x,f(ec,cw.flowTermination, x) map Function.const(cw)) } )
3536

37+
def timeout(t:FiniteDuration)(f: FiniteDuration => T): FoldSelectorBuilder[T] =
38+
macro SelectorBuilder.timeoutImpl[T,FoldSelectorBuilder[T]]
39+
40+
@inline
41+
def timeoutWithFlowTerminationAsync(t:FiniteDuration,
42+
f: (ExecutionContext, FlowTermination[T], FiniteDuration) => Future[T] ): this.type =
43+
withTimeout(t){ sk => Some(f(ec,sk.flowTermination,t) map Function.const(sk) ) }
44+
45+
46+
47+
3648
def idle(body:T): FoldSelectorBuilder[T] =
3749
macro SelectorBuilder.idleImpl[T,FoldSelectorBuilder[T]]
3850

src/main/scala/gopher/channels/OnceSelectorBuilder.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import scala.reflect.api._
66
import gopher._
77
import gopher.util._
88
import scala.concurrent._
9+
import scala.concurrent.duration._
910
import scala.annotation.unchecked._
1011

1112

@@ -35,6 +36,17 @@ trait OnceSelectorBuilder[T] extends SelectorBuilder[T@uncheckedVariance]
3536
def writingWithFlowTerminationAsync[A](ch:Output[A], x: =>A, f: (ExecutionContext, FlowTermination[T], A) => Future[T] ): this.type =
3637
withWriter[A](ch, { cw => Some(x,f(ec,cw.flowTermination,x) map(x => Done(x,cw.flowTermination)) ) } )
3738

39+
def timeout(t:FiniteDuration)(f: FiniteDuration => T): OnceSelectorBuilder[T] =
40+
macro SelectorBuilder.timeoutImpl[T,OnceSelectorBuilder[T]]
41+
42+
43+
@inline
44+
def timeoutWithFlowTerminationAsync(t:FiniteDuration,
45+
f: (ExecutionContext, FlowTermination[T], FiniteDuration) => Future[T] ): this.type =
46+
withTimeout(t){ sk => Some(f(ec,sk.flowTermination,t).map(x => Done(x,sk.flowTermination)) ) }
47+
48+
49+
3850
def idle(body: T): OnceSelectorBuilder[T] =
3951
macro SelectorBuilder.idleImpl[T,OnceSelectorBuilder[T]]
4052

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,14 @@ object SelectorBuilder
197197
import c.universe._
198198
f.tree match {
199199
case Function(valdefs, body) =>
200-
SelectorBuilder.buildAsyncCall[T,S](c)(valdefs,body,
200+
val r = SelectorBuilder.buildAsyncCall[T,S](c)(valdefs,body,
201201
{ (nvaldefs, nbody) =>
202202
q"""${c.prefix}.timeoutWithFlowTerminationAsync(${t},
203203
${Function(nvaldefs,nbody)}
204204
)
205205
"""
206206
})
207+
r
207208
case _ => c.abort(c.enclosingPosition,"second argument of timeout must have shape Function(x,y)")
208209
}
209210
}

src/test/scala/gopher/channels/SelectTimeoutSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,41 @@ class SelectTimeoutSuite extends FunSuite
7171
assert(s > 2)
7272
}
7373

74+
test("timeout in select.fold") {
75+
import gopherApi._
76+
val ch1 = makeChannel[Int](10)
77+
val f = select.afold(0) { (state,sl) =>
78+
sl match {
79+
case x: ch1.read => state+1
80+
case x: select.timeout if (x == 100.milliseconds) =>
81+
select.exit(state+10)
82+
}
83+
}
84+
ch1.awrite(1)
85+
val x = Await.result(f, 10 seconds)
86+
assert(x==11)
87+
}
88+
89+
test("timeout in select.once") {
90+
import gopherApi._
91+
implicit val timeout = Timeout(100 milliseconds)
92+
val ch1 = makeChannel[Int](10)
93+
var x = 0
94+
val f = go {
95+
for(s <- select.once) {
96+
s match {
97+
case y: ch1.read => info("ch1 readed")
98+
x=1
99+
case y: select.timeout =>
100+
info("ch2 readed")
101+
x=10
102+
}
103+
}
104+
}
105+
Await.ready(f, 10 seconds)
106+
assert(x==10)
107+
108+
}
74109

75110
lazy val gopherApi = CommonTestObjects.gopherApi
76111

0 commit comments

Comments
 (0)