@@ -25,33 +25,46 @@ public ccxt.pro.CountedOrderBook countedOrderBook(object snapshot = null, object
2525
2626 public virtual void onClose ( WebSocketClient client , object error = null )
2727 {
28- // var client = (WebSocketClient)client2;
2928 if ( client . error )
3029 {
3130 // what do we do here?
3231 }
3332 else
3433 {
35- var urlClient = ( this . clients . ContainsKey ( client . url ) ) ? this . clients [ client . url ] : null ;
36- if ( urlClient != null )
37- {
38- // this.clients.Remove(client.url);
39- this . clients . TryRemove ( client . url , out _ ) ;
40- }
34+ this . CleanupClients ( client , error ) ;
4135 }
4236 }
4337
4438 public virtual void onError ( WebSocketClient client , object error = null )
39+ {
40+ this . CleanupClients ( client , error ) ;
41+ }
42+
43+ public void CleanupClients ( WebSocketClient client , object error = null )
4544 {
4645 // var client = (WebSocketClient)client2;
4746 var urlClient = ( this . clients . ContainsKey ( client . url ) ) ? this . clients [ client . url ] : null ;
48- if ( urlClient != null && urlClient . error )
47+ if ( urlClient != null ) // && urlClient.error
4948 {
49+ rejectFutures ( urlClient , error ) ;
5050 // this.clients.Remove(client.url);
5151 this . clients . TryRemove ( client . url , out _ ) ;
5252 }
5353 }
5454
55+ void rejectFutures ( WebSocketClient urlClient , object error )
56+ {
57+ foreach ( var KeyValue in urlClient . subscriptions )
58+ {
59+ urlClient . subscriptions . Remove ( KeyValue . Key ) ;
60+ Future existingFuture = null ;
61+ if ( urlClient . futures . TryGetValue ( KeyValue . Key , out existingFuture ) )
62+ {
63+ existingFuture . reject ( error ) ;
64+ }
65+ }
66+ }
67+
5568 public async virtual Task loadOrderBook ( WebSocketClient client , object messageHash , object symbol , object limit = null , object parameters = null )
5669 {
5770 parameters ??= new Dictionary < string , object > ( ) ;
@@ -134,10 +147,12 @@ public WebSocketClient client(object url2)
134147 return this . clients . GetOrAdd ( url , ( url ) =>
135148 {
136149 object ws = this . safeValue ( this . options , "ws" , new Dictionary < string , object > ( ) { } ) ;
137- var wsOptions = this . safeValue ( ws , "options" , ws ) ;
150+ var wsOptions = this . safeValue ( ws , "options" , new Dictionary < string , object > ( ) { } ) ;
138151 wsOptions = this . deepExtend ( this . streaming , wsOptions ) ;
139- var keepAlive = ( ( Int64 ) this . safeInteger ( wsOptions , "keepAlive" , 30000 ) ) ;
140- var client = new WebSocketClient ( url , proxy , handleMessage , ping , onClose , onError , this . verbose , keepAlive ) ;
152+ var keepAliveValue = this . safeInteger ( wsOptions , "keepAlive" , 30000 ) ?? 30000 ;
153+ var keepAlive = keepAliveValue ;
154+ var decompressBinary = this . safeBool ( this . options , "decompressBinary" , true ) as bool ? ?? true ;
155+ var client = new WebSocketClient ( url , proxy , handleMessage , ping , onClose , onError , this . verbose , keepAlive , decompressBinary ) ;
141156
142157 var wsHeaders = this . safeValue ( wsOptions , "headers" , new Dictionary < string , object > ( ) { } ) ;
143158 // iterate through headers
@@ -149,12 +164,14 @@ public WebSocketClient client(object url2)
149164 client . webSocket . Options . SetRequestHeader ( key , headers [ key ] . ToString ( ) ) ;
150165 }
151166 }
167+
152168 var wsCookies = this . safeDict ( ws , "cookies" , new Dictionary < string , object > ( ) { } ) as Dictionary < string , object > ;
153169 if ( wsCookies != null && wsCookies . Count > 0 )
154170 {
155171 var cookieString = string . Join ( "; " , wsCookies . Select ( kvp => $ "{ kvp . Key } ={ kvp . Value } ") ) ;
156172 client . webSocket . Options . SetRequestHeader ( "Cookie" , cookieString ) ;
157173 }
174+
158175 return client ;
159176 } ) ;
160177 }
@@ -165,15 +182,22 @@ public async Task<object> watch(object url2, object messageHash2, object message
165182 var messageHash = messageHash2 . ToString ( ) ;
166183 var subscribeHash = subscribeHash2 ? . ToString ( ) ;
167184 var client = this . client ( url ) ;
185+ var backoffDelay = 0 ;
168186
169- var future = ( client . futures as ConcurrentDictionary < string , Future > ) . GetOrAdd ( messageHash , ( key ) => client . future ( messageHash ) ) ;
170- if ( subscribeHash == null )
187+ Future existingFuture = null ;
188+ if ( subscribeHash == null && ( client . futures as ConcurrentDictionary < string , Future > ) . TryGetValue ( messageHash , out existingFuture ) )
171189 {
172- return await future ;
190+ return await existingFuture ;
173191 }
174- var connected = client . connect ( 0 ) ;
175-
176- if ( ( client . subscriptions as ConcurrentDictionary < string , object > ) . TryAdd ( subscribeHash , subscription ?? true ) )
192+ var future = client . future ( messageHash ) ;
193+ object clientSubscription = null ;
194+ bool clientSubscriptionExists = ( client . subscriptions as ConcurrentDictionary < string , object > ) . TryGetValue ( subscribeHash , out clientSubscription ) ;
195+ if ( ! clientSubscriptionExists )
196+ {
197+ ( client . subscriptions as ConcurrentDictionary < string , object > ) . TryAdd ( subscribeHash , subscription ?? true ) ;
198+ }
199+ var connected = client . connect ( backoffDelay ) ;
200+ if ( ! clientSubscriptionExists )
177201 {
178202 await connected ;
179203 if ( message != null )
@@ -243,4 +267,4 @@ public async Task<object> watchMultiple(object url2, object messageHashes2, obje
243267
244268 return await future ;
245269 }
246- }
270+ }
0 commit comments