@@ -166,6 +166,7 @@ void drain() {
166
166
167
167
final AtomicLong requested = sharedProducer ;
168
168
final Subscriber <? super R > actualSubscriber = this .actual ;
169
+ final NotificationLite <R > nl = NotificationLite .instance ();
169
170
170
171
for (;;) {
171
172
@@ -200,13 +201,13 @@ void drain() {
200
201
long emittedAmount = 0L ;
201
202
boolean unbounded = requestedAmount == Long .MAX_VALUE ;
202
203
203
- Queue <R > innerQueue = innerSubscriber .queue ;
204
+ Queue <Object > innerQueue = innerSubscriber .queue ;
204
205
boolean innerDone = false ;
205
206
206
207
207
208
for (;;) {
208
209
outerDone = innerSubscriber .done ;
209
- R v = innerQueue .peek ();
210
+ Object v = innerQueue .peek ();
210
211
empty = v == null ;
211
212
212
213
if (outerDone ) {
@@ -237,7 +238,7 @@ void drain() {
237
238
innerQueue .poll ();
238
239
239
240
try {
240
- actualSubscriber .onNext (v );
241
+ actualSubscriber .onNext (nl . getValue ( v ) );
241
242
} catch (Throwable ex ) {
242
243
Exceptions .throwOrReport (ex , actualSubscriber , v );
243
244
return ;
@@ -271,27 +272,29 @@ void drain() {
271
272
272
273
static final class EagerInnerSubscriber <T > extends Subscriber <T > {
273
274
final EagerOuterSubscriber <?, T > parent ;
274
- final Queue <T > queue ;
275
+ final Queue <Object > queue ;
276
+ final NotificationLite <T > nl ;
275
277
276
278
volatile boolean done ;
277
279
Throwable error ;
278
280
279
281
public EagerInnerSubscriber (EagerOuterSubscriber <?, T > parent , int bufferSize ) {
280
282
super ();
281
283
this .parent = parent ;
282
- Queue <T > q ;
284
+ Queue <Object > q ;
283
285
if (UnsafeAccess .isUnsafeAvailable ()) {
284
- q = new SpscArrayQueue <T >(bufferSize );
286
+ q = new SpscArrayQueue <Object >(bufferSize );
285
287
} else {
286
- q = new SpscAtomicArrayQueue <T >(bufferSize );
288
+ q = new SpscAtomicArrayQueue <Object >(bufferSize );
287
289
}
288
290
this .queue = q ;
291
+ this .nl = NotificationLite .instance ();
289
292
request (bufferSize );
290
293
}
291
294
292
295
@ Override
293
296
public void onNext (T t ) {
294
- queue .offer (t );
297
+ queue .offer (nl . next ( t ) );
295
298
parent .drain ();
296
299
}
297
300
0 commit comments