1414package io .reactivex .internal .operators .single ;
1515
1616import java .util .concurrent .*;
17- import java .util .concurrent .atomic .AtomicBoolean ;
17+ import java .util .concurrent .atomic .AtomicReference ;
1818
1919import io .reactivex .*;
20- import io .reactivex .disposables .*;
20+ import io .reactivex .disposables .Disposable ;
21+ import io .reactivex .internal .disposables .DisposableHelper ;
22+ import io .reactivex .plugins .RxJavaPlugins ;
2123
2224public final class SingleTimeout <T > extends Single <T > {
2325
@@ -43,97 +45,118 @@ public SingleTimeout(SingleSource<T> source, long timeout, TimeUnit unit, Schedu
4345 @ Override
4446 protected void subscribeActual (final SingleObserver <? super T > s ) {
4547
46- final CompositeDisposable set = new CompositeDisposable ( );
47- s .onSubscribe (set );
48+ TimeoutMainObserver < T > parent = new TimeoutMainObserver < T >( s , other );
49+ s .onSubscribe (parent );
4850
49- final AtomicBoolean once = new AtomicBoolean ( );
51+ DisposableHelper . replace ( parent . task , scheduler . scheduleDirect ( parent , timeout , unit ) );
5052
51- Disposable timer = scheduler .scheduleDirect (new TimeoutDispose (once , set , s ), timeout , unit );
53+ source .subscribe (parent );
54+ }
5255
53- set .add (timer );
56+ static final class TimeoutMainObserver <T > extends AtomicReference <Disposable >
57+ implements SingleObserver <T >, Runnable , Disposable {
5458
55- source . subscribe ( new TimeoutObserver ( once , set , s )) ;
59+ private static final long serialVersionUID = 37497744973048446L ;
5660
57- }
61+ final SingleObserver <? super T > actual ;
5862
59- final class TimeoutDispose implements Runnable {
60- private final AtomicBoolean once ;
61- final CompositeDisposable set ;
62- final SingleObserver <? super T > s ;
63+ final AtomicReference <Disposable > task ;
6364
64- TimeoutDispose (AtomicBoolean once , CompositeDisposable set , SingleObserver <? super T > s ) {
65- this .once = once ;
66- this .set = set ;
67- this .s = s ;
68- }
65+ final TimeoutFallbackObserver <T > fallback ;
6966
70- @ Override
71- public void run () {
72- if (once .compareAndSet (false , true )) {
73- if (other != null ) {
74- set .clear ();
75- other .subscribe (new TimeoutObserver ());
76- } else {
77- set .dispose ();
78- s .onError (new TimeoutException ());
79- }
80- }
81- }
67+ SingleSource <? extends T > other ;
8268
83- final class TimeoutObserver implements SingleObserver <T > {
69+ static final class TimeoutFallbackObserver <T > extends AtomicReference <Disposable >
70+ implements SingleObserver <T > {
8471
85- @ Override
86- public void onError (Throwable e ) {
87- set .dispose ();
88- s .onError (e );
72+ private static final long serialVersionUID = 2071387740092105509L ;
73+ final SingleObserver <? super T > actual ;
74+
75+ TimeoutFallbackObserver (SingleObserver <? super T > actual ) {
76+ this .actual = actual ;
8977 }
9078
9179 @ Override
9280 public void onSubscribe (Disposable d ) {
93- set . add ( d );
81+ DisposableHelper . setOnce ( this , d );
9482 }
9583
9684 @ Override
97- public void onSuccess (T value ) {
98- set .dispose ();
99- s .onSuccess (value );
85+ public void onSuccess (T t ) {
86+ actual .onSuccess (t );
10087 }
10188
89+ @ Override
90+ public void onError (Throwable e ) {
91+ actual .onError (e );
92+ }
10293 }
103- }
10494
105- final class TimeoutObserver implements SingleObserver <T > {
95+ TimeoutMainObserver (SingleObserver <? super T > actual , SingleSource <? extends T > other ) {
96+ this .actual = actual ;
97+ this .other = other ;
98+ this .task = new AtomicReference <Disposable >();
99+ if (other != null ) {
100+ this .fallback = new TimeoutFallbackObserver <T >(actual );
101+ } else {
102+ this .fallback = null ;
103+ }
104+ }
106105
107- private final AtomicBoolean once ;
108- private final CompositeDisposable set ;
109- private final SingleObserver <? super T > s ;
106+ @ Override
107+ public void run () {
108+ Disposable d = get ();
109+ if (d != DisposableHelper .DISPOSED && compareAndSet (d , DisposableHelper .DISPOSED )) {
110+ if (d != null ) {
111+ d .dispose ();
112+ }
113+ SingleSource <? extends T > other = this .other ;
114+ if (other == null ) {
115+ actual .onError (new TimeoutException ());
116+ } else {
117+ this .other = null ;
118+ other .subscribe (fallback );
119+ }
120+ }
121+ }
110122
111- TimeoutObserver (AtomicBoolean once , CompositeDisposable set , SingleObserver <? super T > s ) {
112- this .once = once ;
113- this .set = set ;
114- this .s = s ;
123+ @ Override
124+ public void onSubscribe (Disposable d ) {
125+ DisposableHelper .setOnce (this , d );
115126 }
116127
117128 @ Override
118- public void onError (Throwable e ) {
119- if (once .compareAndSet (false , true )) {
120- set .dispose ();
121- s .onError (e );
129+ public void onSuccess (T t ) {
130+ Disposable d = get ();
131+ if (d != DisposableHelper .DISPOSED && compareAndSet (d , DisposableHelper .DISPOSED )) {
132+ DisposableHelper .dispose (task );
133+ actual .onSuccess (t );
122134 }
123135 }
124136
125137 @ Override
126- public void onSubscribe (Disposable d ) {
127- set .add (d );
138+ public void onError (Throwable e ) {
139+ Disposable d = get ();
140+ if (d != DisposableHelper .DISPOSED && compareAndSet (d , DisposableHelper .DISPOSED )) {
141+ DisposableHelper .dispose (task );
142+ actual .onError (e );
143+ } else {
144+ RxJavaPlugins .onError (e );
145+ }
128146 }
129147
130148 @ Override
131- public void onSuccess (T value ) {
132- if (once .compareAndSet (false , true )) {
133- set .dispose ();
134- s .onSuccess (value );
149+ public void dispose () {
150+ DisposableHelper .dispose (this );
151+ DisposableHelper .dispose (task );
152+ if (fallback != null ) {
153+ DisposableHelper .dispose (fallback );
135154 }
136155 }
137156
157+ @ Override
158+ public boolean isDisposed () {
159+ return DisposableHelper .isDisposed (get ());
160+ }
138161 }
139162}
0 commit comments