@@ -127,26 +127,33 @@ public async Task ExecuteAsync(CancellationToken stoppingToken)
127127
128128 public async Task BroadcastAsync ( ServiceMessage message , CancellationToken ct )
129129 {
130+ // Sends messages to all clients simultaneously and waits for them all
131+ // to send or fail/timeout.
132+ //
130133 // Looping over a ConcurrentDictionary is exception-safe, but any items
131134 // added or removed during the loop may or may not be included.
132- foreach ( var ( clientId , client ) in _activeClients )
135+ await Task . WhenAll ( _activeClients . Select ( async item =>
136+ {
133137 try
134138 {
135- var cts = CancellationTokenSource . CreateLinkedTokenSource ( ct ) ;
139+ // Enforce upper bound in case a CT with a timeout wasn't
140+ // supplied.
141+ using var cts = CancellationTokenSource . CreateLinkedTokenSource ( ct ) ;
136142 cts . CancelAfter ( TimeSpan . FromSeconds ( 2 ) ) ;
137- await client . Speaker . SendMessage ( message , cts . Token ) ;
143+ await item . Value . Speaker . SendMessage ( message , cts . Token ) ;
138144 }
139145 catch ( ObjectDisposedException )
140146 {
141147 // The speaker was likely closed while we were iterating.
142148 }
143149 catch ( Exception e )
144150 {
145- _logger . LogWarning ( e , "Failed to send message to client {ClientId}" , clientId ) ;
151+ _logger . LogWarning ( e , "Failed to send message to client {ClientId}" , item . Key ) ;
146152 // TODO: this should probably kill the client, but due to the
147153 // async nature of the client handling, calling Dispose
148154 // will not remove the client from the active clients list
149155 }
156+ } ) ) ;
150157 }
151158
152159 private async Task HandleRpcClientAsync ( ulong clientId , Speaker < ServiceMessage , ClientMessage > speaker ,
0 commit comments