Skip to content

Commit daa1d9d

Browse files
committed
Various refactorings related to resource management and timeouts
* `Job.Registration` is renamed to `DisposableHandle` * `EmptyRegistration` is renamed to `NonDisposableHandle` * `Job.unregisterOnCompletion` is renamed to `Job.disposeOnCompletion` * `Delay.invokeOnTimeout` is introduced * `withTimeout` uses `Delay.invokeOnTimeout` when available
1 parent ab65be7 commit daa1d9d

File tree

12 files changed

+204
-100
lines changed

12 files changed

+204
-100
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private open class DeferredCoroutine<T>(
155155

156156
@Suppress("UNCHECKED_CAST")
157157
private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->
158-
cont.unregisterOnCompletion(invokeOnCompletion {
158+
cont.disposeOnCompletion(invokeOnCompletion {
159159
val state = this.state
160160
check(state !is Incomplete)
161161
if (state is CompletedExceptionally)
@@ -183,7 +183,7 @@ private open class DeferredCoroutine<T>(
183183
}
184184
if (startInternal(state) == 0) {
185185
// slow-path -- register waiter for completion
186-
select.unregisterOnCompletion(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
186+
select.disposeOnSelect(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
187187
return
188188
}
189189
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import java.util.concurrent.Future
1920
import java.util.concurrent.TimeUnit
2021
import kotlin.coroutines.experimental.ContinuationInterceptor
2122

2223
/**
2324
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
24-
* non-blocking [delay] function.
25+
* scheduled execution of tasks.
26+
*
27+
* Implementation of this interface affects operation of
28+
* [delay][kotlinx.coroutines.experimental.delay] and [withTimeout] functions.
2529
*/
2630
public interface Delay {
2731
/**
@@ -52,6 +56,16 @@ public interface Delay {
5256
* ```
5357
*/
5458
fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
59+
60+
/**
61+
* Schedules invocation of a specified [block] after a specified delay [time].
62+
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation
63+
* request if it is not needed anymore.
64+
*
65+
* This implementation uses a built-in single-threaded scheduled executor service.
66+
*/
67+
fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
68+
DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
5569
}
5670

5771
/**
@@ -60,18 +74,26 @@ public interface Delay {
6074
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
6175
* immediately resumes with [CancellationException].
6276
*
63-
* This function delegates to [Delay] implementation of the context [CoroutineDispatcher] if possible,
64-
* otherwise it resumes using a built-in single-threaded scheduled executor service.
77+
* This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
78+
* implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
6579
*/
6680
suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
6781
require(time >= 0) { "Delay time $time cannot be negative" }
6882
if (time <= 0) return // don't delay
6983
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
70-
(cont.context[ContinuationInterceptor] as? Delay)?.apply {
71-
scheduleResumeAfterDelay(time, unit, cont)
72-
return@sc
73-
}
74-
val timeout = scheduledExecutor.schedule(ResumeRunnable(cont), time, unit)
75-
cont.cancelFutureOnCompletion(timeout)
84+
val delay = cont.context[ContinuationInterceptor] as? Delay
85+
if (delay != null)
86+
delay.scheduleResumeAfterDelay(time, unit, cont) else
87+
cont.cancelFutureOnCompletion(scheduledExecutor.schedule(ResumeRunnable(cont), time, unit))
88+
}
89+
}
90+
91+
/**
92+
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
93+
*/
94+
public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
95+
override fun dispose() {
96+
future.cancel(false)
7697
}
98+
override fun toString(): String = "DisposableFutureHandle[$future]"
7799
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ import kotlin.coroutines.experimental.CoroutineContext
2828
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
2929
ExecutorCoroutineDispatcher(this)
3030

31-
internal open class ExecutorCoroutineDispatcher(val executor: Executor) : CoroutineDispatcher(), Delay {
31+
internal open class ExecutorCoroutineDispatcher(
32+
private val executor: Executor
33+
) : CoroutineDispatcher(), Delay {
3234
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
3335

3436
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
@@ -37,20 +39,29 @@ internal open class ExecutorCoroutineDispatcher(val executor: Executor) : Corout
3739
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
3840
continuation.cancelFutureOnCompletion(timeout)
3941
}
42+
43+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
44+
val timeout = if (executor is ScheduledExecutorService)
45+
executor.schedule(block, time, unit) else
46+
scheduledExecutor.schedule(block, time, unit)
47+
return DisposableFutureHandle(timeout)
48+
}
4049
}
4150

4251
// --- reusing these classes in other places ---
4352

4453
internal class ResumeUndispatchedRunnable(
45-
val dispatcher: CoroutineDispatcher,
46-
val continuation: CancellableContinuation<Unit>
54+
private val dispatcher: CoroutineDispatcher,
55+
private val continuation: CancellableContinuation<Unit>
4756
) : Runnable {
4857
override fun run() {
4958
with(continuation) { dispatcher.resumeUndispatched(Unit) }
5059
}
5160
}
5261

53-
internal class ResumeRunnable(val continuation: Continuation<Unit>) : Runnable {
62+
internal class ResumeRunnable(
63+
private val continuation: Continuation<Unit>
64+
) : Runnable {
5465
override fun run() {
5566
continuation.resume(Unit)
5667
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 75 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,14 @@ public interface Job : CoroutineContext.Element {
102102
* Registers handler that is **synchronously** invoked on completion of this job.
103103
* When job is already complete, then the handler is immediately invoked
104104
* with a cancellation cause or `null`. Otherwise, handler will be invoked once when this
105-
* job is complete. Note, that [cancellation][cancel] is also a form of completion).
105+
* job is complete. Note, that [cancellation][cancel] is also a form of completion.
106106
*
107-
* The resulting [Registration] can be used to [Registration.unregister] if this
108-
* registration is no longer needed. There is no need to unregister after completion.
107+
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
108+
* registration of this handler and release its memory if its invocation is no longer needed.
109+
* There is no need to dispose the handler after completion of this job. The reference to
110+
* all the handlers are released when this job completes.
109111
*/
110-
public fun invokeOnCompletion(handler: CompletionHandler): Registration
111-
112-
/**
113-
* @suppress **Deprecated**: Renamed to `invokeOnCompletion`
114-
*/
115-
@Deprecated(message = "Renamed to `invokeOnCompletion`", replaceWith = ReplaceWith("invokeOnCompletion(handler)"))
116-
public fun onCompletion(handler: CompletionHandler): Registration
112+
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
117113

118114
/**
119115
* Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
@@ -161,15 +157,40 @@ public interface Job : CoroutineContext.Element {
161157
/**
162158
* Registration object for [invokeOnCompletion]. It can be used to [unregister] if needed.
163159
* There is no need to unregister after completion.
160+
* @suppress **Deprecated**: Replace with `DisposableHandle`
164161
*/
162+
@Deprecated(message = "Replace with `DisposableHandle`",
163+
replaceWith = ReplaceWith("DisposableHandle"))
165164
public interface Registration {
166165
/**
167166
* Unregisters completion handler.
167+
* @suppress **Deprecated**: Replace with `dispose`
168168
*/
169+
@Deprecated(message = "Replace with `dispose`",
170+
replaceWith = ReplaceWith("dispose()"))
169171
public fun unregister()
170172
}
171173
}
172174

175+
/**
176+
* A handle to an allocated object that can be disposed to make it eligible for garbage collection.
177+
*/
178+
public interface DisposableHandle : Job.Registration {
179+
/**
180+
* Disposes the corresponding object, making it eligible for garbage collection.
181+
* Repeated invocation of this function has no effect.
182+
*/
183+
public fun dispose()
184+
185+
/**
186+
* Unregisters completion handler.
187+
* @suppress **Deprecated**: Replace with `dispose`
188+
*/
189+
@Deprecated(message = "Replace with `dispose`",
190+
replaceWith = ReplaceWith("dispose()"))
191+
public override fun unregister() = dispose()
192+
}
193+
173194
/**
174195
* Handler for [Job.invokeOnCompletion].
175196
*/
@@ -187,9 +208,23 @@ public typealias CancellationException = java.util.concurrent.CancellationExcept
187208
* ```
188209
* invokeOnCompletion { registration.unregister() }
189210
* ```
211+
* @suppress: **Deprecated**: Renamed to `disposeOnCompletion`.
190212
*/
191-
public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Registration =
192-
invokeOnCompletion(UnregisterOnCompletion(this, registration))
213+
@Deprecated(message = "Renamed to `disposeOnCompletion`",
214+
replaceWith = ReplaceWith("disposeOnCompletion(registration)"))
215+
public fun Job.unregisterOnCompletion(registration: DisposableHandle): DisposableHandle =
216+
invokeOnCompletion(DisposeOnCompletion(this, registration))
217+
218+
/**
219+
* Disposes a specified [handle] when this job is complete.
220+
*
221+
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
222+
* ```
223+
* invokeOnCompletion { handle.dispose() }
224+
* ```
225+
*/
226+
public fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
227+
invokeOnCompletion(DisposeOnCompletion(this, handle))
193228

194229
/**
195230
* Cancels a specified [future] when this job is complete.
@@ -199,7 +234,7 @@ public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Regis
199234
* invokeOnCompletion { future.cancel(false) }
200235
* ```
201236
*/
202-
public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
237+
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
203238
invokeOnCompletion(CancelFutureOnCompletion(this, future))
204239

205240
/**
@@ -212,12 +247,19 @@ public suspend fun Job.join() = this.join()
212247
/**
213248
* No-op implementation of [Job.Registration].
214249
*/
215-
public object EmptyRegistration : Job.Registration {
250+
@Deprecated(message = "Replace with `NonDisposableHandle`",
251+
replaceWith = ReplaceWith("NonDisposableHandle"))
252+
typealias EmptyRegistration = NonDisposableHandle
253+
254+
/**
255+
* No-op implementation of [DisposableHandle].
256+
*/
257+
public object NonDisposableHandle : DisposableHandle {
216258
/** Does not do anything. */
217-
override fun unregister() {}
259+
override fun dispose() {}
218260

219-
/** Returns "EmptyRegistration" string. */
220-
override fun toString(): String = "EmptyRegistration"
261+
/** Returns "NonDisposableHandle" string. */
262+
override fun toString(): String = "NonDisposableHandle"
221263
}
222264

223265
// --------------- utility classes to simplify job implementation
@@ -280,7 +322,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
280322
private var _state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
281323

282324
@Volatile
283-
private var registration: Job.Registration? = null
325+
private var parentHandle: DisposableHandle? = null
284326

285327
protected companion object {
286328
@JvmStatic
@@ -298,16 +340,16 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
298340
* It shall be invoked at most once after construction after all other initialization.
299341
*/
300342
public fun initParentJob(parent: Job?) {
301-
check(registration == null)
343+
check(parentHandle == null)
302344
if (parent == null) {
303-
registration = EmptyRegistration
345+
parentHandle = NonDisposableHandle
304346
return
305347
}
306348
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
307349
val newRegistration = parent.invokeOnCompletion(ParentOnCompletion(parent, this))
308-
registration = newRegistration
350+
parentHandle = newRegistration
309351
// now check our state _after_ registering (see updateState order of actions)
310-
if (isCompleted) newRegistration.unregister()
352+
if (isCompleted) newRegistration.dispose()
311353
}
312354

313355
internal open fun onParentCompletion(cause: Throwable?) {
@@ -338,7 +380,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
338380
require(expect is Incomplete && update !is Incomplete) // only incomplete -> completed transition is allowed
339381
if (!STATE.compareAndSet(this, expect, update)) return false
340382
// Unregister from parent job
341-
registration?.unregister() // volatile read registration _after_ state was updated
383+
parentHandle?.dispose() // volatile read parentHandle _after_ state was updated
342384
return true // continues in completeUpdateState
343385
}
344386

@@ -515,9 +557,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
515557
}
516558
}
517559

518-
override fun onCompletion(handler: CompletionHandler): Job.Registration = invokeOnCompletion(handler)
519-
520-
final override fun invokeOnCompletion(handler: CompletionHandler): Job.Registration {
560+
final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
521561
var nodeCache: JobNode<*>? = null
522562
while (true) { // lock-free loop on state
523563
val state = this.state
@@ -545,7 +585,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
545585
}
546586
else -> { // is inactive
547587
handler((state as? CompletedExceptionally)?.exception)
548-
return EmptyRegistration
588+
return NonDisposableHandle
549589
}
550590
}
551591
}
@@ -560,7 +600,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
560600
}
561601

562602
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
563-
cont.unregisterOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
603+
cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
564604
}
565605

566606
override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
@@ -576,7 +616,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
576616
}
577617
if (startInternal(state) == 0) {
578618
// slow-path -- register waiter for completion
579-
select.unregisterOnCompletion(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
619+
select.disposeOnSelect(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
580620
return
581621
}
582622
}
@@ -731,12 +771,12 @@ private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
731771

732772
internal abstract class JobNode<out J : Job>(
733773
@JvmField val job: J
734-
) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Incomplete {
774+
) : LockFreeLinkedListNode(), DisposableHandle, CompletionHandler, JobSupport.Incomplete {
735775
final override val isActive: Boolean get() = true
736776
final override val idempotentStart: Any? get() = null
737777
// if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
738778
// directly without wrapping
739-
final override fun unregister() = (job as JobSupport).removeNode(this)
779+
final override fun dispose() = (job as JobSupport).removeNode(this)
740780
override abstract fun invoke(reason: Throwable?)
741781
}
742782

@@ -756,12 +796,12 @@ private class ResumeOnCompletion(
756796
override fun toString() = "ResumeOnCompletion[$continuation]"
757797
}
758798

759-
internal class UnregisterOnCompletion(
799+
internal class DisposeOnCompletion(
760800
job: Job,
761-
@JvmField val registration: Job.Registration
801+
@JvmField val handle: DisposableHandle
762802
) : JobNode<Job>(job) {
763-
override fun invoke(reason: Throwable?) = registration.unregister()
764-
override fun toString(): String = "UnregisterOnCompletion[$registration]"
803+
override fun invoke(reason: Throwable?) = handle.dispose()
804+
override fun toString(): String = "DisposeOnCompletion[$handle]"
765805
}
766806

767807
private class ParentOnCompletion(

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,8 @@ object NonCancellable : AbstractCoroutineContextElement(Job), Job {
5757
/** Always throws [IllegalStateException]. */
5858
override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
5959

60-
override fun onCompletion(handler: CompletionHandler): Job.Registration = invokeOnCompletion(handler)
61-
62-
/** Always returns [EmptyRegistration]. */
63-
override fun invokeOnCompletion(handler: CompletionHandler): Job.Registration = EmptyRegistration
60+
/** Always returns [NonDisposableHandle]. */
61+
override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = NonDisposableHandle
6462

6563
/** Always returns `false`. */
6664
override fun cancel(cause: Throwable?): Boolean = false

0 commit comments

Comments
 (0)