@@ -56,7 +56,10 @@ final class OutputStreamPublisher<T> implements Flow.Publisher<T> {
56
56
private final int chunkSize ;
57
57
58
58
59
- private OutputStreamPublisher (OutputStreamHandler outputStreamHandler , ByteMapper <T > byteMapper , Executor executor , int chunkSize ) {
59
+ private OutputStreamPublisher (
60
+ OutputStreamHandler outputStreamHandler , ByteMapper <T > byteMapper ,
61
+ Executor executor , int chunkSize ) {
62
+
60
63
this .outputStreamHandler = outputStreamHandler ;
61
64
this .byteMapper = byteMapper ;
62
65
this .executor = executor ;
@@ -154,8 +157,9 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
154
157
// for Reactive Streams compliance.
155
158
Objects .requireNonNull (subscriber , "Subscriber must not be null" );
156
159
157
- OutputStreamSubscription <T > subscription = new OutputStreamSubscription <>(subscriber , this .outputStreamHandler ,
158
- this .byteMapper , this .chunkSize );
160
+ OutputStreamSubscription <T > subscription = new OutputStreamSubscription <>(
161
+ subscriber , this .outputStreamHandler , this .byteMapper , this .chunkSize );
162
+
159
163
subscriber .onSubscribe (subscription );
160
164
this .executor .execute (subscription ::invokeHandler );
161
165
}
@@ -218,7 +222,6 @@ private static final class OutputStreamSubscription<T> extends OutputStream impl
218
222
219
223
static final Object READY = new Object ();
220
224
221
-
222
225
private final Flow .Subscriber <? super T > actual ;
223
226
224
227
private final OutputStreamHandler outputStreamHandler ;
@@ -236,12 +239,13 @@ private static final class OutputStreamSubscription<T> extends OutputStream impl
236
239
237
240
private long produced ;
238
241
239
-
240
- public OutputStreamSubscription ( Flow .Subscriber <? super T > actual , OutputStreamHandler outputStreamHandler ,
242
+ OutputStreamSubscription (
243
+ Flow .Subscriber <? super T > actual , OutputStreamHandler outputStreamHandler ,
241
244
ByteMapper <T > byteMapper , int chunkSize ) {
245
+
242
246
this .actual = actual ;
243
- this .byteMapper = byteMapper ;
244
247
this .outputStreamHandler = outputStreamHandler ;
248
+ this .byteMapper = byteMapper ;
245
249
this .chunkSize = chunkSize ;
246
250
}
247
251
@@ -315,13 +319,14 @@ private void invokeHandler() {
315
319
if (isCancelled (previousState )) {
316
320
return ;
317
321
}
318
-
319
322
if (isTerminated (previousState )) {
320
323
// failure due to illegal requestN
321
- this .actual .onError (this .error );
322
- return ;
324
+ Throwable error = this .error ;
325
+ if (error != null ) {
326
+ this .actual .onError (error );
327
+ return ;
328
+ }
323
329
}
324
-
325
330
this .actual .onError (ex );
326
331
return ;
327
332
}
@@ -330,13 +335,14 @@ private void invokeHandler() {
330
335
if (isCancelled (previousState )) {
331
336
return ;
332
337
}
333
-
334
338
if (isTerminated (previousState )) {
335
339
// failure due to illegal requestN
336
- this .actual .onError (this .error );
337
- return ;
340
+ Throwable error = this .error ;
341
+ if (error != null ) {
342
+ this .actual .onError (error );
343
+ return ;
344
+ }
338
345
}
339
-
340
346
this .actual .onComplete ();
341
347
}
342
348
@@ -346,16 +352,13 @@ public void request(long n) {
346
352
if (n <= 0 ) {
347
353
this .error = new IllegalArgumentException ("request should be a positive number" );
348
354
long previousState = tryTerminate ();
349
-
350
355
if (isTerminated (previousState ) || isCancelled (previousState )) {
351
356
return ;
352
357
}
353
-
354
358
if (previousState > 0 ) {
355
359
// error should eventually be observed and propagated
356
360
return ;
357
361
}
358
-
359
362
// resume parked thread, so it can observe error and propagate it
360
363
resume ();
361
364
return ;
@@ -413,11 +416,9 @@ private void resume() {
413
416
private long tryCancel () {
414
417
while (true ) {
415
418
long r = this .requested .get ();
416
-
417
419
if (isCancelled (r )) {
418
420
return r ;
419
421
}
420
-
421
422
if (this .requested .compareAndSet (r , Long .MIN_VALUE )) {
422
423
return r ;
423
424
}
@@ -427,11 +428,9 @@ private long tryCancel() {
427
428
private long tryTerminate () {
428
429
while (true ) {
429
430
long r = this .requested .get ();
430
-
431
431
if (isCancelled (r ) || isTerminated (r )) {
432
432
return r ;
433
433
}
434
-
435
434
if (this .requested .compareAndSet (r , Long .MIN_VALUE | Long .MAX_VALUE )) {
436
435
return r ;
437
436
}
@@ -486,4 +485,5 @@ private static long addCap(long a, long b) {
486
485
return res ;
487
486
}
488
487
}
488
+
489
489
}
0 commit comments