Skip to content

Commit 988e24a

Browse files
committed
slow progress in timeout implementation
1 parent df3eebb commit 988e24a

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

notes/0.99.8.markdown

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- added support for select.timeout construct

src/main/scala/gopher/channels/Selector.scala

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
3131
idleWaiters add makeLocked(Skip(f,this))
3232
}
3333

34+
def addTimeoutSkip(f: Skip[A] => Option[Future[Continuated[A]]], timeout: FiniteDuration):Unit =
35+
{
36+
if (timeoutRecord.isEmpty) {
37+
val locked = makeLocked(Skip(f,this))
38+
timeoutRecord = Some(new TimeoutRecord(nOperations.get,timeout.toMillis,locked))
39+
} else {
40+
throw new IllegalStateException("select must have only one timeout entry")
41+
}
42+
}
43+
3444
def run:Future[A] =
3545
{
3646
sendWaits()
@@ -130,6 +140,33 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
130140
}
131141

132142

143+
private[this] def scheduleTimeout():Unit =
144+
{
145+
146+
val scheduler = api.actorSystem.scheduler
147+
148+
def tickOperation():Unit =
149+
{
150+
if (!isCompleted) {
151+
for(tr <- timeoutRecord) {
152+
val currentNOperations = nOperations.get()
153+
if (currentNOperations == tr.lastNOperations) {
154+
// fire
155+
???
156+
} else {
157+
val now = System.currentTimeMillis()
158+
val nextTime = lastOperationEnd.get() + tr.timeoutMillis
159+
tr.lastNOperations = nOperations.get()
160+
scheduler.scheduleOnce((nextTime - now).millis)(tickOperation)
161+
}
162+
}
163+
}
164+
}
165+
166+
tickOperation()
167+
168+
}
169+
133170
def isLocked: Boolean = lockFlag.get();
134171

135172
private[this] def tryLock(): Boolean = lockFlag.compareAndSet(false,true)
@@ -156,6 +193,8 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
156193
} else true
157194
}
158195

196+
197+
159198
private[this] def sendWaits(waiters: ConcurrentLinkedQueue[Continuated[A]] = waiters): Unit =
160199
{
161200
// concurrent structure fpr priority queue
@@ -184,10 +223,13 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
184223
}
185224
}
186225

187-
188226

189227
// false when unlocked, true otherwise.
190228
private[this] val lockFlag: AtomicBoolean = new AtomicBoolean(false)
229+
230+
// when last operation was started
231+
private[this] val lastOperationEnd: AtomicLong = new AtomicLong(0L)
232+
private[this] var haveTimeout: Boolean = false
191233

192234
// number of operations, increased during each lock/unlock.
193235
// used for idle detection.
@@ -196,6 +238,13 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
196238
private[this] val waiters: ConcurrentLinkedQueue[Continuated[A]] = new ConcurrentLinkedQueue()
197239
private[this] val idleWaiters: ConcurrentLinkedQueue[Continuated[A]] = new ConcurrentLinkedQueue()
198240

241+
private[this] class TimeoutRecord(
242+
var lastNOperations: Long,
243+
var timeoutMillis: Long,
244+
var waiter: Continuated[A]
245+
)
246+
private[this] var timeoutRecord: Option[TimeoutRecord] = None
247+
199248
private[this] val processor = api.continuatedProcessorRef
200249

201250
private[this] implicit val executionContext: ExecutionContext = api.executionContext

0 commit comments

Comments
 (0)