Skip to content

Commit 1ef82b2

Browse files
author
Christoph Bühler
committed
feat(operator): reworked entity requeue logic
BREAKING CHANGE: controllers do not have return values anymore. To requeue an entity, use the `EntityRequeue<_>` delegate. When an entity is requeued, the reconcile loop is called after the timeout. If - during this timeout - the entity is modified or deleted, the timeout will be cancelled.
1 parent 7bd82c8 commit 1ef82b2

18 files changed

+580
-59
lines changed

examples/Operator/Controller/V1TestEntityController.cs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using KubeOps.Abstractions.Controller;
22
using KubeOps.Abstractions.Finalizer;
3+
using KubeOps.Abstractions.Queue;
34
using KubeOps.Abstractions.Rbac;
45
using KubeOps.KubernetesClient;
56

@@ -15,32 +16,26 @@ public class V1TestEntityController : IEntityController<V1TestEntity>
1516
{
1617
private readonly ILogger<V1TestEntityController> _logger;
1718
private readonly IKubernetesClient<V1TestEntity> _client;
18-
private readonly EntityFinalizerAttacher<FinalizerOne, V1TestEntity> _finalizer1;
19-
private readonly EntityFinalizerAttacher<FinalizerTwo, V1TestEntity> _finalizer2;
19+
private readonly EntityRequeue<V1TestEntity> _requeue;
2020

2121
public V1TestEntityController(
2222
ILogger<V1TestEntityController> logger,
2323
IKubernetesClient<V1TestEntity> client,
24-
EntityFinalizerAttacher<FinalizerOne, V1TestEntity> finalizer1,
25-
EntityFinalizerAttacher<FinalizerTwo, V1TestEntity> finalizer2)
24+
EntityRequeue<V1TestEntity> requeue)
2625
{
2726
_logger = logger;
2827
_client = client;
29-
_finalizer1 = finalizer1;
30-
_finalizer2 = finalizer2;
28+
_requeue = requeue;
3129
}
3230

3331
public async Task ReconcileAsync(V1TestEntity entity)
3432
{
3533
_logger.LogInformation("Reconciling entity {Entity}.", entity);
3634

37-
entity = await _finalizer1(entity);
38-
entity = await _finalizer2(entity);
39-
40-
entity.Status.Status = "Reconciling";
41-
entity = await _client.UpdateStatusAsync(entity);
4235
entity.Status.Status = "Reconciled";
4336
await _client.UpdateStatusAsync(entity);
37+
38+
_requeue(entity, TimeSpan.FromSeconds(5));
4439
}
4540

4641
public Task DeletedAsync(V1TestEntity entity)

examples/Operator/todos.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
todo:
22
- events
33
- leadership election
4-
- requeue
54
- build targets
65
- other CLI commands
76
- error handling

src/KubeOps.Abstractions/Controller/IEntityController{TEntity}.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,35 @@
44
namespace KubeOps.Abstractions.Controller;
55

66
/// <summary>
7-
/// Generic entity controller interface.
7+
/// Generic entity controller. The controller manages the reconcile loop
8+
/// for a given entity type.
89
/// </summary>
910
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
11+
/// <example>
12+
/// Simple example controller that just logs the entity.
13+
/// <code>
14+
/// public class V1TestEntityController : IEntityController&lt;V1TestEntity&gt;
15+
/// {
16+
/// private readonly ILogger&lt;V1TestEntityController&gt; _logger;
17+
///
18+
/// public V1TestEntityController(
19+
/// ILogger&lt;V1TestEntityController&gt; logger)
20+
/// {
21+
/// _logger = logger;
22+
/// }
23+
///
24+
/// public async Task ReconcileAsync(V1TestEntity entity)
25+
/// {
26+
/// _logger.LogInformation("Reconciling entity {Entity}.", entity);
27+
/// }
28+
///
29+
/// public async Task DeletedAsync(V1TestEntity entity)
30+
/// {
31+
/// _logger.LogInformation("Deleting entity {Entity}.", entity);
32+
/// }
33+
/// }
34+
/// </code>
35+
/// </example>
1036
public interface IEntityController<in TEntity>
1137
where TEntity : IKubernetesObject<V1ObjectMeta>
1238
{

src/KubeOps.Abstractions/Finalizer/EntityFinalizerAttacher.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ namespace KubeOps.Abstractions.Finalizer;
1414
/// modification and Kubernetes client interactions, since the resource version
1515
/// is updated each time the entity is modified.
1616
/// </para>
17+
/// <para>
18+
/// Note that the operator needs RBAC access to modify the list of
19+
/// finalizers on the entity.
20+
/// </para>
1721
/// </summary>
1822
/// <typeparam name="TImplementation">The type of the entity finalizer.</typeparam>
1923
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
2024
/// <param name="entity">The instance of the entity, that the finalizer is attached if needed.</param>
2125
/// <returns>A <see cref="Task"/> that resolves when the finalizer was attached.</returns>
2226
/// <example>
27+
/// Use the finalizer delegate to attach the "FinalizerOne" to the entity as soon
28+
/// as the entity gets reconciled.
2329
/// <code>
2430
/// [EntityRbac(typeof(V1TestEntity), Verbs = RbacVerb.All)]
2531
/// public class V1TestEntityController : IEntityController&lt;V1TestEntity&gt;

src/KubeOps.Abstractions/Finalizer/IEntityFinalizer{TEntity}.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,10 @@ public interface IEntityFinalizer<in TEntity>
1515
/// </summary>
1616
/// <param name="entity">The kubernetes entity that needs to be finalized.</param>
1717
/// <returns>A task that resolves when the operation is done.</returns>
18+
#if NETSTANDARD2_0
1819
Task FinalizeAsync(TEntity entity);
20+
#else
21+
Task FinalizeAsync(TEntity entity) =>
22+
Task.CompletedTask;
23+
#endif
1924
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using k8s;
2+
using k8s.Models;
3+
4+
namespace KubeOps.Abstractions.Queue;
5+
6+
/// <summary>
7+
/// <para>Injectable delegate for requeueing entities.</para>
8+
/// <para>
9+
/// Use this delegate when you need to pro-actively reconcile an entity after a
10+
/// certain amount of time. This is useful if you want to check your entities
11+
/// periodically.
12+
/// </para>
13+
/// <para>
14+
/// After the timeout is reached, the entity is fetched
15+
/// from the API and passed to the controller for reconciliation.
16+
/// If the entity was deleted in the meantime, the controller will not be called.
17+
/// </para>
18+
/// <para>
19+
/// If the entity gets modified while the timeout is running, the timer
20+
/// is canceled and restarted, if another requeue is requested.
21+
/// </para>
22+
/// </summary>
23+
/// <typeparam name="TEntity">The type of the entity.</typeparam>
24+
/// <param name="entity">The instance of the entity that should be requeued.</param>
25+
/// <param name="requeueIn">The time to wait before another reconcile loop is fired.</param>
26+
/// <example>
27+
/// Use the requeue delegate to repeatedly reconcile an entity after 5 seconds.
28+
/// <code>
29+
/// [EntityRbac(typeof(V1TestEntity), Verbs = RbacVerb.All)]
30+
/// public class V1TestEntityController : IEntityController&lt;V1TestEntity&gt;
31+
/// {
32+
/// private readonly EntityRequeue&lt;V1TestEntity&gt; _requeue;
33+
///
34+
/// public V1TestEntityController(EntityRequeue&lt;V1TestEntity&gt; requeue)
35+
/// {
36+
/// _requeue = requeue;
37+
/// }
38+
///
39+
/// public async Task ReconcileAsync(V1TestEntity entity)
40+
/// {
41+
/// _requeue(entity, TimeSpan.FromSeconds(5));
42+
/// }
43+
/// }
44+
/// </code>
45+
/// </example>
46+
public delegate void EntityRequeue<TEntity>(TEntity entity, TimeSpan requeueIn)
47+
where TEntity : IKubernetesObject<V1ObjectMeta>;

src/KubeOps.Abstractions/Rbac/EntityRbacAttribute.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
/// Attach this attribute to a controller with the type reference to
66
/// a custom entity to define rbac needs for this given type(s).
77
/// </summary>
8-
/// <example>[EntityRbac(typeof(V1TestEntity), Verbs = RbacVerb.All)].</example>
8+
/// <example>
9+
/// Allow the operator "ALL" access to the V1TestEntity.
10+
/// <code>
11+
/// [EntityRbac(typeof(V1TestEntity), Verbs = RbacVerb.All)]
12+
/// </code>
13+
/// </example>
914
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
1015
public class EntityRbacAttribute : RbacAttribute
1116
{

src/KubeOps.Abstractions/Rbac/GenericRbacAttribute.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class GenericRbacAttribute : RbacAttribute
2121
#if NETSTANDARD
2222
public string[] Groups { get; set; } = { };
2323
#else
24-
public string[] Groups { get; init; } = { };
24+
public string[] Groups { get; init; } = Array.Empty<string>();
2525
#endif
2626

2727
/// <summary>
@@ -34,7 +34,7 @@ public class GenericRbacAttribute : RbacAttribute
3434
#if NETSTANDARD
3535
public string[] Resources { get; set; } = { };
3636
#else
37-
public string[] Resources { get; init; } = { };
37+
public string[] Resources { get; init; } = Array.Empty<string>();
3838
#endif
3939

4040
/// <summary>
@@ -43,7 +43,7 @@ public class GenericRbacAttribute : RbacAttribute
4343
#if NETSTANDARD
4444
public string[] Urls { get; set; } = { };
4545
#else
46-
public string[] Urls { get; init; } = { };
46+
public string[] Urls { get; init; } = Array.Empty<string>();
4747
#endif
4848

4949
/// <summary>

src/KubeOps.Operator/Builder/OperatorBuilder.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
using KubeOps.Abstractions.Controller;
66
using KubeOps.Abstractions.Entities;
77
using KubeOps.Abstractions.Finalizer;
8+
using KubeOps.Abstractions.Queue;
89
using KubeOps.KubernetesClient;
910
using KubeOps.Operator.Finalizer;
11+
using KubeOps.Operator.Queue;
1012
using KubeOps.Operator.Watcher;
1113

1214
using Microsoft.Extensions.DependencyInjection;
@@ -36,6 +38,20 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
3638
{
3739
Services.AddScoped<IEntityController<TEntity>, TImplementation>();
3840
Services.AddHostedService<ResourceWatcher<TEntity>>();
41+
Services.AddSingleton(new TimedEntityQueue<TEntity>());
42+
Services.AddTransient<EntityRequeue<TEntity>>(services => (entity, timespan) =>
43+
{
44+
var logger = services.GetService<ILogger<EntityRequeue<TEntity>>>();
45+
var queue = services.GetRequiredService<TimedEntityQueue<TEntity>>();
46+
47+
logger?.LogTrace(
48+
"""Requeue entity "{kind}/{name}" in {milliseconds}ms.""",
49+
entity.Kind,
50+
entity.Name(),
51+
timespan.TotalMilliseconds);
52+
53+
queue.Enqueue(entity, timespan);
54+
});
3955

4056
return this;
4157
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using System.Collections.Concurrent;
2+
3+
using k8s;
4+
using k8s.Models;
5+
6+
using Timer = System.Timers.Timer;
7+
8+
namespace KubeOps.Operator.Queue;
9+
10+
internal class TimedEntityQueue<TEntity>
11+
where TEntity : IKubernetesObject<V1ObjectMeta>
12+
{
13+
private readonly ConcurrentDictionary<string, (string Name, string? Namespace, Timer Timer)> _queue = new();
14+
15+
public event EventHandler<(string Name, string? Namespace)>? RequeueRequested;
16+
17+
internal int Count => _queue.Count;
18+
19+
public void Clear()
20+
{
21+
foreach (var (_, _, timer) in _queue.Values)
22+
{
23+
timer.Stop();
24+
}
25+
26+
_queue.Clear();
27+
}
28+
29+
public void Enqueue(TEntity entity, TimeSpan requeueIn)
30+
{
31+
var (_, _, timer) =
32+
_queue.AddOrUpdate(
33+
entity.Uid(),
34+
(entity.Name(), entity.Namespace(), new Timer(requeueIn.TotalMilliseconds)),
35+
(_, e) =>
36+
{
37+
e.Timer.Stop();
38+
e.Timer.Dispose();
39+
return (e.Name, e.Namespace, new Timer(requeueIn.TotalMilliseconds));
40+
});
41+
42+
timer.Elapsed += (_, _) =>
43+
{
44+
if (!_queue.TryRemove(entity.Metadata.Uid, out var e))
45+
{
46+
return;
47+
}
48+
49+
e.Timer.Stop();
50+
e.Timer.Dispose();
51+
RequeueRequested?.Invoke(this, (e.Name, e.Namespace));
52+
};
53+
timer.Start();
54+
}
55+
56+
public void RemoveIfQueued(TEntity entity)
57+
{
58+
if (_queue.TryRemove(entity.Uid(), out var entry))
59+
{
60+
entry.Timer.Stop();
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)