1111import java .util .TreeMap ;
1212
1313import io .reactivex .Completable ;
14- import io .reactivex .CompletableEmitter ;
15- import io .reactivex .CompletableOnSubscribe ;
14+ import io .reactivex .CompletableObserver ;
15+ import io .reactivex .CompletableSource ;
1616import okhttp3 .Headers ;
1717import okhttp3 .OkHttpClient ;
1818import okhttp3 .Request ;
@@ -30,10 +30,10 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
3030 private final String tag = OkHttpConnectionProvider .class .getSimpleName ();
3131
3232 @ Nullable
33- private WebSocket openedSocked ;
33+ private WebSocket openSocket ;
3434
3535 @ Nullable
36- private CompletableWebSocketListener currentListener ;
36+ private CompletableWebSocketListener socketListener ;
3737
3838 OkHttpConnectionProvider (String uri , @ Nullable Map <String , String > connectHttpHeaders , OkHttpClient okHttpClient ) {
3939 super ();
@@ -44,10 +44,10 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
4444
4545 @ Override
4646 public void rawDisconnect () {
47- if (openedSocked != null ) {
48- openedSocked .close (1000 , "" );
49- if (currentListener != null )
50- currentListener .awaitCloseBlocking ();
47+ if (openSocket != null ) {
48+ openSocket .close (1000 , "" );
49+ if (socketListener != null )
50+ socketListener .awaitCloseBlocking ();
5151 }
5252 }
5353
@@ -58,21 +58,21 @@ void createWebSocketConnection() {
5858
5959 addConnectionHeadersToBuilder (requestBuilder , mConnectHttpHeaders );
6060
61- currentListener = new CompletableWebSocketListener ();
61+ socketListener = new CompletableWebSocketListener ();
6262
63- openedSocked = mOkHttpClient
64- .newWebSocket (requestBuilder .build (), currentListener );
63+ openSocket = mOkHttpClient
64+ .newWebSocket (requestBuilder .build (), socketListener );
6565 }
6666
6767 @ Override
6868 void rawSend (String stompMessage ) {
69- openedSocked .send (stompMessage );
69+ openSocket .send (stompMessage );
7070 }
7171
7272 @ Nullable
7373 @ Override
7474 Object getSocket () {
75- return openedSocked ;
75+ return openSocket ;
7676 }
7777
7878 @ NonNull
@@ -91,9 +91,10 @@ private void addConnectionHeadersToBuilder(@NonNull Request.Builder requestBuild
9191 }
9292 }
9393
94- private class CompletableWebSocketListener extends WebSocketListener implements CompletableOnSubscribe {
95- private List <CompletableEmitter > mEmitters = new ArrayList <>();
96- private Completable mCompletable = Completable .create (this );
94+ // Class for a WS Listener that completes after the socket closes
95+ private class CompletableWebSocketListener extends WebSocketListener implements CompletableSource {
96+ private List <CompletableObserver > mObservers = new ArrayList <>();
97+ private Completable mCompletable = Completable .wrap (this );
9798
9899 @ Override
99100 public void onOpen (WebSocket webSocket , @ NonNull Response response ) {
@@ -120,7 +121,7 @@ public void onMessage(WebSocket webSocket, @NonNull ByteString bytes) {
120121
121122 @ Override
122123 public void onClosed (WebSocket webSocket , int code , String reason ) {
123- openedSocked = null ;
124+ openSocket = null ;
124125 emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .CLOSED ));
125126 launchCloseEvent ();
126127 }
@@ -129,7 +130,7 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
129130 public void onFailure (WebSocket webSocket , Throwable t , Response response ) {
130131 // in OkHttp, a Failure is equivalent to a JWS-Error *and* a JWS-Close
131132 emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .ERROR , new Exception (t )));
132- openedSocked = null ;
133+ openSocket = null ;
133134 emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .CLOSED ));
134135 launchErrorEvent (t );
135136 }
@@ -140,25 +141,23 @@ public void onClosing(final WebSocket webSocket, final int code, final String re
140141 }
141142
142143 @ Override
143- public void subscribe ( CompletableEmitter emitter ) {
144- mEmitters .add (emitter );
144+ public void subscribe ( CompletableObserver observer ) {
145+ mObservers .add (observer );
145146 }
146147
147148 private void awaitCloseBlocking (){
148149 mCompletable .blockingAwait ();
149150 }
150151
151152 private void launchCloseEvent (){
152- for (CompletableEmitter emitter : mEmitters ) {
153- if (!emitter .isDisposed ())
154- emitter .onComplete ();
153+ for (CompletableObserver observer : mObservers ) {
154+ observer .onComplete ();
155155 }
156156 }
157157
158158 private void launchErrorEvent (Throwable t ){
159- for (CompletableEmitter emitter : mEmitters ) {
160- if (!emitter .isDisposed ())
161- emitter .onError (t );
159+ for (CompletableObserver observer : mObservers ) {
160+ observer .onError (t );
162161 }
163162 }
164163 }
0 commit comments