@@ -59,6 +59,9 @@ private class PublisherCoroutine<in T>(
59
59
private val subscriber : Subscriber <T >
60
60
) : AbstractCoroutine<Unit>(parentContext, true ), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
61
61
override val channel: SendChannel <T > get() = this
62
+
63
+ // cancelsParent == true ensure that error is always reported to the parent, so that parent cannot complete
64
+ // without receiving reported error.
62
65
override val cancelsParent: Boolean get() = true
63
66
64
67
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -69,6 +72,8 @@ private class PublisherCoroutine<in T>(
69
72
@Volatile
70
73
private var cancelled = false // true when Subscription.cancel() is invoked
71
74
75
+ private var handleException = false // when handleJobException is invoked
76
+
72
77
override val isClosedForSend: Boolean get() = isCompleted
73
78
override val isFull: Boolean = mutex.isLocked
74
79
override fun close (cause : Throwable ? ): Boolean = cancel(cause)
@@ -105,68 +110,79 @@ private class PublisherCoroutine<in T>(
105
110
}
106
111
}
107
112
113
+ /*
114
+ This code is not trivial because of the two properties:
115
+ 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
116
+ be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
117
+ coroutines are invoking `send` function.
118
+ 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
119
+ However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
120
+ globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
121
+ lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
122
+ `onComplete/onError` is also done under the same mutex.
123
+ */
124
+
108
125
// assert: mutex.isLocked()
109
126
private fun doLockedNext (elem : T ) {
110
- // check if already closed for send
127
+ // check if already closed for send, note, that isActive become false as soon as cancel() is invoked,
128
+ // because the job is cancelled, so this check also ensure conformance to the reactive specification's
129
+ // requirement that after cancellation requested we don't call onXXX
111
130
if (! isActive) {
112
- doLockedSignalCompleted ()
131
+ unlockAndCheckCompleted ()
113
132
throw getCancellationException()
114
133
}
115
134
// notify subscriber
116
- try {
117
- subscriber.onNext(elem)
118
- } catch (e: Throwable ) {
119
- try {
120
- if (! cancel(e))
121
- handleCoroutineException(context, e, this )
122
- } finally {
123
- doLockedSignalCompleted()
124
- }
125
- throw getCancellationException()
126
- }
135
+ subscriber.onNext(elem)
127
136
// now update nRequested
128
137
while (true ) { // lock-free loop on nRequested
129
138
val cur = _nRequested .value
130
139
if (cur < 0 ) break // closed from inside onNext => unlock
131
140
if (cur == Long .MAX_VALUE ) break // no back-pressure => unlock
132
141
val upd = cur - 1
133
142
if (_nRequested .compareAndSet(cur, upd)) {
134
- if (upd == 0L ) return // return to keep locked due to back-pressure
143
+ if (upd == 0L ) {
144
+ // return to keep locked due to back-pressure
145
+ return
146
+ }
135
147
break // unlock if upd > 0
136
148
}
137
149
}
138
- /*
139
- There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
140
- happen after this check and before `unlock` (see `onCancellation` that does not do anything
141
- if it fails to acquire the lock that we are still holding).
142
- We have to recheck `isActive` after `unlock` anyway.
143
- */
150
+ unlockAndCheckCompleted()
151
+ }
152
+
153
+ private fun unlockAndCheckCompleted () {
154
+ /*
155
+ There is no sense to check completion before doing `unlock`, because completion might
156
+ happen after this check and before `unlock` (see `signalCompleted` that does not do anything
157
+ if it fails to acquire the lock that we are still holding).
158
+ We have to recheck `isCompleted` after `unlock` anyway.
159
+ */
144
160
mutex.unlock()
145
- // recheck isActive
146
- if (! isActive && mutex.tryLock())
147
- doLockedSignalCompleted()
161
+ // check isCompleted and and try to regain lock to signal completion
162
+ if (isCompleted && mutex.tryLock()) doLockedSignalCompleted()
148
163
}
149
164
150
- // assert: mutex.isLocked()
165
+ // assert: mutex.isLocked() & isCompleted
151
166
private fun doLockedSignalCompleted () {
152
167
try {
153
168
if (_nRequested .value >= CLOSED ) {
154
169
_nRequested .value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
155
170
val cause = getCompletionCause()
156
171
// Specification requires that after cancellation requested we don't call onXXX
157
172
if (cancelled) {
158
- // but we cannot just ignore exception so we handle it
159
- if (cause != null ) handleCoroutineException(context, cause, this )
160
- return
161
- }
162
- try {
163
- if (cause != null && cause !is CancellationException )
164
- subscriber.onError(cause)
165
- else
166
- subscriber.onComplete()
167
- } catch (e: Throwable ) {
168
- handleCoroutineException(context, e, this )
169
- }
173
+ // If the parent had failed to handle our exception (handleJobException was invoked), then
174
+ // we must not loose this exception
175
+ if (handleException && cause != null ) handleExceptionViaHandler(parentContext, cause)
176
+ } else {
177
+ try {
178
+ if (cause != null && cause !is CancellationException )
179
+ subscriber.onError(cause)
180
+ else
181
+ subscriber.onComplete()
182
+ } catch (e: Throwable ) {
183
+ handleExceptionViaHandler(parentContext, e)
184
+ }
185
+ }
170
186
}
171
187
} finally {
172
188
mutex.unlock()
@@ -189,37 +205,48 @@ private class PublisherCoroutine<in T>(
189
205
if (_nRequested .compareAndSet(cur, upd)) {
190
206
// unlock the mutex when we don't have back-pressure anymore
191
207
if (cur == 0L ) {
192
- mutex.unlock()
193
- // recheck isActive
194
- if (! isActive && mutex.tryLock())
195
- doLockedSignalCompleted()
208
+ unlockAndCheckCompleted()
196
209
}
197
210
return
198
211
}
199
212
}
200
213
}
201
214
202
- override fun onCompletedExceptionally (exception : Throwable ) = onCompleted(Unit )
203
-
204
- override fun onCompleted (value : Unit ) {
215
+ // assert: isCompleted
216
+ private fun signalCompleted () {
205
217
while (true ) { // lock-free loop for nRequested
206
218
val cur = _nRequested .value
207
219
if (cur == SIGNALLED ) return // some other thread holding lock already signalled cancellation/completion
208
- check(cur >= 0 ) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
220
+ check(cur >= 0 ) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
209
221
if (! _nRequested .compareAndSet(cur, CLOSED )) continue // retry on failed CAS
210
222
// Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
211
223
if (cur == 0L ) {
212
224
doLockedSignalCompleted()
213
225
} else {
214
226
// otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
215
- if (mutex.tryLock())
216
- doLockedSignalCompleted()
227
+ if (mutex.tryLock()) doLockedSignalCompleted()
217
228
// Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
218
229
}
219
230
return // done anyway
220
231
}
221
232
}
222
233
234
+ // Note: It is invoked when parent fails to handle an exception and strictly before onCompleted[Exception]
235
+ // so here we just raise a flag (and it need NOT be volatile!) to handle this exception.
236
+ // This way we defer decision to handle this exception based on our ability to send this exception
237
+ // to the subscriber (see doLockedSignalCompleted)
238
+ override fun handleJobException (exception : Throwable ) {
239
+ handleException = true
240
+ }
241
+
242
+ override fun onCompletedExceptionally (exception : Throwable ) {
243
+ signalCompleted()
244
+ }
245
+
246
+ override fun onCompleted (value : Unit ) {
247
+ signalCompleted()
248
+ }
249
+
223
250
override fun cancel () {
224
251
// Specification requires that after cancellation publisher stops signalling
225
252
// This flag distinguishes subscription cancellation request from the job crash
0 commit comments