@@ -29,6 +29,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
29
29
private readonly ConcurrentDictionary < string , long > _entityCache = new ( ) ;
30
30
private readonly Lazy < List < FinalizerRegistration > > _finalizers ;
31
31
private bool _stopped ;
32
+ private uint _watcherReconnectRetries ;
32
33
33
34
private Watcher < TEntity > ? _watcher ;
34
35
@@ -81,21 +82,14 @@ private void WatchResource()
81
82
}
82
83
}
83
84
85
+ _logger . LogDebug ( """Create watcher for entity of type "{type}".""" , typeof ( TEntity ) ) ;
84
86
_watcher = _client . Watch ( OnEvent , OnError , OnClosed , @namespace : _settings . Namespace ) ;
85
87
}
86
88
87
89
private void StopWatching ( )
88
90
{
89
91
_watcher ? . Dispose ( ) ;
90
- }
91
-
92
- private void OnClosed ( )
93
- {
94
- _logger . LogDebug ( "The server closed the connection." ) ;
95
- if ( ! _stopped )
96
- {
97
- WatchResource ( ) ;
98
- }
92
+ _watcher = null ;
99
93
}
100
94
101
95
private async void OnEntityRequeue ( object ? sender , ( string Name , string ? Namespace ) queued )
@@ -116,7 +110,7 @@ private async void OnEntityRequeue(object? sender, (string Name, string? Namespa
116
110
await ReconcileModification ( entity ) ;
117
111
}
118
112
119
- private void OnError ( Exception e )
113
+ private async void OnError ( Exception e )
120
114
{
121
115
switch ( e )
122
116
{
@@ -138,11 +132,34 @@ e.InnerException is EndOfStreamException &&
138
132
}
139
133
140
134
_logger . LogError ( e , """There was an error while watching the resource "{resource}".""" , typeof ( TEntity ) ) ;
135
+ StopWatching ( ) ;
136
+ _watcherReconnectRetries ++ ;
137
+
138
+ var delay = TimeSpan
139
+ . FromSeconds ( Math . Pow ( 2 , Math . Clamp ( _watcherReconnectRetries , 0 , 5 ) ) )
140
+ . Add ( TimeSpan . FromMilliseconds ( new Random ( ) . Next ( 0 , 1000 ) ) ) ;
141
+ _logger . LogWarning (
142
+ "There were {retries} errors / retries in the watcher. Wait {seconds}s before next attempt to connect." ,
143
+ _watcherReconnectRetries ,
144
+ delay . TotalSeconds ) ;
145
+ await Task . Delay ( delay ) ;
146
+
141
147
WatchResource ( ) ;
142
148
}
143
149
150
+ private void OnClosed ( )
151
+ {
152
+ _logger . LogDebug ( "The watcher was closed." ) ;
153
+ if ( ! _stopped && _watcherReconnectRetries == 0 )
154
+ {
155
+ WatchResource ( ) ;
156
+ }
157
+ }
158
+
144
159
private async void OnEvent ( WatchEventType type , TEntity entity )
145
160
{
161
+ _watcherReconnectRetries = 0 ;
162
+
146
163
_logger . LogTrace (
147
164
"""Received watch event "{eventType}" for "{kind}/{name}".""" ,
148
165
type ,
0 commit comments