7
7
using KubeOps . Operator . DependencyInjection ;
8
8
using Microsoft . Extensions . DependencyInjection ;
9
9
using Microsoft . Extensions . Logging ;
10
+ using Timer = System . Timers . Timer ;
10
11
11
12
namespace KubeOps . Operator . Watcher
12
13
{
13
14
internal class EntityWatcher < TEntity > : IDisposable
14
15
where TEntity : IKubernetesObject < V1ObjectMeta >
15
16
{
17
+ private const double MaxRetrySeconds = 64 ;
18
+
19
+ private int _errorCount = 0 ;
20
+
16
21
private readonly ILogger < EntityWatcher < TEntity > > _logger ;
22
+ private readonly Random _rnd = new Random ( ) ;
17
23
private CancellationTokenSource ? _cancellation ;
18
24
private Watcher < TEntity > ? _watcher ;
19
25
26
+ private Timer ? _reconnectTimer ;
27
+ private Timer ? _resetErrCountTimer ;
28
+
20
29
private readonly Lazy < IKubernetesClient > _client =
21
30
new Lazy < IKubernetesClient > ( ( ) => DependencyInjector . Services . GetRequiredService < IKubernetesClient > ( ) ) ;
22
31
@@ -51,6 +60,8 @@ public void Dispose()
51
60
WatcherEvent -= ( EventHandler < ( WatchEventType type , TEntity resource ) > ) handler ;
52
61
}
53
62
63
+ _reconnectTimer ? . Dispose ( ) ;
64
+ _resetErrCountTimer ? . Dispose ( ) ;
54
65
_cancellation ? . Dispose ( ) ;
55
66
_watcher ? . Dispose ( ) ;
56
67
_logger . LogTrace ( @"Disposed resource watcher for type ""{type}""." , typeof ( TEntity ) ) ;
@@ -71,6 +82,19 @@ private async Task WatchResource()
71
82
}
72
83
}
73
84
85
+ _resetErrCountTimer = new Timer ( TimeSpan . FromSeconds ( 10 ) . TotalMilliseconds ) ;
86
+ _resetErrCountTimer . Elapsed += ( _ , __ ) =>
87
+ {
88
+ _logger . LogTrace ( "Reset error count in resource watcher." ) ;
89
+ _errorCount = 0 ;
90
+ _resetErrCountTimer . Dispose ( ) ;
91
+ _resetErrCountTimer = null ;
92
+ _reconnectTimer ? . Stop ( ) ;
93
+ _reconnectTimer ? . Dispose ( ) ;
94
+ _reconnectTimer = null ;
95
+ } ;
96
+ _resetErrCountTimer . Start ( ) ;
97
+
74
98
_cancellation = new CancellationTokenSource ( ) ;
75
99
// TODO: namespaced resources
76
100
_watcher = await _client . Value . Watch < TEntity > (
@@ -117,11 +141,19 @@ private void OnWatcherEvent(WatchEventType type, TEntity resource)
117
141
private void OnException ( Exception e )
118
142
{
119
143
_logger . LogError ( e , @"There was an error while watching the resource ""{resource}""." , typeof ( TEntity ) ) ;
120
- // _logger.LogInformation("Trying to reconnect.");
121
- // RestartWatcher();
122
144
_cancellation ? . Cancel ( ) ;
123
145
_watcher ? . Dispose ( ) ;
124
146
_watcher = null ;
147
+
148
+ _logger . LogInformation ( "Trying to reconnect with exponential backoff." ) ;
149
+ _resetErrCountTimer ? . Stop ( ) ;
150
+ _resetErrCountTimer ? . Dispose ( ) ;
151
+ _resetErrCountTimer = null ;
152
+ _reconnectTimer ? . Stop ( ) ;
153
+ _reconnectTimer ? . Dispose ( ) ;
154
+ _reconnectTimer = new Timer ( ExponentialBackoff ( ++ _errorCount ) . TotalMilliseconds ) ;
155
+ _reconnectTimer . Elapsed += ( _ , __ ) => RestartWatcher ( ) ;
156
+ _reconnectTimer . Start ( ) ;
125
157
}
126
158
127
159
private void OnClose ( )
@@ -132,5 +164,9 @@ private void OnClose()
132
164
RestartWatcher ( ) ;
133
165
}
134
166
}
167
+
168
+ private TimeSpan ExponentialBackoff ( int retryCount ) => TimeSpan
169
+ . FromSeconds ( Math . Min ( Math . Pow ( 2 , retryCount ) , MaxRetrySeconds ) )
170
+ . Add ( TimeSpan . FromMilliseconds ( _rnd . Next ( 0 , 1000 ) ) ) ;
135
171
}
136
172
}
0 commit comments