@@ -38,7 +38,7 @@ public sealed class Broker : IAsyncDisposable
38
38
private readonly ITransport _transport ;
39
39
40
40
private readonly ConcurrentDictionary < long , CommandInfo > _pendingCommands = new ( ) ;
41
- private readonly BlockingCollection < MessageEvent > _pendingEvents = [ ] ;
41
+ private readonly BlockingCollection < ( string Method , EventArgs Params ) > _pendingEvents = [ ] ;
42
42
private readonly Dictionary < string , JsonTypeInfo > _eventTypesMap = [ ] ;
43
43
44
44
private readonly ConcurrentDictionary < string , List < EventHandler > > _eventHandlers = new ( ) ;
@@ -143,20 +143,20 @@ public async Task<TResult> ExecuteCommandAsync<TCommand, TResult>(TCommand comma
143
143
where TResult : EmptyResult
144
144
{
145
145
command . Id = Interlocked . Increment ( ref _currentCommandId ) ;
146
- var tcs = new TaskCompletionSource < JsonElement > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
146
+ var tcs = new TaskCompletionSource < EmptyResult > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
147
147
var timeout = options ? . Timeout ?? TimeSpan . FromSeconds ( 30 ) ;
148
148
using var cts = new CancellationTokenSource ( timeout ) ;
149
149
cts . Token . Register ( ( ) => tcs . TrySetCanceled ( cts . Token ) ) ;
150
- var commandInfo = new CommandInfo ( command . Id , command . ResultType , tcs ) ;
150
+ var commandInfo = new CommandInfo ( command . Id , tcs , jsonResultTypeInfo ) ;
151
151
_pendingCommands [ command . Id ] = commandInfo ;
152
152
var data = JsonSerializer . SerializeToUtf8Bytes ( command , jsonCommandTypeInfo ) ;
153
153
154
154
await _transport . SendAsync ( data , cts . Token ) . ConfigureAwait ( false ) ;
155
- var resultJson = await tcs . Task . ConfigureAwait ( false ) ;
156
- return JsonSerializer . Deserialize ( resultJson , jsonResultTypeInfo ) ! ;
155
+
156
+ return ( TResult ) await tcs . Task . ConfigureAwait ( false ) ;
157
157
}
158
158
159
- public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Action < TEventArgs > action , SubscriptionOptions ? options , JsonTypeInfo jsonTypeInfo )
159
+ public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Action < TEventArgs > action , SubscriptionOptions ? options , JsonTypeInfo < TEventArgs > jsonTypeInfo )
160
160
where TEventArgs : EventArgs
161
161
{
162
162
_eventTypesMap [ eventName ] = jsonTypeInfo ;
@@ -185,7 +185,7 @@ public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Act
185
185
}
186
186
}
187
187
188
- public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Func < TEventArgs , Task > func , SubscriptionOptions ? options , JsonTypeInfo jsonTypeInfo )
188
+ public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Func < TEventArgs , Task > func , SubscriptionOptions ? options , JsonTypeInfo < TEventArgs > jsonTypeInfo )
189
189
where TEventArgs : EventArgs
190
190
{
191
191
_eventTypesMap [ eventName ] = jsonTypeInfo ;
@@ -301,7 +301,7 @@ private void ProcessReceivedMessage(byte[]? data)
301
301
302
302
if ( _pendingCommands . TryGetValue ( id . Value , out var successCommand ) )
303
303
{
304
- successCommand . TaskCompletionSource . SetResult ( JsonElement . ParseValue ( ref resultReader ) ) ;
304
+ successCommand . TaskCompletionSource . SetResult ( ( EmptyResult ) JsonSerializer . Deserialize ( ref resultReader , successCommand . JsonResultTypeInfo ) ! ) ;
305
305
_pendingCommands . TryRemove ( id . Value , out _ ) ;
306
306
}
307
307
else
@@ -318,7 +318,7 @@ private void ProcessReceivedMessage(byte[]? data)
318
318
{
319
319
var eventArgs = ( EventArgs ) JsonSerializer . Deserialize ( ref paramsReader , eventInfo ) ! ;
320
320
321
- var messageEvent = new MessageEvent ( method , eventArgs ) ;
321
+ var messageEvent = ( method , eventArgs ) ;
322
322
_pendingEvents . Add ( messageEvent ) ;
323
323
}
324
324
else
@@ -345,10 +345,12 @@ private void ProcessReceivedMessage(byte[]? data)
345
345
}
346
346
}
347
347
348
- class CommandInfo ( long id , Type resultType , TaskCompletionSource < JsonElement > taskCompletionSource )
348
+ class CommandInfo ( long id , TaskCompletionSource < EmptyResult > taskCompletionSource , JsonTypeInfo jsonResultTypeInfo )
349
349
{
350
350
public long Id { get ; } = id ;
351
351
352
- public TaskCompletionSource < JsonElement > TaskCompletionSource { get ; } = taskCompletionSource ;
352
+ public TaskCompletionSource < EmptyResult > TaskCompletionSource { get ; } = taskCompletionSource ;
353
+
354
+ public JsonTypeInfo JsonResultTypeInfo { get ; } = jsonResultTypeInfo ;
353
355
} ;
354
356
}
0 commit comments