Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions dotnet/src/webdriver/BiDi/Communication/Broker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public sealed class Broker : IAsyncDisposable
private readonly BiDi _bidi;
private readonly ITransport _transport;

private readonly ConcurrentDictionary<long, CommandInfo> _pendingCommands = new();
private readonly BlockingCollection<MessageEvent> _pendingEvents = [];
private readonly ConcurrentDictionary<long, CommandInfo<EmptyResult>> _pendingCommands = new();
private readonly BlockingCollection<(string Method, EventArgs Params)> _pendingEvents = [];
private readonly Dictionary<string, JsonTypeInfo> _eventTypesMap = [];

private readonly ConcurrentDictionary<string, List<EventHandler>> _eventHandlers = new();
Expand Down Expand Up @@ -143,20 +143,20 @@ public async Task<TResult> ExecuteCommandAsync<TCommand, TResult>(TCommand comma
where TResult : EmptyResult
{
command.Id = Interlocked.Increment(ref _currentCommandId);
var tcs = new TaskCompletionSource<JsonElement>(TaskCreationOptions.RunContinuationsAsynchronously);
var tcs = new TaskCompletionSource<EmptyResult>(TaskCreationOptions.RunContinuationsAsynchronously);
var timeout = options?.Timeout ?? TimeSpan.FromSeconds(30);
using var cts = new CancellationTokenSource(timeout);
cts.Token.Register(() => tcs.TrySetCanceled(cts.Token));
var commandInfo = new CommandInfo(command.Id, command.ResultType, tcs);
var commandInfo = new CommandInfo<EmptyResult>(command.Id, tcs, jsonResultTypeInfo);
_pendingCommands[command.Id] = commandInfo;
var data = JsonSerializer.SerializeToUtf8Bytes(command, jsonCommandTypeInfo);

await _transport.SendAsync(data, cts.Token).ConfigureAwait(false);
var resultJson = await tcs.Task.ConfigureAwait(false);
return JsonSerializer.Deserialize(resultJson, jsonResultTypeInfo)!;

return (TResult)await tcs.Task.ConfigureAwait(false);
}

public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Action<TEventArgs> action, SubscriptionOptions? options, JsonTypeInfo jsonTypeInfo)
public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Action<TEventArgs> action, SubscriptionOptions? options, JsonTypeInfo<TEventArgs> jsonTypeInfo)
where TEventArgs : EventArgs
{
_eventTypesMap[eventName] = jsonTypeInfo;
Expand Down Expand Up @@ -185,7 +185,7 @@ public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Act
}
}

public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Func<TEventArgs, Task> func, SubscriptionOptions? options, JsonTypeInfo jsonTypeInfo)
public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Func<TEventArgs, Task> func, SubscriptionOptions? options, JsonTypeInfo<TEventArgs> jsonTypeInfo)
where TEventArgs : EventArgs
{
_eventTypesMap[eventName] = jsonTypeInfo;
Expand Down Expand Up @@ -301,7 +301,7 @@ private void ProcessReceivedMessage(byte[]? data)

if (_pendingCommands.TryGetValue(id.Value, out var successCommand))
{
successCommand.TaskCompletionSource.SetResult(JsonElement.ParseValue(ref resultReader));
successCommand.TaskCompletionSource.SetResult((EmptyResult)JsonSerializer.Deserialize(ref resultReader, successCommand.JsonResultTypeInfo)!);
_pendingCommands.TryRemove(id.Value, out _);
}
else
Expand All @@ -318,7 +318,7 @@ private void ProcessReceivedMessage(byte[]? data)
{
var eventArgs = (EventArgs)JsonSerializer.Deserialize(ref paramsReader, eventInfo)!;

var messageEvent = new MessageEvent(method, eventArgs);
var messageEvent = (method, eventArgs);
_pendingEvents.Add(messageEvent);
}
else
Expand All @@ -345,10 +345,13 @@ private void ProcessReceivedMessage(byte[]? data)
}
}

class CommandInfo(long id, Type resultType, TaskCompletionSource<JsonElement> taskCompletionSource)
class CommandInfo<TResult>(long id, TaskCompletionSource<TResult> taskCompletionSource, JsonTypeInfo jsonResultTypeInfo)
where TResult : EmptyResult
{
public long Id { get; } = id;

public TaskCompletionSource<JsonElement> TaskCompletionSource { get; } = taskCompletionSource;
public TaskCompletionSource<TResult> TaskCompletionSource { get; } = taskCompletionSource;

public JsonTypeInfo JsonResultTypeInfo { get; } = jsonResultTypeInfo;
};
}
8 changes: 4 additions & 4 deletions dotnet/src/webdriver/BiDi/Communication/EventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@

namespace OpenQA.Selenium.BiDi.Communication;

public abstract class EventHandler(string eventName, Type eventArgsType, IEnumerable<BrowsingContext.BrowsingContext>? contexts = null)
public abstract class EventHandler(string eventName, IEnumerable<BrowsingContext.BrowsingContext>? contexts = null)
{
public string EventName { get; } = eventName;
public Type EventArgsType { get; set; } = eventArgsType;

public IEnumerable<BrowsingContext.BrowsingContext>? Contexts { get; } = contexts;

public abstract ValueTask InvokeAsync(object args);
}

internal class AsyncEventHandler<TEventArgs>(string eventName, Func<TEventArgs, Task> func, IEnumerable<BrowsingContext.BrowsingContext>? contexts = null)
: EventHandler(eventName, typeof(TEventArgs), contexts) where TEventArgs : EventArgs
: EventHandler(eventName, contexts) where TEventArgs : EventArgs
{
private readonly Func<TEventArgs, Task> _func = func;

Expand All @@ -44,7 +44,7 @@ public override async ValueTask InvokeAsync(object args)
}

internal class SyncEventHandler<TEventArgs>(string eventName, Action<TEventArgs> action, IEnumerable<BrowsingContext.BrowsingContext>? contexts = null)
: EventHandler(eventName, typeof(TEventArgs), contexts) where TEventArgs : EventArgs
: EventHandler(eventName, contexts) where TEventArgs : EventArgs
{
private readonly Action<TEventArgs> _action = action;

Expand Down
34 changes: 0 additions & 34 deletions dotnet/src/webdriver/BiDi/Communication/Message.cs

This file was deleted.