Skip to content

Commit 336d737

Browse files
committed
idle nwwo works in terms of select timeout
1 parent e58c429 commit 336d737

File tree

8 files changed

+22
-129
lines changed

8 files changed

+22
-129
lines changed

src/main/scala/gopher/GopherAPI.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
5151
def makeChannel[A](capacity: Int = 0): Channel[A] =
5252
Channel[A](capacity)(this)
5353

54+
/**
55+
* create effected input with given thread-policy
56+
*/
5457
def makeEffectedInput[A](in: Input[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single): EffectedInput[A] =
5558
EffectedInput(in,threadingPolicy)
5659

@@ -104,9 +107,18 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
104107
**/
105108
def config: Config = as.settings.config.atKey("gopher")
106109

110+
lazy val idleTimeout: FiniteDuration = {
111+
val m = try {
112+
config.getInt("idle-detection-tick")
113+
} catch {
114+
case ex: ConfigException.Missing => 100
115+
}
116+
m.milliseconds
117+
}
118+
107119
def currentFlow = CurrentFlowTermination
108120

109-
private[gopher] val idleDetector = new IdleDetector(this)
121+
//private[gopher] val idleDetector = new IdleDetector(this)
110122

111123
private[gopher] val continuatedProcessorRef: ActorRef = {
112124
val props = Props(classOf[ChannelProcessor], this)

src/main/scala/gopher/channels/FoldSelectorBuilder.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,9 @@ trait FoldSelectorBuilder[T] extends SelectorBuilder[T]
4343
withTimeout(t){ sk => Some(f(ec,sk.flowTermination,t) map Function.const(sk) ) }
4444

4545

46-
47-
4846
def idle(body:T): FoldSelectorBuilder[T] =
4947
macro SelectorBuilder.idleImpl[T,FoldSelectorBuilder[T]]
5048

51-
@inline
52-
def idleWithFlowTerminationAsync[A](f: (ExecutionContext, FlowTermination[T]) => Future[T] ): this.type =
53-
withIdle{ st => Some(f(ec,st.flowTermination) map Function.const(st)) }
5449

5550
}
5651

src/main/scala/gopher/channels/ForeverSelectorBuilder.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ trait ForeverSelectorBuilder extends SelectorBuilder[Unit]
4949
def idle(body:Unit): ForeverSelectorBuilder =
5050
macro SelectorBuilder.idleImpl[Unit,ForeverSelectorBuilder]
5151

52-
@inline
53-
def idleWithFlowTerminationAsync(f: (ExecutionContext, FlowTermination[Unit]) => Future[Unit] ): ForeverSelectorBuilder =
54-
withIdle{ st => Some(f(ec,st.flowTermination) map Function.const(st)) }
5552

5653
/**
5754
* provide syntax for running select loop inside go (or async) block

src/main/scala/gopher/channels/IdleDetector.scala

Lines changed: 0 additions & 88 deletions
This file was deleted.

src/main/scala/gopher/channels/InputSelectorBuilder.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ class InputSelectorBuilder[T](override val api: GopherAPI) extends SelectorBuild
4747
x=>proxy.awrite(x)
4848
} map Function.const(cw)) })
4949

50+
/*
5051
def idle(body: T): InputSelectorBuilder[T] =
5152
macro SelectorBuilder.idleImpl[T,InputSelectorBuilder[T]]
5253
5354
@inline
5455
def idleWithFlowTerminationAsync(f: (ExecutionContext, FlowTermination[T]) => Future[T] ): this.type =
5556
withIdle{ sk => Some(f(ec,sk.flowTermination) flatMap(x =>
5657
proxy.awrite(x)) map(Function.const(sk)) ) }
58+
*/
5759

5860
def timeout(t:FiniteDuration)(f: FiniteDuration => T): InputSelectorBuilder[T] =
5961
macro SelectorBuilder.timeoutImpl[T,InputSelectorBuilder[T]]

src/main/scala/gopher/channels/OnceSelectorBuilder.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,9 @@ trait OnceSelectorBuilder[T] extends SelectorBuilder[T@uncheckedVariance]
4646
withTimeout(t){ sk => Some(f(ec,sk.flowTermination,t).map(x => Done(x,sk.flowTermination)) ) }
4747

4848

49-
5049
def idle(body: T): OnceSelectorBuilder[T] =
5150
macro SelectorBuilder.idleImpl[T,OnceSelectorBuilder[T]]
5251

53-
@inline
54-
def idleWithFlowTerminationAsync(f: (ExecutionContext, FlowTermination[T]) => Future[T] ): this.type =
55-
withIdle{ sk => Some(f(ec,sk.flowTermination) map(x => Done(x,sk.flowTermination)) ) }
5652

5753
def foreach(f:Any=>T):T =
5854
macro SelectorBuilder.foreachImpl[T]

src/main/scala/gopher/channels/Selector.scala

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,6 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
2727
waiters add makeLocked(ContWrite(f,ch,this))
2828
}
2929

30-
def addIdleSkip(f: Skip[A] => Option[Future[Continuated[A]]]): Unit =
31-
{
32-
idleWaiters add makeLocked(Skip(f,this))
33-
}
34-
3530
def addTimeout(timeout:FiniteDuration, f: Skip[A] => Option[Future[Continuated[A]]]):Unit =
3631
{
3732
if (!timeoutRecord.isDefined) {
@@ -49,19 +44,9 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
4944
if (timeoutRecord.isDefined) {
5045
scheduleTimeout()
5146
}
52-
api.idleDetector put this
5347
future
5448
}
5549

56-
private[channels] def startIdles: Unit =
57-
{
58-
if (idleWaiters.isEmpty) {
59-
api.idleDetector.remove(this)
60-
} else {
61-
sendWaits(idleWaiters)
62-
}
63-
}
64-
6550
private[this] def makeLocked(block: Continuated[A]): Continuated[A] =
6651
{
6752
block match {
@@ -252,7 +237,6 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
252237
private[channels] val nOperations = new AtomicLong();
253238

254239
private[this] val waiters: ConcurrentLinkedQueue[Continuated[A]] = new ConcurrentLinkedQueue()
255-
private[this] val idleWaiters: ConcurrentLinkedQueue[Continuated[A]] = new ConcurrentLinkedQueue()
256240

257241
private[this] class TimeoutRecord(
258242
var lastNOperations: Long,
@@ -273,4 +257,3 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
273257

274258

275259

276-

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ trait SelectorBuilder[A]
3030

3131
def onIdle(arg: SkipSelectorArgument[A]): this.type =
3232
{
33-
selector.addIdleSkip(arg.normalizedFun)
33+
withTimeout(api.idleTimeout)(arg.normalizedFun)
3434
this
3535
}
3636

37+
def onTimeout(t:FiniteDuration)(arg: SkipSelectorArgument[A]): this.type =
38+
withTimeout(t)(arg.normalizedFun)
39+
3740
@inline
3841
def withReader[B](ch:Input[B], f: ContRead[B,A] => Option[ContRead.In[B]=>Future[Continuated[A]]]): this.type =
3942
{
@@ -51,8 +54,7 @@ trait SelectorBuilder[A]
5154
@inline
5255
def withIdle(f: Skip[A] => Option[Future[Continuated[A]]]):this.type =
5356
{
54-
selector.addIdleSkip(f)
55-
this
57+
withTimeout(api.idleTimeout)(f)
5658
}
5759

5860
@inline
@@ -183,13 +185,7 @@ object SelectorBuilder
183185
def idleImpl[T:c.WeakTypeTag,S](c:Context)(body:c.Expr[T]):c.Expr[S] =
184186
{
185187
import c.universe._
186-
SelectorBuilder.buildAsyncCall[T,S](c)(Nil,body.tree,
187-
{ (nvaldefs, nbody) =>
188-
q"""${c.prefix}.idleWithFlowTerminationAsync(
189-
${Function(nvaldefs,nbody)}
190-
)
191-
"""
192-
})
188+
c.Expr[S](q"${c.prefix}.timeout(${c.prefix}.api.idleTimeout)(_ => ${body})")
193189
}
194190

195191
def timeoutImpl[T:c.WeakTypeTag,S](c:Context)(t:c.Expr[FiniteDuration])(f:c.Expr[FiniteDuration=>T]):c.Expr[S] =
@@ -455,7 +451,7 @@ object SelectorBuilder
455451
if (!caseDef.guard.isEmpty) {
456452
c.abort(caseDef.guard.pos,"guard is not supported in select case")
457453
}
458-
q"${builderName}.idle(${caseDef.body})"
454+
q"${builderName}.timeout(${builderName}.api.idleTimeout)( _ => ${caseDef.body})"
459455
}
460456

461457
def mapImpl[T:c.WeakTypeTag](c:Context)(f:c.Expr[Any=>T]):c.Expr[Input[T]] =

0 commit comments

Comments
 (0)