44import android .support .annotation .Nullable ;
55import android .util .Log ;
66
7+ import java .util .ArrayList ;
78import java .util .HashMap ;
9+ import java .util .List ;
810import java .util .Map ;
911import java .util .TreeMap ;
1012
13+ import io .reactivex .Completable ;
14+ import io .reactivex .CompletableEmitter ;
15+ import io .reactivex .CompletableOnSubscribe ;
1116import okhttp3 .Headers ;
1217import okhttp3 .OkHttpClient ;
1318import okhttp3 .Request ;
@@ -27,18 +32,22 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
2732 @ Nullable
2833 private WebSocket openedSocked ;
2934
35+ @ Nullable
36+ private CompletableWebSocketListener currentListener ;
37+
3038 OkHttpConnectionProvider (String uri , @ Nullable Map <String , String > connectHttpHeaders , OkHttpClient okHttpClient ) {
3139 super ();
3240 mUri = uri ;
3341 mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap <>();
3442 mOkHttpClient = okHttpClient ;
3543 }
3644
37- @ NonNull
3845 @ Override
3946 public void rawDisconnect () {
4047 if (openedSocked != null ) {
4148 openedSocked .close (1000 , "" );
49+ if (currentListener != null )
50+ currentListener .getCompletable ().blockingAwait ();
4251 }
4352 }
4453
@@ -49,52 +58,10 @@ void createWebSocketConnection() {
4958
5059 addConnectionHeadersToBuilder (requestBuilder , mConnectHttpHeaders );
5160
52- openedSocked = mOkHttpClient .newWebSocket (requestBuilder .build (),
53- new WebSocketListener () {
54- @ Override
55- public void onOpen (WebSocket webSocket , @ NonNull Response response ) {
56- LifecycleEvent openEvent = new LifecycleEvent (LifecycleEvent .Type .OPENED );
57-
58- TreeMap <String , String > headersAsMap = headersAsMap (response );
59-
60- openEvent .setHandshakeResponseHeaders (headersAsMap );
61- emitLifecycleEvent (openEvent );
62- }
63-
64- @ Override
65- public void onMessage (WebSocket webSocket , String text ) {
66- if (text .equals ("\n " ))
67- Log .d (tag , "RECEIVED HEARTBEAT" );
68- else
69- emitMessage (text );
70- }
71-
72- @ Override
73- public void onMessage (WebSocket webSocket , @ NonNull ByteString bytes ) {
74- emitMessage (bytes .utf8 ());
75- }
76-
77- @ Override
78- public void onClosed (WebSocket webSocket , int code , String reason ) {
79- openedSocked = null ;
80- emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .CLOSED ));
81- }
82-
83- @ Override
84- public void onFailure (WebSocket webSocket , Throwable t , Response response ) {
85- // in OkHttp, a Failure is equivalent to a JWS-Error *and* a JWS-Close
86- emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .ERROR , new Exception (t )));
87- openedSocked = null ;
88- emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .CLOSED ));
89- }
90-
91- @ Override
92- public void onClosing (final WebSocket webSocket , final int code , final String reason ) {
93- webSocket .close (code , reason );
94- }
95- }
96-
97- );
61+ currentListener = new CompletableWebSocketListener ();
62+
63+ openedSocked = mOkHttpClient
64+ .newWebSocket (requestBuilder .build (), currentListener );
9865 }
9966
10067 @ Override
@@ -123,4 +90,76 @@ private void addConnectionHeadersToBuilder(@NonNull Request.Builder requestBuild
12390 requestBuilder .addHeader (headerEntry .getKey (), headerEntry .getValue ());
12491 }
12592 }
93+
94+ private class CompletableWebSocketListener extends WebSocketListener implements CompletableOnSubscribe {
95+ private List <CompletableEmitter > mEmitters = new ArrayList <>();
96+ private Completable mCompletable = Completable .create (this );
97+
98+ @ Override
99+ public void onOpen (WebSocket webSocket , @ NonNull Response response ) {
100+ LifecycleEvent openEvent = new LifecycleEvent (LifecycleEvent .Type .OPENED );
101+
102+ TreeMap <String , String > headersAsMap = headersAsMap (response );
103+
104+ openEvent .setHandshakeResponseHeaders (headersAsMap );
105+ emitLifecycleEvent (openEvent );
106+ }
107+
108+ @ Override
109+ public void onMessage (WebSocket webSocket , String text ) {
110+ if (text .equals ("\n " ))
111+ Log .d (tag , "RECEIVED HEARTBEAT" );
112+ else
113+ emitMessage (text );
114+ }
115+
116+ @ Override
117+ public void onMessage (WebSocket webSocket , @ NonNull ByteString bytes ) {
118+ emitMessage (bytes .utf8 ());
119+ }
120+
121+ @ Override
122+ public void onClosed (WebSocket webSocket , int code , String reason ) {
123+ openedSocked = null ;
124+ emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .CLOSED ));
125+ launchCloseEvent ();
126+ }
127+
128+ @ Override
129+ public void onFailure (WebSocket webSocket , Throwable t , Response response ) {
130+ // in OkHttp, a Failure is equivalent to a JWS-Error *and* a JWS-Close
131+ emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .ERROR , new Exception (t )));
132+ openedSocked = null ;
133+ emitLifecycleEvent (new LifecycleEvent (LifecycleEvent .Type .CLOSED ));
134+ launchErrorEvent (t );
135+ }
136+
137+ @ Override
138+ public void onClosing (final WebSocket webSocket , final int code , final String reason ) {
139+ webSocket .close (code , reason );
140+ }
141+
142+ @ Override
143+ public void subscribe (CompletableEmitter emitter ) {
144+ mEmitters .add (emitter );
145+ }
146+
147+ private Completable getCompletable (){
148+ return mCompletable ;
149+ }
150+
151+ private void launchCloseEvent (){
152+ for (CompletableEmitter emitter : mEmitters ) {
153+ if (!emitter .isDisposed ())
154+ emitter .onComplete ();
155+ }
156+ }
157+
158+ private void launchErrorEvent (Throwable t ){
159+ for (CompletableEmitter emitter : mEmitters ) {
160+ if (!emitter .isDisposed ())
161+ emitter .onError (t );
162+ }
163+ }
164+ }
126165}
0 commit comments