19
19
20
20
namespace KubeOps . Operator . Watcher ;
21
21
22
- internal class ResourceWatcher < TEntity > (
22
+ public class ResourceWatcher < TEntity > (
23
23
ILogger < ResourceWatcher < TEntity > > logger ,
24
24
IServiceProvider provider ,
25
25
TimedEntityQueue < TEntity > requeue ,
@@ -127,6 +127,64 @@ static async ValueTask CastAndDispose(IDisposable resource)
127
127
}
128
128
}
129
129
130
+ protected virtual async Task OnEventAsync ( WatchEventType type , TEntity entity , CancellationToken cancellationToken )
131
+ {
132
+ switch ( type )
133
+ {
134
+ case WatchEventType . Added :
135
+ if ( _entityCache . TryAdd ( entity . Uid ( ) , entity . Generation ( ) ?? 0 ) )
136
+ {
137
+ // Only perform reconciliation if the entity was not already in the cache.
138
+ await ReconcileModificationAsync ( entity , cancellationToken ) ;
139
+ }
140
+ else
141
+ {
142
+ logger . LogDebug (
143
+ """Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""" ,
144
+ entity . Kind ,
145
+ entity . Name ( ) ) ;
146
+ }
147
+
148
+ break ;
149
+ case WatchEventType . Modified :
150
+ switch ( entity )
151
+ {
152
+ case { Metadata . DeletionTimestamp : null } :
153
+ _entityCache . TryGetValue ( entity . Uid ( ) , out var cachedGeneration ) ;
154
+
155
+ // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
156
+ if ( entity . Generation ( ) <= cachedGeneration )
157
+ {
158
+ logger . LogDebug (
159
+ """Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""" ,
160
+ entity . Kind ,
161
+ entity . Name ( ) ) ;
162
+ return ;
163
+ }
164
+
165
+ // update cached generation since generation now changed
166
+ _entityCache . TryUpdate ( entity . Uid ( ) , entity . Generation ( ) ?? 1 , cachedGeneration ) ;
167
+ await ReconcileModificationAsync ( entity , cancellationToken ) ;
168
+ break ;
169
+ case { Metadata : { DeletionTimestamp : not null , Finalizers . Count : > 0 } } :
170
+ await ReconcileFinalizersSequentialAsync ( entity , cancellationToken ) ;
171
+ break ;
172
+ }
173
+
174
+ break ;
175
+ case WatchEventType . Deleted :
176
+ await ReconcileDeletionAsync ( entity , cancellationToken ) ;
177
+ break ;
178
+ default :
179
+ logger . LogWarning (
180
+ """Received unsupported event "{EventType}" for "{Kind}/{Name}".""" ,
181
+ type ,
182
+ entity . Kind ,
183
+ entity . Name ( ) ) ;
184
+ break ;
185
+ }
186
+ }
187
+
130
188
private async Task WatchClientEventsAsync ( CancellationToken stoppingToken )
131
189
{
132
190
string ? currentVersion = null ;
@@ -149,6 +207,7 @@ private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
149
207
150
208
// ReSharper disable once RedundantAnonymousTypePropertyName
151
209
Kind = entity . Kind ,
210
+ Namespace = entity . Namespace ( ) ,
152
211
Name = entity . Name ( ) ,
153
212
ResourceVersion = entity . ResourceVersion ( ) ,
154
213
} ) ;
@@ -221,64 +280,6 @@ void LogReconciliationFailed(Exception exception)
221
280
}
222
281
}
223
282
224
- private async Task OnEventAsync ( WatchEventType type , TEntity entity , CancellationToken cancellationToken )
225
- {
226
- switch ( type )
227
- {
228
- case WatchEventType . Added :
229
- if ( _entityCache . TryAdd ( entity . Uid ( ) , entity . Generation ( ) ?? 0 ) )
230
- {
231
- // Only perform reconciliation if the entity was not already in the cache.
232
- await ReconcileModificationAsync ( entity , cancellationToken ) ;
233
- }
234
- else
235
- {
236
- logger . LogDebug (
237
- """Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""" ,
238
- entity . Kind ,
239
- entity . Name ( ) ) ;
240
- }
241
-
242
- break ;
243
- case WatchEventType . Modified :
244
- switch ( entity )
245
- {
246
- case { Metadata . DeletionTimestamp : null } :
247
- _entityCache . TryGetValue ( entity . Uid ( ) , out var cachedGeneration ) ;
248
-
249
- // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
250
- if ( entity . Generation ( ) <= cachedGeneration )
251
- {
252
- logger . LogDebug (
253
- """Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""" ,
254
- entity . Kind ,
255
- entity . Name ( ) ) ;
256
- return ;
257
- }
258
-
259
- // update cached generation since generation now changed
260
- _entityCache . TryUpdate ( entity . Uid ( ) , entity . Generation ( ) ?? 1 , cachedGeneration ) ;
261
- await ReconcileModificationAsync ( entity , cancellationToken ) ;
262
- break ;
263
- case { Metadata : { DeletionTimestamp : not null , Finalizers . Count : > 0 } } :
264
- await ReconcileFinalizersSequentialAsync ( entity , cancellationToken ) ;
265
- break ;
266
- }
267
-
268
- break ;
269
- case WatchEventType . Deleted :
270
- await ReconcileDeletionAsync ( entity ! , cancellationToken ) ;
271
- break ;
272
- default :
273
- logger . LogWarning (
274
- """Received unsupported event "{EventType}" for "{Kind}/{Name}".""" ,
275
- type ,
276
- entity . Kind ,
277
- entity . Name ( ) ) ;
278
- break ;
279
- }
280
- }
281
-
282
283
private async Task OnWatchErrorAsync ( Exception e )
283
284
{
284
285
switch ( e )
0 commit comments