@@ -98,7 +98,7 @@ public void onStart() {
98
98
@ Override
99
99
public void onNext (Observable <? extends T > t ) {
100
100
if (t instanceof ScalarSynchronousObservable ) {
101
- handleScalarSynchronousObservable (t );
101
+ handleScalarSynchronousObservable (( ScalarSynchronousObservable ) t );
102
102
} else {
103
103
if (t == null || isUnsubscribed ()) {
104
104
return ;
@@ -128,7 +128,7 @@ private void handleNewSource(Observable<? extends T> t) {
128
128
request (1 );
129
129
}
130
130
131
- private void handleScalarSynchronousObservable (Observable <? extends T > t ) {
131
+ private void handleScalarSynchronousObservable (ScalarSynchronousObservable <? extends T > t ) {
132
132
// fast-path for scalar, synchronous values such as Observable.from(int)
133
133
/**
134
134
* Without this optimization:
@@ -154,8 +154,8 @@ private void handleScalarSynchronousObservable(Observable<? extends T> t) {
154
154
}
155
155
}
156
156
157
- private void handleScalarSynchronousObservableWithoutRequestLimits (Observable <? extends T > t ) {
158
- T value = (( ScalarSynchronousObservable < T >) t ) .get ();
157
+ private void handleScalarSynchronousObservableWithoutRequestLimits (ScalarSynchronousObservable <? extends T > t ) {
158
+ T value = t .get ();
159
159
if (getEmitLock ()) {
160
160
try {
161
161
actual .onNext (value );
@@ -177,14 +177,14 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(Observable<?
177
177
}
178
178
}
179
179
180
- private void handleScalarSynchronousObservableWithRequestLimits (Observable <? extends T > t ) {
180
+ private void handleScalarSynchronousObservableWithRequestLimits (ScalarSynchronousObservable <? extends T > t ) {
181
181
if (getEmitLock ()) {
182
182
boolean emitted = false ;
183
183
try {
184
184
long r = mergeProducer .requested ;
185
185
if (r > 0 ) {
186
186
emitted = true ;
187
- actual .onNext ((( ScalarSynchronousObservable < T >) t ) .get ());
187
+ actual .onNext (t .get ());
188
188
mergeProducer .REQUESTED .decrementAndGet (mergeProducer );
189
189
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
190
190
return ;
@@ -203,7 +203,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(Observable<? ext
203
203
// enqueue the values for later delivery
204
204
initScalarValueQueueIfNeeded ();
205
205
try {
206
- scalarValueQueue .onNext ((( ScalarSynchronousObservable < T >) t ) .get ());
206
+ scalarValueQueue .onNext (t .get ());
207
207
} catch (MissingBackpressureException e ) {
208
208
onError (e );
209
209
}
0 commit comments