Skip to content

Commit 3ef4fca

Browse files
committed
Fix publish & rxObservable builders for cancelling state of coroutine
onError/onComplete is signalled as soon as coroutine isCancelledOrCompleted JobSupport has onCancellation protected method to support that
1 parent 1a016bd commit 3ef4fca

File tree

4 files changed

+54
-30
lines changed
  • kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental
  • reactive
    • kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive
    • kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1
    • kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2

4 files changed

+54
-30
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public interface Job : CoroutineContext.Element {
130130
* The [cancellable][suspendCancellableCoroutine] suspending functions throw this exception
131131
* when trying to suspend in the context of this job.
132132
*/
133-
fun getCompletionException(): Throwable
133+
public fun getCompletionException(): Throwable
134134

135135
// ------------ state update ------------
136136

@@ -551,6 +551,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
551551
else -> check(expect is Empty)
552552
}
553553
// Do overridable processing after completion handlers
554+
if (expect !is Cancelling) onCancellation() // only notify when was not cancelling before
554555
afterCompletion(update, mode)
555556
}
556557

@@ -609,7 +610,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
609610
*/
610611
protected open fun onStart() {}
611612

612-
final override fun getCompletionException(): Throwable {
613+
public final override fun getCompletionException(): Throwable {
613614
val state = this.state
614615
return when (state) {
615616
is Cancelling -> state.cancelled.exception
@@ -619,10 +620,27 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
619620
}
620621
}
621622

622-
final override fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle =
623+
/**
624+
* Returns the cause that signals the completion of this job -- it returns the original
625+
* [cancel] cause or **`null` if this job had completed
626+
* normally or was cancelled without a cause**. This function throws
627+
* [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
628+
* [isCancelled] yet.
629+
*/
630+
protected fun getCompletionCause(): Throwable? {
631+
val state = this.state
632+
return when (state) {
633+
is Cancelling -> state.cancelled.cause
634+
is Incomplete -> error("Job was not completed or cancelled yet")
635+
is CompletedExceptionally -> state.cause
636+
else -> null
637+
}
638+
}
639+
640+
public final override fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle =
623641
installHandler(handler, onCancellation = hasCancellingState)
624642

625-
final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
643+
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
626644
installHandler(handler, onCancellation = false)
627645

628646
private fun installHandler(handler: CompletionHandler, onCancellation: Boolean): DisposableHandle {
@@ -780,6 +798,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
780798
// try make it cancelling on the condition that we're still in this state
781799
if (STATE.compareAndSet(this, state, Cancelling(state, Cancelled(cause)))) {
782800
notifyCancellation(state, cause)
801+
onCancellation()
783802
return true
784803
}
785804
} else {
@@ -803,6 +822,11 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
803822
throw exception
804823
}
805824

825+
/**
826+
* It is invoked once when job is cancelled or is completed, similarly to [invokeOnCancellation].
827+
*/
828+
protected open fun onCancellation() {}
829+
806830
/**
807831
* Override for post-completion actions that need to do something with the state.
808832
* @param mode completion mode.

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private class PublisherCoroutine<T>(
113113
// assert: mutex.isLocked()
114114
private fun doLockedNext(elem: T) {
115115
// check if already closed for send
116-
if (isCompleted) {
116+
if (isCancelledOrCompleted) {
117117
doLockedSignalCompleted()
118118
throw sendException()
119119
}
@@ -147,8 +147,8 @@ private class PublisherCoroutine<T>(
147147
We have to recheck `isCompleted` after `unlock` anyway.
148148
*/
149149
mutex.unlock()
150-
// recheck isCompleted
151-
if (isCompleted && mutex.tryLock())
150+
// recheck isCancelledOrCompleted
151+
if (isCancelledOrCompleted && mutex.tryLock())
152152
doLockedSignalCompleted()
153153
}
154154

@@ -157,10 +157,10 @@ private class PublisherCoroutine<T>(
157157
try {
158158
if (nRequested >= CLOSED) {
159159
nRequested = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
160-
val state = this.state
160+
val cause = getCompletionCause()
161161
try {
162-
if (state is CompletedExceptionally && state.cause != null)
163-
subscriber.onError(state.cause)
162+
if (cause != null)
163+
subscriber.onError(cause)
164164
else
165165
subscriber.onComplete()
166166
} catch (e: Throwable) {
@@ -188,20 +188,20 @@ private class PublisherCoroutine<T>(
188188
// unlock the mutex when we don't have back-pressure anymore
189189
if (cur == 0L) {
190190
mutex.unlock()
191-
// recheck isCompleted
192-
if (isCompleted && mutex.tryLock())
191+
// recheck isCancelledOrCompleted
192+
if (isCancelledOrCompleted && mutex.tryLock())
193193
doLockedSignalCompleted()
194194
}
195195
return
196196
}
197197
}
198198
}
199199

200-
override fun afterCompletion(state: Any?, mode: Int) {
200+
override fun onCancellation() {
201201
while (true) { // lock-free loop for nRequested
202202
val cur = nRequested
203203
if (cur == SIGNALLED) return // some other thread holding lock already signalled completion
204-
check(cur >= 0) // no other thread could have marked it as CLOSED, because afterCompletion is invoked once
204+
check(cur >= 0) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
205205
if (!N_REQUESTED.compareAndSet(this, cur, CLOSED)) continue // retry on failed CAS
206206
// Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
207207
if (cur == 0L) {

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private class RxObservableCoroutine<T>(
115115
// assert: mutex.isLocked()
116116
private fun doLockedNext(elem: T) {
117117
// check if already closed for send
118-
if (isCompleted) {
118+
if (isCancelledOrCompleted) {
119119
doLockedSignalCompleted()
120120
throw sendException()
121121
}
@@ -149,8 +149,8 @@ private class RxObservableCoroutine<T>(
149149
We have to recheck `isCompleted` after `unlock` anyway.
150150
*/
151151
mutex.unlock()
152-
// recheck isCompleted
153-
if (isCompleted && mutex.tryLock())
152+
// recheck isCancelledOrCompleted
153+
if (isCancelledOrCompleted && mutex.tryLock())
154154
doLockedSignalCompleted()
155155
}
156156

@@ -159,10 +159,10 @@ private class RxObservableCoroutine<T>(
159159
try {
160160
if (nRequested >= CLOSED) {
161161
nRequested = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
162-
val state = this.state
162+
val cause = getCompletionCause()
163163
try {
164-
if (state is CompletedExceptionally && state.cause != null)
165-
subscriber.onError(state.cause)
164+
if (cause != null)
165+
subscriber.onError(cause)
166166
else
167167
subscriber.onCompleted()
168168
} catch (e: Throwable) {
@@ -190,16 +190,16 @@ private class RxObservableCoroutine<T>(
190190
// unlock the mutex when we don't have back-pressure anymore
191191
if (cur == 0L) {
192192
mutex.unlock()
193-
// recheck isCompleted
194-
if (isCompleted && mutex.tryLock())
193+
// recheck isCancelledOrCompleted
194+
if (isCancelledOrCompleted && mutex.tryLock())
195195
doLockedSignalCompleted()
196196
}
197197
return
198198
}
199199
}
200200
}
201201

202-
override fun afterCompletion(state: Any?, mode: Int) {
202+
override fun onCancellation() {
203203
while (true) { // lock-free loop for nRequested
204204
val cur = nRequested
205205
if (cur == SIGNALLED) return // some other thread holding lock already signalled completion

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private class RxObservableCoroutine<T>(
113113
// assert: mutex.isLocked()
114114
private fun doLockedNext(elem: T) {
115115
// check if already closed for send
116-
if (isCompleted) {
116+
if (isCancelledOrCompleted) {
117117
doLockedSignalCompleted()
118118
throw sendException()
119119
}
@@ -136,8 +136,8 @@ private class RxObservableCoroutine<T>(
136136
We have to recheck `isCompleted` after `unlock` anyway.
137137
*/
138138
mutex.unlock()
139-
// recheck isCompleted
140-
if (isCompleted && mutex.tryLock())
139+
// recheck isCancelledOrCompleted
140+
if (isCancelledOrCompleted && mutex.tryLock())
141141
doLockedSignalCompleted()
142142
}
143143

@@ -146,10 +146,10 @@ private class RxObservableCoroutine<T>(
146146
try {
147147
if (signal >= CLOSED) {
148148
signal = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
149-
val state = this.state
149+
val cause = getCompletionCause()
150150
try {
151-
if (state is CompletedExceptionally && state.cause != null)
152-
subscriber.onError(state.cause)
151+
if (cause != null)
152+
subscriber.onError(cause)
153153
else
154154
subscriber.onComplete()
155155
} catch (e: Throwable) {
@@ -161,7 +161,7 @@ private class RxObservableCoroutine<T>(
161161
}
162162
}
163163

164-
override fun afterCompletion(state: Any?, mode: Int) {
164+
override fun onCancellation() {
165165
if (!SIGNAL.compareAndSet(this, OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
166166
if (mutex.tryLock()) // if we can acquire the lock
167167
doLockedSignalCompleted()

0 commit comments

Comments
 (0)