1919
2020using  OpenQA . Selenium . BiDi . Communication . Json ; 
2121using  OpenQA . Selenium . BiDi . Communication . Json . Converters ; 
22- using  OpenQA . Selenium . BiDi . Communication . Json . Internal ; 
2322using  OpenQA . Selenium . BiDi . Communication . Transport ; 
2423using  OpenQA . Selenium . Internal . Logging ; 
2524using  System ; 
3332
3433namespace  OpenQA . Selenium . BiDi . Communication ; 
3534
36- public  class  Broker  :  IAsyncDisposable 
35+ public  sealed   class  Broker  :  IAsyncDisposable 
3736{ 
3837    private  readonly  ILogger  _logger  =  Log . GetLogger < Broker > ( ) ; 
3938
4039    private  readonly  BiDi  _bidi ; 
4140    private  readonly  ITransport  _transport ; 
4241
43-     private  readonly  ConcurrentDictionary < int ,  ( Command ,  TaskCompletionSource < object > ) >  _pendingCommands  =  new ( ) ; 
42+     private  readonly  ConcurrentDictionary < long ,  ( Command ,  TaskCompletionSource < object > ) >  _pendingCommands  =  new ( ) ; 
4443    private  readonly  BlockingCollection < MessageEvent >  _pendingEvents  =  [ ] ; 
4544
4645    private  readonly  ConcurrentDictionary < string ,  List < EventHandler > >  _eventHandlers  =  new ( ) ; 
4746
48-     private  int  _currentCommandId ; 
47+     private  long  _currentCommandId ; 
4948
5049    private  static   readonly  TaskFactory  _myTaskFactory  =  new ( CancellationToken . None ,  TaskCreationOptions . DenyChildAttach ,  TaskContinuationOptions . None ,  TaskScheduler . Default ) ; 
5150
@@ -126,49 +125,11 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
126125            { 
127126                var  data  =  await  _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ; 
128127
129-                 Utf8JsonReader  utfJsonReader  =  new ( new  ReadOnlySpan < byte > ( data ) ) ; 
130-                 utfJsonReader . Read ( ) ; 
131-                 var  messageType  =  utfJsonReader . GetDiscriminator ( "type" ) ; 
132- 
133-                 switch  ( messageType ) 
134-                 { 
135-                     case  "success" : 
136-                         var  successId  =  int . Parse ( utfJsonReader . GetDiscriminator ( "id" ) ) ; 
137-                         var  successCommand  =  _pendingCommands [ successId ] ; 
138-                         var  messageSuccess  =  JsonSerializer . Deserialize ( ref  utfJsonReader ,  successCommand . Item1 . ResultType ,  _jsonSerializerContext ) ; 
139- 
140-                         successCommand . Item2 . SetResult ( messageSuccess ) ; 
141- 
142-                         _pendingCommands . TryRemove ( successId ,  out  _ ) ; 
143-                         break ; 
144- 
145-                     case  "event" : 
146-                         utfJsonReader . Read ( ) ; 
147-                         utfJsonReader . Read ( ) ; 
148-                         var  method  =  utfJsonReader . GetString ( ) ; 
149- 
150-                         utfJsonReader . Read ( ) ; 
151- 
152-                         // TODO: Just get type info from existing subscribers, should be better 
153-                         var  type  =  _eventHandlers [ method ] . First ( ) . EventArgsType ; 
154- 
155-                         var  eventArgs  =  ( EventArgs ) JsonSerializer . Deserialize ( ref  utfJsonReader ,  type ,  _jsonSerializerContext ) ; 
156- 
157-                         var  messageEvent  =  new  MessageEvent ( method ,  eventArgs ) ; 
158-                         _pendingEvents . Add ( messageEvent ) ; 
159-                         break ; 
160- 
161-                     case  "error" : 
162-                         var  messageError  =  JsonSerializer . Deserialize ( ref  utfJsonReader ,  _jsonSerializerContext . MessageError ) ; 
163-                         var  errorCommand  =  _pendingCommands [ messageError . Id ] ; 
164-                         errorCommand . Item2 . SetException ( new  BiDiException ( $ "{ messageError . Error } : { messageError . Message } ") ) ; 
165-                         _pendingCommands . TryRemove ( messageError . Id ,  out  _ ) ; 
166-                         break ; 
167-                 } 
128+                 ProcessReceivedMessage ( data ) ; 
168129            } 
169130            catch  ( Exception  ex ) 
170131            { 
171-                 if  ( ! cancellationToken . IsCancellationRequested ) 
132+                 if  ( cancellationToken . IsCancellationRequested   is  not  true ) 
172133                { 
173134                    _logger . Error ( $ "Couldn't process received BiDi remote message: { ex } ") ; 
174135                } 
@@ -227,7 +188,7 @@ public async Task<TResult> ExecuteCommandAsync<TCommand, TResult>(TCommand comma
227188    { 
228189        var  result  =  await  ExecuteCommandCoreAsync ( command ,  options ) . ConfigureAwait ( false ) ; 
229190
230-         return  ( ( MessageSuccess < TResult > ) result ) . Result ; 
191+         return  ( TResult ) result ; 
231192    } 
232193
233194    private  async  Task < object >  ExecuteCommandCoreAsync < TCommand > ( TCommand  command ,  CommandOptions ?  options ) 
@@ -249,7 +210,7 @@ private async Task<object> ExecuteCommandCoreAsync<TCommand>(TCommand command, C
249210
250211        await  _transport . SendAsync ( data ,  cts . Token ) . ConfigureAwait ( false ) ; 
251212
252-         return  await  tcs . Task ; 
213+         return  await  tcs . Task . ConfigureAwait ( false ) ; 
253214    } 
254215
255216    public  async  Task < Subscription >  SubscribeAsync < TEventArgs > ( string  eventName ,  Action < TEventArgs >  action ,  SubscriptionOptions ?  options  =  null ) 
@@ -336,12 +297,6 @@ public async Task UnsubscribeAsync(Modules.Session.Subscription subscription, Ev
336297    } 
337298
338299    public  async  ValueTask  DisposeAsync ( ) 
339-     { 
340-         await  DisposeAsyncCore ( ) ; 
341-         GC . SuppressFinalize ( this ) ; 
342-     } 
343- 
344-     protected  virtual  async  ValueTask  DisposeAsyncCore ( ) 
345300    { 
346301        _pendingEvents . CompleteAdding ( ) ; 
347302
@@ -353,5 +308,90 @@ protected virtual async ValueTask DisposeAsyncCore()
353308        } 
354309
355310        _transport . Dispose ( ) ; 
311+ 
312+         GC . SuppressFinalize ( this ) ; 
313+     } 
314+ 
315+     private  void  ProcessReceivedMessage ( byte [ ] ?  data ) 
316+     { 
317+         long ?  id  =  default ; 
318+         string ?  type  =  default ; 
319+         string ?  method  =  default ; 
320+         string ?  error  =  default ; 
321+         string ?  message  =  default ; 
322+         Utf8JsonReader  resultReader  =  default ; 
323+         Utf8JsonReader  paramsReader  =  default ; 
324+ 
325+         Utf8JsonReader  reader  =  new ( new  ReadOnlySpan < byte > ( data ) ) ; 
326+         reader . Read ( ) ; 
327+ 
328+         reader . Read ( ) ;  // "{" 
329+ 
330+         while  ( reader . TokenType  ==  JsonTokenType . PropertyName ) 
331+         { 
332+             string ?  propertyName  =  reader . GetString ( ) ; 
333+             reader . Read ( ) ; 
334+ 
335+             switch  ( propertyName ) 
336+             { 
337+                 case  "id" : 
338+                     id  =  reader . GetInt64 ( ) ; 
339+                     break ; 
340+ 
341+                 case  "type" : 
342+                     type  =  reader . GetString ( ) ; 
343+                     break ; 
344+ 
345+                 case  "method" : 
346+                     method  =  reader . GetString ( ) ; 
347+                     break ; 
348+ 
349+                 case  "result" : 
350+                     resultReader  =  reader ;  // cloning reader with current position 
351+                     break ; 
352+ 
353+                 case  "params" : 
354+                     paramsReader  =  reader ;  // cloning reader with current position 
355+                     break ; 
356+ 
357+                 case  "error" : 
358+                     error  =  reader . GetString ( ) ; 
359+                     break ; 
360+ 
361+                 case  "message" : 
362+                     message  =  reader . GetString ( ) ; 
363+                     break ; 
364+             } 
365+ 
366+             reader . Skip ( ) ; 
367+             reader . Read ( ) ; 
368+         } 
369+ 
370+         switch  ( type ) 
371+         { 
372+             case  "success" : 
373+                 var  successCommand  =  _pendingCommands [ id . Value ] ; 
374+                 var  messageSuccess  =  JsonSerializer . Deserialize ( ref  resultReader ,  successCommand . Item1 . ResultType ,  _jsonSerializerContext ) ; 
375+                 successCommand . Item2 . SetResult ( messageSuccess ) ; 
376+                 _pendingCommands . TryRemove ( id . Value ,  out  _ ) ; 
377+                 break ; 
378+ 
379+             case  "event" : 
380+                 // TODO: Just get type info from existing subscribers, should be better 
381+                 var  eventType  =  _eventHandlers [ method ] . First ( ) . EventArgsType ; 
382+ 
383+                 var  eventArgs  =  ( EventArgs ) JsonSerializer . Deserialize ( ref  paramsReader ,  eventType ,  _jsonSerializerContext ) ; 
384+ 
385+                 var  messageEvent  =  new  MessageEvent ( method ,  eventArgs ) ; 
386+                 _pendingEvents . Add ( messageEvent ) ; 
387+                 break ; 
388+ 
389+             case  "error" : 
390+                 var  messageError  =  new  MessageError ( id . Value )  {  Error  =  error ,  Message  =  message  } ; 
391+                 var  errorCommand  =  _pendingCommands [ messageError . Id ] ; 
392+                 errorCommand . Item2 . SetException ( new  BiDiException ( $ "{ messageError . Error } : { messageError . Message } ") ) ; 
393+                 _pendingCommands . TryRemove ( messageError . Id ,  out  _ ) ; 
394+                 break ; 
395+         } 
356396    } 
357397} 
0 commit comments