@@ -145,13 +145,85 @@ private async Task PullLeasesAsync(CancellationToken cancellationToken)
145145 _logger . LogInformation ( "Lease pull cancelled for node {NodeId}" , _options . NodeId ) ;
146146 break ;
147147 }
148+ catch ( RpcException ex ) when ( ex . StatusCode == StatusCode . Unavailable )
149+ {
150+ TelemetryConfig . LeaseStreamErrorsCounter . Add ( 1 ,
151+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
152+ new KeyValuePair < string , object ? > ( "error.type" , "RpcUnavailable" ) ) ;
153+
154+ _logger . LogWarning ( ex , "Control plane unavailable for node {NodeId}" , _options . NodeId ) ;
155+
156+ _reconnectAttempts ++ ;
157+ var delay = CalculateReconnectDelay ( _reconnectAttempts ) ;
158+
159+ TelemetryConfig . LeaseStreamReconnectsCounter . Add ( 1 ,
160+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
161+ new KeyValuePair < string , object ? > ( "attempt" , _reconnectAttempts ) ) ;
162+
163+ _logger . LogWarning ( "Reconnecting to lease stream in {DelaySeconds}s (attempt {Attempt})" , delay , _reconnectAttempts ) ;
164+ await Task . Delay ( TimeSpan . FromSeconds ( delay ) , cancellationToken ) ;
165+ }
166+ catch ( RpcException ex ) when ( ex . StatusCode == StatusCode . DeadlineExceeded )
167+ {
168+ TelemetryConfig . LeaseStreamErrorsCounter . Add ( 1 ,
169+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
170+ new KeyValuePair < string , object ? > ( "error.type" , "RpcDeadlineExceeded" ) ) ;
171+
172+ _logger . LogWarning ( ex , "gRPC deadline exceeded for node {NodeId}" , _options . NodeId ) ;
173+
174+ _reconnectAttempts ++ ;
175+ var delay = CalculateReconnectDelay ( _reconnectAttempts ) ;
176+
177+ TelemetryConfig . LeaseStreamReconnectsCounter . Add ( 1 ,
178+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
179+ new KeyValuePair < string , object ? > ( "attempt" , _reconnectAttempts ) ) ;
180+
181+ _logger . LogWarning ( "Reconnecting to lease stream in {DelaySeconds}s (attempt {Attempt})" , delay , _reconnectAttempts ) ;
182+ await Task . Delay ( TimeSpan . FromSeconds ( delay ) , cancellationToken ) ;
183+ }
184+ catch ( RpcException ex )
185+ {
186+ TelemetryConfig . LeaseStreamErrorsCounter . Add ( 1 ,
187+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
188+ new KeyValuePair < string , object ? > ( "error.type" , $ "Rpc{ ex . StatusCode } ") ) ;
189+
190+ _logger . LogError ( ex , "gRPC error pulling leases for node {NodeId}: {StatusCode}" , _options . NodeId , ex . StatusCode ) ;
191+
192+ _reconnectAttempts ++ ;
193+ var delay = CalculateReconnectDelay ( _reconnectAttempts ) ;
194+
195+ TelemetryConfig . LeaseStreamReconnectsCounter . Add ( 1 ,
196+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
197+ new KeyValuePair < string , object ? > ( "attempt" , _reconnectAttempts ) ) ;
198+
199+ _logger . LogWarning ( "Reconnecting to lease stream in {DelaySeconds}s (attempt {Attempt})" , delay , _reconnectAttempts ) ;
200+ await Task . Delay ( TimeSpan . FromSeconds ( delay ) , cancellationToken ) ;
201+ }
202+ catch ( InvalidOperationException invalidOpEx )
203+ {
204+ TelemetryConfig . LeaseStreamErrorsCounter . Add ( 1 ,
205+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
206+ new KeyValuePair < string , object ? > ( "error.type" , "InvalidOperation" ) ) ;
207+
208+ _logger . LogError ( invalidOpEx , "Invalid operation pulling leases for node {NodeId}" , _options . NodeId ) ;
209+
210+ _reconnectAttempts ++ ;
211+ var delay = CalculateReconnectDelay ( _reconnectAttempts ) ;
212+
213+ TelemetryConfig . LeaseStreamReconnectsCounter . Add ( 1 ,
214+ new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
215+ new KeyValuePair < string , object ? > ( "attempt" , _reconnectAttempts ) ) ;
216+
217+ _logger . LogWarning ( "Reconnecting to lease stream in {DelaySeconds}s (attempt {Attempt})" , delay , _reconnectAttempts ) ;
218+ await Task . Delay ( TimeSpan . FromSeconds ( delay ) , cancellationToken ) ;
219+ }
148220 catch ( Exception ex )
149221 {
150222 TelemetryConfig . LeaseStreamErrorsCounter . Add ( 1 ,
151223 new KeyValuePair < string , object ? > ( "node.id" , _options . NodeId ) ,
152224 new KeyValuePair < string , object ? > ( "error.type" , ex . GetType ( ) . Name ) ) ;
153225
154- _logger . LogError ( ex , "Error pulling leases for node {NodeId}" , _options . NodeId ) ;
226+ _logger . LogError ( ex , "Unexpected error pulling leases for node {NodeId}" , _options . NodeId ) ;
155227
156228 // Exponential backoff with jitter for reconnection
157229 _reconnectAttempts ++ ;
@@ -215,9 +287,21 @@ private async Task AcknowledgeLeaseAsync(Lease lease, CancellationToken cancella
215287 _logger . LogWarning ( "Failed to acknowledge lease {LeaseId}: {Message}" , lease . LeaseId , response . Message ) ;
216288 }
217289 }
290+ catch ( RpcException ex ) when ( ex . StatusCode == StatusCode . Cancelled )
291+ {
292+ _logger . LogInformation ( "Acknowledgement cancelled for lease {LeaseId}" , lease . LeaseId ) ;
293+ }
294+ catch ( RpcException ex ) when ( ex . StatusCode == StatusCode . DeadlineExceeded )
295+ {
296+ _logger . LogWarning ( ex , "Timeout acknowledging lease {LeaseId}" , lease . LeaseId ) ;
297+ }
298+ catch ( RpcException ex )
299+ {
300+ _logger . LogError ( ex , "gRPC error acknowledging lease {LeaseId}: {StatusCode}" , lease . LeaseId , ex . StatusCode ) ;
301+ }
218302 catch ( Exception ex )
219303 {
220- _logger . LogError ( ex , "Error acknowledging lease {LeaseId}" , lease . LeaseId ) ;
304+ _logger . LogError ( ex , "Unexpected error acknowledging lease {LeaseId}" , lease . LeaseId ) ;
221305 }
222306 }
223307
@@ -394,10 +478,76 @@ private async Task ProcessLeaseAsync(Lease lease, CancellationToken cancellation
394478 {
395479 _logger . LogInformation ( "Processing cancelled for lease {LeaseId}" , lease . LeaseId ) ;
396480 }
481+ catch ( InvalidOperationException invalidOpEx )
482+ {
483+ stopwatch . Stop ( ) ;
484+ _logger . LogError ( invalidOpEx , "Invalid operation processing lease {LeaseId} for run {RunId}" , lease . LeaseId , lease . RunId ) ;
485+
486+ // Report failure
487+ try
488+ {
489+ var failRequest = new FailRequest
490+ {
491+ LeaseId = lease . LeaseId ,
492+ RunId = lease . RunId ,
493+ NodeId = _options . NodeId ,
494+ ErrorMessage = invalidOpEx . Message ,
495+ ErrorDetails = invalidOpEx . ToString ( ) ,
496+ Retryable = false , // Don't retry invalid operations
497+ Timings = new TimingInfo
498+ {
499+ DurationMs = ( long ) stopwatch . ElapsedMilliseconds
500+ }
501+ } ;
502+
503+ await _leaseClient . FailAsync ( failRequest , cancellationToken : cancellationToken ) ;
504+ }
505+ catch ( RpcException rpcEx )
506+ {
507+ _logger . LogError ( rpcEx , "gRPC error reporting failure for lease {LeaseId}: {StatusCode}" , lease . LeaseId , rpcEx . StatusCode ) ;
508+ }
509+ catch ( Exception failEx )
510+ {
511+ _logger . LogError ( failEx , "Unexpected error reporting failure for lease {LeaseId}" , lease . LeaseId ) ;
512+ }
513+ }
514+ catch ( RpcException ex )
515+ {
516+ stopwatch . Stop ( ) ;
517+ _logger . LogError ( ex , "gRPC error processing lease {LeaseId} for run {RunId}: {StatusCode}" , lease . LeaseId , lease . RunId , ex . StatusCode ) ;
518+
519+ // Report failure
520+ try
521+ {
522+ var failRequest = new FailRequest
523+ {
524+ LeaseId = lease . LeaseId ,
525+ RunId = lease . RunId ,
526+ NodeId = _options . NodeId ,
527+ ErrorMessage = $ "gRPC error: { ex . StatusCode } ",
528+ ErrorDetails = ex . ToString ( ) ,
529+ Retryable = ex . StatusCode == StatusCode . Unavailable || ex . StatusCode == StatusCode . DeadlineExceeded ,
530+ Timings = new TimingInfo
531+ {
532+ DurationMs = ( long ) stopwatch . ElapsedMilliseconds
533+ }
534+ } ;
535+
536+ await _leaseClient . FailAsync ( failRequest , cancellationToken : cancellationToken ) ;
537+ }
538+ catch ( RpcException rpcEx )
539+ {
540+ _logger . LogError ( rpcEx , "gRPC error reporting failure for lease {LeaseId}: {StatusCode}" , lease . LeaseId , rpcEx . StatusCode ) ;
541+ }
542+ catch ( Exception failEx )
543+ {
544+ _logger . LogError ( failEx , "Unexpected error reporting failure for lease {LeaseId}" , lease . LeaseId ) ;
545+ }
546+ }
397547 catch ( Exception ex )
398548 {
399549 stopwatch . Stop ( ) ;
400- _logger . LogError ( ex , "Error processing lease {LeaseId} for run {RunId}" , lease . LeaseId , lease . RunId ) ;
550+ _logger . LogError ( ex , "Unexpected error processing lease {LeaseId} for run {RunId}" , lease . LeaseId , lease . RunId ) ;
401551
402552 // Report failure
403553 try
@@ -418,9 +568,13 @@ private async Task ProcessLeaseAsync(Lease lease, CancellationToken cancellation
418568
419569 await _leaseClient . FailAsync ( failRequest , cancellationToken : cancellationToken ) ;
420570 }
571+ catch ( RpcException rpcEx )
572+ {
573+ _logger . LogError ( rpcEx , "gRPC error reporting failure for lease {LeaseId}: {StatusCode}" , lease . LeaseId , rpcEx . StatusCode ) ;
574+ }
421575 catch ( Exception failEx )
422576 {
423- _logger . LogError ( failEx , "Error reporting failure for lease {LeaseId}" , lease . LeaseId ) ;
577+ _logger . LogError ( failEx , "Unexpected error reporting failure for lease {LeaseId}" , lease . LeaseId ) ;
424578 }
425579 }
426580 finally
0 commit comments