Skip to content

Commit aa073d1

Browse files
authored
fix(watcher): reconnect after server or client timeout (#780)
This fixes #739. This closes #771. Allows the resource watcher to retry the connection until the cancellation token requests a stop. The watcher caches the received entities and checks for their keys in a concurrent dictionary. Recurring "added" events after reconnection should not trigger a reconciliation.
1 parent 1870a01 commit aa073d1

File tree

4 files changed

+122
-73
lines changed

4 files changed

+122
-73
lines changed

examples/Operator/Controller/V1TestEntityController.cs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,13 @@
1010
namespace Operator.Controller;
1111

1212
[EntityRbac(typeof(V1TestEntity), Verbs = RbacVerb.All)]
13-
public class V1TestEntityController(ILogger<V1TestEntityController> logger,
14-
EntityRequeue<V1TestEntity> requeue,
15-
EventPublisher eventPublisher)
13+
public class V1TestEntityController(ILogger<V1TestEntityController> logger)
1614
: IEntityController<V1TestEntity>
1715
{
18-
public async Task ReconcileAsync(V1TestEntity entity, CancellationToken cancellationToken)
16+
public Task ReconcileAsync(V1TestEntity entity, CancellationToken cancellationToken)
1917
{
2018
logger.LogInformation("Reconciling entity {Entity}.", entity);
21-
22-
await eventPublisher(entity, "RECONCILED", "Entity was reconciled.");
23-
24-
requeue(entity, TimeSpan.FromSeconds(5));
19+
return Task.CompletedTask;
2520
}
2621

2722
public Task DeletedAsync(V1TestEntity entity, CancellationToken cancellationToken)

src/KubeOps.KubernetesClient/IKubernetesClient.cs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public interface IKubernetesClient : IDisposable
4040
/// <item>
4141
/// <description>
4242
/// The fallback secret file if running on the cluster
43-
/// (/var/run/secrets/Kubernetes.io/serviceaccount/namespace)
43+
/// (<c>/var/run/secrets/Kubernetes.io/serviceaccount/namespace</c>)
4444
/// </description>
4545
/// </item>
4646
/// <item>
@@ -129,7 +129,7 @@ IList<TEntity> List<TEntity>(
129129
=> List<TEntity>(@namespace, labelSelectors.ToExpression());
130130

131131
/// <summary>
132-
/// Create or Update a entity. This first fetches the entity from the Kubernetes API
132+
/// Create or Update an entity. This first fetches the entity from the Kubernetes API
133133
/// and if it does exist, updates the entity. Otherwise, the entity is created.
134134
/// </summary>
135135
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
@@ -252,7 +252,7 @@ IEnumerable<TEntity> Create<TEntity>(params TEntity[] entities)
252252
=> entities.Select(Create);
253253

254254
/// <summary>
255-
/// Update the given entity on the Kubernetes API.
255+
/// Update (replace) the given entity on the Kubernetes API.
256256
/// </summary>
257257
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
258258
/// <param name="entity">The entity instance.</param>
@@ -262,10 +262,10 @@ Task<TEntity> UpdateAsync<TEntity>(TEntity entity, CancellationToken cancellatio
262262
where TEntity : IKubernetesObject<V1ObjectMeta>;
263263

264264
/// <summary>
265-
/// Update a list of entities on the Kubernetes API.
265+
/// Update (replace) a list of entities on the Kubernetes API.
266266
/// </summary>
267267
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
268-
/// <param name="entities">An enumerable of entities.</param>
268+
/// <param name="entities">Enumerable of entities.</param>
269269
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
270270
/// <returns>The updated instances of the entities.</returns>
271271
async Task<IEnumerable<TEntity>> UpdateAsync<TEntity>(
@@ -275,10 +275,10 @@ async Task<IEnumerable<TEntity>> UpdateAsync<TEntity>(
275275
=> await Task.WhenAll(entities.Select(entity => UpdateAsync(entity, cancellationToken)));
276276

277277
/// <summary>
278-
/// Update a list of entities on the Kubernetes API.
278+
/// Update (replace) a list of entities on the Kubernetes API.
279279
/// </summary>
280280
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
281-
/// <param name="entities">An enumerable of entities.</param>
281+
/// <param name="entities">Enumerable of entities.</param>
282282
/// <remarks>
283283
/// This is invoking the API without any cancellation support. In order to pass a <see cref="CancellationToken"/>,
284284
/// you need to use the <see cref="UpdateAsync{TEntity}(IEnumerable{TEntity},CancellationToken)"/> overload.
@@ -396,9 +396,9 @@ void Delete<TEntity>(string name, string? @namespace = null)
396396
=> DeleteAsync<TEntity>(name, @namespace).GetAwaiter().GetResult();
397397

398398
/// <summary>
399-
/// Create a entity watcher on the Kubernetes API.
399+
/// Create an entity watcher on the Kubernetes API.
400400
/// The entity watcher fires events for entity-events on
401-
/// Kubernetes (events: <see cref="WatchEventType"/>.
401+
/// Kubernetes (events: <see cref="WatchEventType"/>).
402402
/// </summary>
403403
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
404404
/// <param name="onEvent">Action that is called when an event occurs.</param>
@@ -409,19 +409,24 @@ void Delete<TEntity>(string name, string? @namespace = null)
409409
/// If the namespace is omitted, all entities on the cluster are watched.
410410
/// </param>
411411
/// <param name="timeout">The timeout which the watcher has (after this timeout, the server will close the connection).</param>
412+
/// <param name="allowWatchBookmarks">
413+
/// Parameter to tell the server to send BOOKMARK events. However, if the server has no implementation or
414+
/// configuration for bookmarks, this flag is ignored.
415+
/// </param>
412416
/// <param name="resourceVersion">
413417
/// When specified with a watch call, shows changes that occur after that particular version of a resource.
414418
/// Defaults to changes from the beginning of history.
415419
/// </param>
416420
/// <param name="cancellationToken">Cancellation-Token.</param>
417421
/// <param name="labelSelectors">A list of label-selectors to apply to the search.</param>
418-
/// <returns>A entity watcher for the given entity.</returns>
422+
/// <returns>An entity watcher for the given entity.</returns>
419423
Watcher<TEntity> Watch<TEntity>(
420424
Action<WatchEventType, TEntity> onEvent,
421425
Action<Exception>? onError = null,
422426
Action? onClose = null,
423427
string? @namespace = null,
424428
TimeSpan? timeout = null,
429+
bool? allowWatchBookmarks = null,
425430
string? resourceVersion = null,
426431
CancellationToken cancellationToken = default,
427432
params LabelSelector[] labelSelectors)
@@ -432,14 +437,15 @@ Watcher<TEntity> Watch<TEntity>(
432437
onClose,
433438
@namespace,
434439
timeout,
440+
allowWatchBookmarks,
435441
resourceVersion,
436442
labelSelectors.ToExpression(),
437443
cancellationToken);
438444

439445
/// <summary>
440-
/// Create a entity watcher on the Kubernetes API.
446+
/// Create an entity watcher on the Kubernetes API.
441447
/// The entity watcher fires events for entity-events on
442-
/// Kubernetes (events: <see cref="WatchEventType"/>.
448+
/// Kubernetes (events: <see cref="WatchEventType"/>).
443449
/// </summary>
444450
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
445451
/// <param name="onEvent">Action that is called when an event occurs.</param>
@@ -450,19 +456,24 @@ Watcher<TEntity> Watch<TEntity>(
450456
/// If the namespace is omitted, all entities on the cluster are watched.
451457
/// </param>
452458
/// <param name="timeout">The timeout which the watcher has (after this timeout, the server will close the connection).</param>
459+
/// <param name="allowWatchBookmarks">
460+
/// Parameter to tell the server to send BOOKMARK events. However, if the server has no implementation or
461+
/// configuration for bookmarks, this flag is ignored.
462+
/// </param>
453463
/// <param name="resourceVersion">
454464
/// When specified with a watch call, shows changes that occur after that particular version of a resource.
455465
/// Defaults to changes from the beginning of history.
456466
/// </param>
457467
/// <param name="labelSelector">A string, representing an optional label selector for filtering watched objects.</param>
458468
/// <param name="cancellationToken">Cancellation-Token.</param>
459-
/// <returns>A entity watcher for the given entity.</returns>
469+
/// <returns>An entity watcher for the given entity.</returns>
460470
Watcher<TEntity> Watch<TEntity>(
461471
Action<WatchEventType, TEntity> onEvent,
462472
Action<Exception>? onError = null,
463473
Action? onClose = null,
464474
string? @namespace = null,
465475
TimeSpan? timeout = null,
476+
bool? allowWatchBookmarks = null,
466477
string? resourceVersion = null,
467478
string? labelSelector = null,
468479
CancellationToken cancellationToken = default)
@@ -480,13 +491,18 @@ Watcher<TEntity> Watch<TEntity>(
480491
/// Defaults to changes from the beginning of history.
481492
/// </param>
482493
/// <param name="labelSelector">A string, representing an optional label selector for filtering watched objects.</param>
494+
/// <param name="allowWatchBookmarks">
495+
/// Parameter to tell the server to send BOOKMARK events. However, if the server has no implementation or
496+
/// configuration for bookmarks, this flag is ignored.
497+
/// </param>
483498
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
484499
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
485500
/// <returns>An asynchronous enumerable that finishes once <paramref name="cancellationToken"/> is cancelled.</returns>
486501
IAsyncEnumerable<(WatchEventType Type, TEntity Entity)> WatchAsync<TEntity>(
487502
string? @namespace = null,
488503
string? resourceVersion = null,
489504
string? labelSelector = null,
505+
bool? allowWatchBookmarks = null,
490506
CancellationToken cancellationToken = default)
491507
where TEntity : IKubernetesObject<V1ObjectMeta>;
492508
}

0 commit comments

Comments
 (0)