@@ -47,88 +47,80 @@ public class PluginInfo : IDisposable
4747
4848 private async Task ReceiveHeartbeatAsync ( CancellationToken token )
4949 {
50- try
50+ while ( ! token . IsCancellationRequested )
5151 {
52- while ( ! token . IsCancellationRequested )
53- {
54- NamedPipeServerStream ? client ;
52+ NamedPipeServerStream ? client ;
5553
56- await _cliLock . WaitAsync ( token ) ;
57- {
58- if ( _client == null || ! _client . IsConnected )
59- break ;
60- client = _client ;
61- }
62- _cliLock . Release ( ) ;
54+ await _cliLock . WaitAsync ( token ) ;
55+ try
56+ {
57+ if ( _client == null || ! _client . IsConnected )
58+ break ;
59+ client = _client ;
60+ } finally { _cliLock . Release ( ) ; }
6361
64- try
65- {
66- var hb = await Task . Run ( ( ) =>
67- PipeHeartbeat . Parser . ParseDelimitedFrom ( client ) , token ) ;
62+ try
63+ {
64+ var hb = await PipeHeartbeat . Parser . ParseDelimitedFromAsync ( client , token ) ;
6865
69- if ( hb ? . Uuid == Uuid )
70- {
71- ClassLogger . Debug ( $ "Heartbeat received for { Name } ") ;
72- LastHeartbeat = DateTime . UtcNow ;
73- }
74- }
75- catch ( IOException ) when ( ! token . IsCancellationRequested )
66+ if ( hb ? . Uuid == Uuid )
7667 {
77- ClassLogger . Info ( $ "{ Name } exited due to IO exception") ;
78- break ;
79- }
80- catch ( OperationCanceledException )
81- {
82- break ;
83- }
84- catch ( Exception ex )
85- {
86- ClassLogger . Error ( ex , $ "Error receiving heartbeat from { Name } ") ;
87- await Task . Delay ( 1000 , token ) ;
68+ ClassLogger . Debug ( $ "Heartbeat received for { Name } ") ;
69+ LastHeartbeat = DateTime . UtcNow ;
8870 }
8971 }
90- }
91- finally
92- {
93- ClassLogger . Debug ( $ "Plugin { Name } left") ;
94- StopAll ( ) ;
72+ catch ( IOException ) when ( ! token . IsCancellationRequested )
73+ {
74+ ClassLogger . Info ( $ "{ Name } exited due to IO exception") ;
75+ break ;
76+ }
77+ catch ( OperationCanceledException )
78+ {
79+ break ;
80+ }
81+ catch ( Exception ex )
82+ {
83+ ClassLogger . Error ( ex , $ "Error receiving heartbeat from { Name } ") ;
84+ await Task . Delay ( 1000 , token ) ;
85+ }
9586 }
9687 }
9788
9889 private void StartHeartbeat ( CancellationToken token )
9990 {
10091 if ( _disposed ) return ;
10192
102- _cliLock . Wait ( ) ;
93+ _cliLock . Wait ( token ) ;
94+ try
10395 {
10496 if ( _heartbeatTask != null || _client == null ) return ;
10597 _heartbeatCts = CancellationTokenSource . CreateLinkedTokenSource ( token ) ;
10698 _heartbeatTask = Task . Run ( ( ) => ReceiveHeartbeatAsync ( _heartbeatCts . Token ) , token ) ;
10799 }
108- _cliLock . Release ( ) ;
100+ finally
101+ {
102+ _cliLock . Release ( ) ;
103+ }
109104 }
110105
111106 private void StopAll ( )
112107 {
113108 if ( _disposed ) return ;
109+
110+ _disposed = true ;
114111
115- _ = Task . Run ( async ( ) =>
112+ try
116113 {
117- try
114+ SendMessage ( new PipeConnectionClosed ( )
118115 {
119- await SendMessage ( new PipeConnectionClosed ( )
120- {
121- Timestamp = Timestamp . FromDateTime ( DateTime . UtcNow )
122- } , CancellationToken . None ) ;
123- }
124- catch ( Exception e )
125- {
126- //ignored
127- }
128- } ) ;
116+ Timestamp = Timestamp . FromDateTime ( DateTime . UtcNow )
117+ } , CancellationToken . None ) . GetAwaiter ( ) . GetResult ( ) ;
118+ }
119+ catch
120+ {
121+ // ignored
122+ }
129123
130- _disposed = true ;
131-
132124 _heartbeatCts ? . Cancel ( ) ;
133125 _heartbeatCts ? . Dispose ( ) ;
134126 _heartbeatCts = null ;
@@ -175,7 +167,7 @@ public async Task SendMessage<T>(T msg, CancellationToken token) where T: IMessa
175167 {
176168 switch ( msg )
177169 {
178- case PackedWsjtxMessage :
170+ case PackedDecodeMessage :
179171 case WsjtxMessage :
180172 if ( ! Capabilities . Contains ( Capability . WsjtxMessage ) ) return ;
181173 break ;
@@ -185,6 +177,8 @@ public async Task SendMessage<T>(T msg, CancellationToken token) where T: IMessa
185177 case ClhInternalMessage :
186178 if ( ! Capabilities . Contains ( Capability . ClhInternalData ) ) return ;
187179 break ;
180+ case PipeConnectionClosed :
181+ break ;
188182 default :
189183 return ;
190184 }
@@ -220,7 +214,7 @@ public class PluginService: IPluginService, IDisposable
220214 private readonly object _serviceLock = new ( ) ;
221215 private bool _isRunning ;
222216
223- private readonly ObservableCollection < WsjtxMessage > _wsjtxDecodeCache = new ( ) ;
217+ private readonly ObservableCollection < Decode > _wsjtxDecodeCache = new ( ) ;
224218
225219 private readonly BasicSettings _basicSettings ;
226220 private readonly CompositeDisposable _settingsSubscription = new ( ) ;
@@ -272,7 +266,7 @@ private async Task HandleDecodeCacheChanged()
272266 if ( decodes . Length == 0 ) return ;
273267
274268 _wsjtxDecodeCache . Clear ( ) ;
275- var packedMessage = new PackedWsjtxMessage ( ) ;
269+ var packedMessage = new PackedDecodeMessage ( ) ;
276270 packedMessage . Messages . AddRange ( decodes ) ;
277271 packedMessage . Timestamp = Timestamp . FromDateTime ( DateTime . UtcNow ) ;
278272 await BroadcastMessageAsync ( packedMessage , CancellationToken . None ) ;
@@ -402,7 +396,7 @@ public async Task BroadcastMessageAsync(IMessage? message, CancellationToken tok
402396 // throttle decode; will be sent later.
403397 if ( message is WsjtxMessage { PayloadCase : WsjtxMessage . PayloadOneofCase . Decode } wmsg )
404398 {
405- _wsjtxDecodeCache . Add ( wmsg ) ;
399+ _wsjtxDecodeCache . Add ( wmsg . Decode ) ;
406400 return ;
407401 }
408402
0 commit comments