3232
3333namespace OpenQA . Selenium . BiDi . Communication ;
3434
35- public class Broker : IAsyncDisposable
35+ public sealed class Broker : IAsyncDisposable
3636{
3737 private readonly ILogger _logger = Log . GetLogger < Broker > ( ) ;
3838
3939 private readonly BiDi _bidi ;
4040 private readonly ITransport _transport ;
4141
42- private readonly ConcurrentDictionary < int , TaskCompletionSource < JsonElement > > _pendingCommands = new ( ) ;
42+ private readonly ConcurrentDictionary < long , CommandInfo > _pendingCommands = new ( ) ;
4343 private readonly BlockingCollection < MessageEvent > _pendingEvents = [ ] ;
44+ private readonly Dictionary < string , Type > _eventTypesMap = [ ] ;
4445
4546 private readonly ConcurrentDictionary < string , List < EventHandler > > _eventHandlers = new ( ) ;
4647
47- private int _currentCommandId ;
48+ private long _currentCommandId ;
4849
4950 private static readonly TaskFactory _myTaskFactory = new ( CancellationToken . None , TaskCreationOptions . DenyChildAttach , TaskContinuationOptions . None , TaskScheduler . Default ) ;
5051
@@ -89,7 +90,6 @@ internal Broker(BiDi bidi, Uri url)
8990 new JsonStringEnumConverter ( JsonNamingPolicy . CamelCase ) ,
9091
9192 // https://github.com/dotnet/runtime/issues/72604
92- new Json . Converters . Polymorphic . MessageConverter ( ) ,
9393 new Json . Converters . Polymorphic . EvaluateResultConverter ( ) ,
9494 new Json . Converters . Polymorphic . RemoteValueConverter ( ) ,
9595 new Json . Converters . Polymorphic . RealmInfoConverter ( ) ,
@@ -122,23 +122,18 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
122122 {
123123 while ( ! cancellationToken . IsCancellationRequested )
124124 {
125- var data = await _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
126-
127- var message = JsonSerializer . Deserialize ( new ReadOnlySpan < byte > ( data ) , _jsonSerializerContext . Message ) ;
125+ try
126+ {
127+ var data = await _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
128128
129- switch ( message )
129+ ProcessReceivedMessage ( data ) ;
130+ }
131+ catch ( Exception ex )
130132 {
131- case MessageSuccess messageSuccess :
132- _pendingCommands [ messageSuccess . Id ] . SetResult ( messageSuccess . Result ) ;
133- _pendingCommands . TryRemove ( messageSuccess . Id , out _ ) ;
134- break ;
135- case MessageEvent messageEvent :
136- _pendingEvents . Add ( messageEvent ) ;
137- break ;
138- case MessageError mesageError :
139- _pendingCommands [ mesageError . Id ] . SetException ( new BiDiException ( $ "{ mesageError . Error } : { mesageError . Message } ") ) ;
140- _pendingCommands . TryRemove ( mesageError . Id , out _ ) ;
141- break ;
133+ if ( cancellationToken . IsCancellationRequested is not true && _logger . IsEnabled ( LogEventLevel . Error ) )
134+ {
135+ _logger . Error ( $ "Couldn't process received BiDi remote message: { ex } ") ;
136+ }
142137 }
143138 }
144139 }
@@ -155,7 +150,7 @@ private async Task ProcessEventsAwaiterAsync()
155150 {
156151 foreach ( var handler in eventHandlers . ToArray ( ) ) // copy handlers avoiding modified collection while iterating
157152 {
158- var args = ( EventArgs ) result . Params . Deserialize ( handler . EventArgsType , _jsonSerializerContext ) ! ;
153+ var args = result . Params ;
159154
160155 args . BiDi = _bidi ;
161156
@@ -177,40 +172,41 @@ private async Task ProcessEventsAwaiterAsync()
177172 {
178173 if ( _logger . IsEnabled ( LogEventLevel . Error ) )
179174 {
180- _logger . Error ( $ "Unhandled error processing BiDi event: { ex } ") ;
175+ _logger . Error ( $ "Unhandled error processing BiDi event handler : { ex } ") ;
181176 }
182177 }
183178 }
184179 }
185180
186- public async Task < TResult > ExecuteCommandAsync < TCommand , TResult > ( TCommand command , CommandOptions ? options )
181+ public async Task ExecuteCommandAsync < TCommand > ( TCommand command , CommandOptions ? options )
187182 where TCommand : Command
188183 {
189- var jsonElement = await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
190-
191- return ( TResult ) jsonElement . Deserialize ( typeof ( TResult ) , _jsonSerializerContext ) ! ;
184+ await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
192185 }
193186
194- public async Task ExecuteCommandAsync < TCommand > ( TCommand command , CommandOptions ? options )
187+ public async Task < TResult > ExecuteCommandAsync < TCommand , TResult > ( TCommand command , CommandOptions ? options )
195188 where TCommand : Command
189+ where TResult : EmptyResult
196190 {
197- await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
191+ var result = await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
192+
193+ return ( TResult ) result ;
198194 }
199195
200- private async Task < JsonElement > ExecuteCommandCoreAsync < TCommand > ( TCommand command , CommandOptions ? options )
196+ private async Task < EmptyResult > ExecuteCommandCoreAsync < TCommand > ( TCommand command , CommandOptions ? options )
201197 where TCommand : Command
202198 {
203199 command . Id = Interlocked . Increment ( ref _currentCommandId ) ;
204200
205- var tcs = new TaskCompletionSource < JsonElement > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
201+ var tcs = new TaskCompletionSource < EmptyResult > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
206202
207203 var timeout = options ? . Timeout ?? TimeSpan . FromSeconds ( 30 ) ;
208204
209205 using var cts = new CancellationTokenSource ( timeout ) ;
210206
211207 cts . Token . Register ( ( ) => tcs . TrySetCanceled ( cts . Token ) ) ;
212208
213- _pendingCommands [ command . Id ] = tcs ;
209+ _pendingCommands [ command . Id ] = new ( command . Id , command . ResultType , tcs ) ;
214210
215211 var data = JsonSerializer . SerializeToUtf8Bytes ( command , typeof ( TCommand ) , _jsonSerializerContext ) ;
216212
@@ -222,6 +218,8 @@ private async Task<JsonElement> ExecuteCommandCoreAsync<TCommand>(TCommand comma
222218 public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Action < TEventArgs > action , SubscriptionOptions ? options = null )
223219 where TEventArgs : EventArgs
224220 {
221+ _eventTypesMap [ eventName ] = typeof ( TEventArgs ) ;
222+
225223 var handlers = _eventHandlers . GetOrAdd ( eventName , ( a ) => [ ] ) ;
226224
227225 if ( options is BrowsingContextsSubscriptionOptions browsingContextsOptions )
@@ -249,6 +247,8 @@ public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Act
249247 public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Func < TEventArgs , Task > func , SubscriptionOptions ? options = null )
250248 where TEventArgs : EventArgs
251249 {
250+ _eventTypesMap [ eventName ] = typeof ( TEventArgs ) ;
251+
252252 var handlers = _eventHandlers . GetOrAdd ( eventName , ( a ) => [ ] ) ;
253253
254254 if ( options is BrowsingContextsSubscriptionOptions browsingContextsOptions )
@@ -303,12 +303,6 @@ public async Task UnsubscribeAsync(Modules.Session.Subscription subscription, Ev
303303 }
304304
305305 public async ValueTask DisposeAsync ( )
306- {
307- await DisposeAsyncCore ( ) ;
308- GC . SuppressFinalize ( this ) ;
309- }
310-
311- protected virtual async ValueTask DisposeAsyncCore ( )
312306 {
313307 _pendingEvents . CompleteAdding ( ) ;
314308
@@ -320,5 +314,104 @@ protected virtual async ValueTask DisposeAsyncCore()
320314 }
321315
322316 _transport . Dispose ( ) ;
317+
318+ GC . SuppressFinalize ( this ) ;
323319 }
320+
321+ private void ProcessReceivedMessage ( byte [ ] ? data )
322+ {
323+ long ? id = default ;
324+ string ? type = default ;
325+ string ? method = default ;
326+ string ? error = default ;
327+ string ? message = default ;
328+ Utf8JsonReader resultReader = default ;
329+ Utf8JsonReader paramsReader = default ;
330+
331+ Utf8JsonReader reader = new ( new ReadOnlySpan < byte > ( data ) ) ;
332+ reader . Read ( ) ;
333+
334+ reader . Read ( ) ; // "{"
335+
336+ while ( reader . TokenType == JsonTokenType . PropertyName )
337+ {
338+ string ? propertyName = reader . GetString ( ) ;
339+ reader . Read ( ) ;
340+
341+ switch ( propertyName )
342+ {
343+ case "id" :
344+ id = reader . GetInt64 ( ) ;
345+ break ;
346+
347+ case "type" :
348+ type = reader . GetString ( ) ;
349+ break ;
350+
351+ case "method" :
352+ method = reader . GetString ( ) ;
353+ break ;
354+
355+ case "result" :
356+ resultReader = reader ; // cloning reader with current position
357+ break ;
358+
359+ case "params" :
360+ paramsReader = reader ; // cloning reader with current position
361+ break ;
362+
363+ case "error" :
364+ error = reader . GetString ( ) ;
365+ break ;
366+
367+ case "message" :
368+ message = reader . GetString ( ) ;
369+ break ;
370+ }
371+
372+ reader . Skip ( ) ;
373+ reader . Read ( ) ;
374+ }
375+
376+ switch ( type )
377+ {
378+ case "success" :
379+ if ( id is null ) throw new JsonException ( "The remote end responded with 'success' message type, but missed required 'id' property." ) ;
380+
381+ var successCommand = _pendingCommands [ id . Value ] ;
382+ var messageSuccess = JsonSerializer . Deserialize ( ref resultReader , successCommand . ResultType , _jsonSerializerContext ) ! ;
383+ successCommand . TaskCompletionSource . SetResult ( ( EmptyResult ) messageSuccess ) ;
384+ _pendingCommands . TryRemove ( id . Value , out _ ) ;
385+ break ;
386+
387+ case "event" :
388+ if ( method is null ) throw new JsonException ( "The remote end responded with 'event' message type, but missed required 'method' property." ) ;
389+
390+ var eventType = _eventTypesMap [ method ] ;
391+
392+ var eventArgs = ( EventArgs ) JsonSerializer . Deserialize ( ref paramsReader , eventType , _jsonSerializerContext ) ! ;
393+
394+ var messageEvent = new MessageEvent ( method , eventArgs ) ;
395+ _pendingEvents . Add ( messageEvent ) ;
396+ break ;
397+
398+ case "error" :
399+ if ( id is null ) throw new JsonException ( "The remote end responded with 'error' message type, but missed required 'id' property." ) ;
400+
401+ var messageError = new MessageError ( id . Value ) { Error = error , Message = message } ;
402+ var errorCommand = _pendingCommands [ messageError . Id ] ;
403+ errorCommand . TaskCompletionSource . SetException ( new BiDiException ( $ "{ messageError . Error } : { messageError . Message } ") ) ;
404+ _pendingCommands . TryRemove ( messageError . Id , out _ ) ;
405+ break ;
406+ }
407+ }
408+
409+ class CommandInfo ( long id , Type resultType , TaskCompletionSource < EmptyResult > taskCompletionSource )
410+ {
411+ public long Id { get ; } = id ;
412+
413+ public Type ResultType { get ; } = resultType ;
414+
415+ public TaskCompletionSource < EmptyResult > TaskCompletionSource { get ; } = taskCompletionSource ;
416+ } ;
324417}
0 commit comments