Skip to content

Commit 63ac590

Browse files
committed
implemented minimal infrastructure to timeout
1 parent 1fada2f commit 63ac590

File tree

1 file changed

+40
-25
lines changed

1 file changed

+40
-25
lines changed

src/main/scala/gopher/channels/Selector.scala

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import akka.pattern._
66
import scala.concurrent._
77
import scala.concurrent.duration._
88
import scala.language.postfixOps
9+
import scala.util._
910
import java.util.concurrent.atomic.AtomicBoolean
1011
import java.util.concurrent.atomic.AtomicLong
1112
import java.util.concurrent.ConcurrentLinkedQueue
@@ -33,9 +34,10 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
3334

3435
def addTimeoutSkip(f: Skip[A] => Option[Future[Continuated[A]]], timeout: FiniteDuration):Unit =
3536
{
36-
if (timeoutRecord.isEmpty) {
37-
val locked = makeLocked(Skip(f,this))
38-
timeoutRecord = Some(new TimeoutRecord(nOperations.get,timeout.toMillis,locked))
37+
if (!timeoutRecord.isDefined) {
38+
timeoutRecord.lastNOperations = nOperations.get
39+
timeoutRecord.timeout = timeout
40+
timeoutRecord.waiter = makeLocked(Skip(f,this))
3941
} else {
4042
throw new IllegalStateException("select must have only one timeout entry")
4143
}
@@ -123,6 +125,8 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
123125
private[this] def unlockAfter(f:Future[Continuated[A]], flowTermination: FlowTermination[A], dstr: String): Future[Continuated[A]] =
124126
f.transform(
125127
next => { if (mustUnlock(dstr,flowTermination)) {
128+
if (timeoutRecord.isDefined)
129+
scheduleTimeout()
126130
makeLocked(next)
127131
} else Never
128132
},
@@ -143,27 +147,39 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
143147
private[this] def scheduleTimeout():Unit =
144148
{
145149

146-
val scheduler = api.actorSystem.scheduler
147-
148150
def tickOperation():Unit =
149151
{
150-
if (!isCompleted) {
151-
for(tr <- timeoutRecord) {
152+
if (!isCompleted && timeoutRecord.isDefined) {
152153
val currentNOperations = nOperations.get()
153-
if (currentNOperations == tr.lastNOperations) {
154+
if (currentNOperations == timeoutRecord.lastNOperations) {
154155
// fire
155-
???
156-
} else {
157-
val now = System.currentTimeMillis()
158-
val nextTime = lastOperationEnd.get() + tr.timeoutMillis
159-
tr.lastNOperations = nOperations.get()
160-
scheduler.scheduleOnce((nextTime - now).millis)(tickOperation)
156+
timeoutRecord.waiter match {
157+
// TODO: add timeout field to skip
158+
case sk@Skip(f,ft) => f(sk) foreach { futureNext =>
159+
futureNext.onComplete {
160+
case Success(next) => if (!isCompleted) {
161+
next match {
162+
case sk@Skip(f,ft) if (ft eq this) => timeoutRecord.waiter = sk
163+
case other =>
164+
timeoutRecord.waiter = Never
165+
api.continuatedProcessorRef ! other
166+
}
167+
}
168+
case Failure(ex) => if (!isCompleted) ft.doThrow(ex)
169+
}
170+
}
171+
case other => api.continuatedProcessorRef ! other
172+
}
161173
}
162174
}
163-
}
164175
}
165176

166-
tickOperation()
177+
if (timeoutRecord.isDefined) {
178+
// TODO: make CAS
179+
timeoutRecord.lastNOperations = nOperations.get()
180+
val scheduler = api.actorSystem.scheduler
181+
scheduler.scheduleOnce(timeoutRecord.timeout)(tickOperation)
182+
}
167183

168184
}
169185

@@ -223,34 +239,33 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
223239
}
224240
}
225241

242+
private[this] val log = api.actorSystem.log
226243

227244
// false when unlocked, true otherwise.
228245
private[this] val lockFlag: AtomicBoolean = new AtomicBoolean(false)
229246

230-
// when last operation was started
231-
private[this] val lastOperationEnd: AtomicLong = new AtomicLong(0L)
232-
private[this] var haveTimeout: Boolean = false
233-
234247
// number of operations, increased during each lock/unlock.
235-
// used for idle detection.
248+
// used for idle and timeout detection
236249
private[channels] val nOperations = new AtomicLong();
237250

238251
private[this] val waiters: ConcurrentLinkedQueue[Continuated[A]] = new ConcurrentLinkedQueue()
239252
private[this] val idleWaiters: ConcurrentLinkedQueue[Continuated[A]] = new ConcurrentLinkedQueue()
240253

241254
private[this] class TimeoutRecord(
242255
var lastNOperations: Long,
243-
var timeoutMillis: Long,
256+
var timeout: FiniteDuration,
244257
var waiter: Continuated[A]
245-
)
246-
private[this] var timeoutRecord: Option[TimeoutRecord] = None
258+
) {
259+
def isDefined:Boolean = (waiter != Never)
260+
}
261+
262+
private[this] val timeoutRecord: TimeoutRecord = new TimeoutRecord(0L,0 milliseconds, Never)
247263

248264
private[this] val processor = api.continuatedProcessorRef
249265

250266
private[this] implicit val executionContext: ExecutionContext = api.executionContext
251267

252268

253-
254269
}
255270

256271

0 commit comments

Comments
 (0)