Skip to content

Commit b8d7827

Browse files
authored
feat(operator): add leader election via KubernetesClient (#627)
1 parent 8e07bc6 commit b8d7827

File tree

11 files changed

+285
-26
lines changed

11 files changed

+285
-26
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using KubeOps.Abstractions.Controller;
2+
using KubeOps.Abstractions.Events;
3+
using KubeOps.Abstractions.Finalizer;
4+
using KubeOps.Abstractions.Queue;
5+
using KubeOps.Abstractions.Rbac;
6+
using KubeOps.KubernetesClient;
7+
8+
using Microsoft.Extensions.Logging;
9+
10+
using Operator.Entities;
11+
using Operator.Finalizer;
12+
13+
namespace Operator.Controller;
14+
15+
[EntityRbac(typeof(V1SecondEntity), Verbs = RbacVerb.All)]
16+
public class V1SecondEntityController : IEntityController<V1SecondEntity>
17+
{
18+
private readonly ILogger<V1SecondEntityController> _logger;
19+
20+
public V1SecondEntityController(
21+
ILogger<V1SecondEntityController> logger)
22+
{
23+
_logger = logger;
24+
}
25+
26+
public async Task ReconcileAsync(V1SecondEntity entity)
27+
{
28+
_logger.LogInformation("Reconciling entity {Entity}.", entity);
29+
}
30+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using k8s.Models;
2+
3+
using KubeOps.Abstractions.Entities;
4+
5+
namespace Operator.Entities;
6+
7+
[KubernetesEntity(Group = "testing.dev", ApiVersion = "v1", Kind = "SecondEntity")]
8+
public partial class V1SecondEntity : CustomKubernetesEntity
9+
{
10+
public override string ToString() => $"Second Entity ({Metadata.Name})";
11+
}

examples/Operator/todos.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
todo:
2-
- leadership election
32
- build targets
43
- other CLI commands
54
- error handling
5+
- namespaced operator
66
- web: webhooks
77
- docs
88
- try .net 8 AOT?

src/KubeOps.Abstractions/Builder/OperatorSettings.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,32 @@ public sealed class OperatorSettings
3131

3232
/// <summary>
3333
/// <para>
34-
/// Defines if the leader elector should run. You may disable this,
35-
/// if you don't intend to run your operator multiple times.
34+
/// Whether the leader elector should run. You should enable
35+
/// this if you plan to run the operator redundantly.
3636
/// </para>
3737
/// <para>
38-
/// If this is disabled, and an operator runs in multiple instance
39-
/// (in the same namespace) it can lead to a "split brain" problem.
38+
/// If this is disabled and an operator runs in multiple instances
39+
/// (in the same namespace), it can lead to a "split brain" problem.
4040
/// </para>
4141
/// <para>
42-
/// This could be disabled when developing locally.
42+
/// Defaults to `false`.
4343
/// </para>
4444
/// </summary>
45-
public bool EnableLeaderElection { get; set; } = true;
45+
public bool EnableLeaderElection { get; set; }
4646

4747
/// <summary>
48-
/// The interval in seconds in which this particular instance of the operator
49-
/// will check for leader election.
48+
/// Defines how long one lease is valid for any leader.
49+
/// Defaults to 15 seconds.
5050
/// </summary>
51-
public ushort LeaderElectionCheckInterval { get; set; } = 15;
51+
public TimeSpan LeaderElectionLeaseDuration { get; set; } = TimeSpan.FromSeconds(15);
5252

5353
/// <summary>
54-
/// The duration in seconds in which the leader lease is valid.
54+
/// When the leader elector tries to refresh the leadership lease.
5555
/// </summary>
56-
public ushort LeaderElectionLeaseDuration { get; set; } = 30;
56+
public TimeSpan LeaderElectionRenewDeadline { get; set; } = TimeSpan.FromSeconds(10);
57+
58+
/// <summary>
59+
/// The wait timeout if the lease cannot be acquired.
60+
/// </summary>
61+
public TimeSpan LeaderElectionRetryPeriod { get; set; } = TimeSpan.FromSeconds(2);
5762
}

src/KubeOps.Operator/Builder/OperatorBuilder.cs

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using System.Text;
33

44
using k8s;
5+
using k8s.LeaderElection;
6+
using k8s.LeaderElection.ResourceLock;
57
using k8s.Models;
68

79
using KubeOps.Abstractions.Builder;
@@ -22,13 +24,13 @@ namespace KubeOps.Operator.Builder;
2224

2325
internal class OperatorBuilder : IOperatorBuilder
2426
{
27+
private readonly OperatorSettings _settings;
28+
2529
public OperatorBuilder(IServiceCollection services, OperatorSettings settings)
2630
{
31+
_settings = settings;
2732
Services = services;
28-
Services.AddSingleton(settings);
29-
Services.AddTransient<IKubernetesClient<Corev1Event>>(_ => new KubernetesClient<Corev1Event>(new(
30-
Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName)));
31-
Services.AddTransient(CreateEventPublisher());
33+
AddOperatorBase();
3234
}
3335

3436
public IServiceCollection Services { get; }
@@ -45,10 +47,18 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
4547
where TEntity : IKubernetesObject<V1ObjectMeta>
4648
{
4749
Services.AddScoped<IEntityController<TEntity>, TImplementation>();
48-
Services.AddHostedService<ResourceWatcher<TEntity>>();
4950
Services.AddSingleton(new TimedEntityQueue<TEntity>());
5051
Services.AddTransient(CreateEntityRequeue<TEntity>());
5152

53+
if (_settings.EnableLeaderElection)
54+
{
55+
Services.AddHostedService<LeaderAwareResourceWatcher<TEntity>>();
56+
}
57+
else
58+
{
59+
Services.AddHostedService<ResourceWatcher<TEntity>>();
60+
}
61+
5262
return this;
5363
}
5464

@@ -99,7 +109,7 @@ private static Func<IServiceProvider, EntityFinalizerAttacher<TImplementation, T
99109

100110
private static Func<IServiceProvider, EntityRequeue<TEntity>> CreateEntityRequeue<TEntity>()
101111
where TEntity : IKubernetesObject<V1ObjectMeta>
102-
=> services => (entity, timespan) =>
112+
=> services => (entity, timeSpan) =>
103113
{
104114
var logger = services.GetService<ILogger<EntityRequeue<TEntity>>>();
105115
var queue = services.GetRequiredService<TimedEntityQueue<TEntity>>();
@@ -108,9 +118,9 @@ private static Func<IServiceProvider, EntityRequeue<TEntity>> CreateEntityRequeu
108118
"""Requeue entity "{kind}/{name}" in {milliseconds}ms.""",
109119
entity.Kind,
110120
entity.Name(),
111-
timespan.TotalMilliseconds);
121+
timeSpan.TotalMilliseconds);
112122

113-
queue.Enqueue(entity, timespan);
123+
queue.Enqueue(entity, timeSpan);
114124
};
115125

116126
private static Func<IServiceProvider, EventPublisher> CreateEventPublisher()
@@ -192,4 +202,33 @@ private static Func<IServiceProvider, EventPublisher> CreateEventPublisher()
192202
entity.Name());
193203
}
194204
};
205+
206+
private void AddOperatorBase()
207+
{
208+
Services.AddSingleton(_settings);
209+
Services.AddTransient<IKubernetesClient<Corev1Event>>(_ => new KubernetesClient<Corev1Event>(new(
210+
Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName)));
211+
Services.AddTransient(CreateEventPublisher());
212+
213+
if (_settings.EnableLeaderElection)
214+
{
215+
using var client = new KubernetesClient<Corev1Event>(new(
216+
Corev1Event.KubeKind, Corev1Event.KubeApiVersion, Plural: Corev1Event.KubePluralName));
217+
218+
var elector = new LeaderElector(
219+
new LeaderElectionConfig(
220+
new LeaseLock(
221+
new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig()),
222+
client.GetCurrentNamespace(),
223+
$"{_settings.Name}-leader",
224+
Environment.MachineName))
225+
{
226+
LeaseDuration = _settings.LeaderElectionLeaseDuration,
227+
RenewDeadline = _settings.LeaderElectionRenewDeadline,
228+
RetryPeriod = _settings.LeaderElectionRetryPeriod,
229+
});
230+
Services.AddSingleton(elector);
231+
elector.RunAsync();
232+
}
233+
}
195234
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using k8s;
2+
using k8s.LeaderElection;
3+
using k8s.Models;
4+
5+
using KubeOps.KubernetesClient;
6+
using KubeOps.Operator.Queue;
7+
8+
using Microsoft.Extensions.Logging;
9+
10+
namespace KubeOps.Operator.Watcher;
11+
12+
internal class LeaderAwareResourceWatcher<TEntity> : ResourceWatcher<TEntity>
13+
where TEntity : IKubernetesObject<V1ObjectMeta>
14+
{
15+
private readonly ILogger<LeaderAwareResourceWatcher<TEntity>> _logger;
16+
private readonly LeaderElector _elector;
17+
18+
public LeaderAwareResourceWatcher(
19+
ILogger<LeaderAwareResourceWatcher<TEntity>> logger,
20+
IServiceProvider provider,
21+
IKubernetesClient<TEntity> client,
22+
TimedEntityQueue<TEntity> queue,
23+
LeaderElector elector)
24+
: base(logger, provider, client, queue)
25+
{
26+
_logger = logger;
27+
_elector = elector;
28+
}
29+
30+
public override Task StartAsync(CancellationToken cancellationToken)
31+
{
32+
_logger.LogDebug("Subscribe for leadership updates.");
33+
_elector.OnStartedLeading += StartedLeading;
34+
_elector.OnStoppedLeading += StoppedLeading;
35+
if (_elector.IsLeader())
36+
{
37+
StartedLeading();
38+
}
39+
40+
return Task.CompletedTask;
41+
}
42+
43+
public override Task StopAsync(CancellationToken cancellationToken)
44+
{
45+
_logger.LogDebug("Unsubscribe from leadership updates.");
46+
_elector.OnStartedLeading -= StartedLeading;
47+
_elector.OnStoppedLeading -= StoppedLeading;
48+
return Task.CompletedTask;
49+
}
50+
51+
private void StartedLeading()
52+
{
53+
_logger.LogInformation("This instance started leading, starting watcher.");
54+
base.StartAsync(default);
55+
}
56+
57+
private void StoppedLeading()
58+
{
59+
_logger.LogInformation("This instance stopped leading, stopping watcher.");
60+
base.StopAsync(default);
61+
}
62+
}

src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
2626
private readonly TimedEntityQueue<TEntity> _queue;
2727
private readonly ConcurrentDictionary<string, long> _entityCache = new();
2828
private readonly Lazy<List<FinalizerRegistration>> _finalizers;
29+
private bool _stopped;
2930

3031
private Watcher<TEntity>? _watcher;
3132

@@ -42,17 +43,19 @@ public ResourceWatcher(
4243
_finalizers = new(() => _provider.GetServices<FinalizerRegistration>().ToList());
4344
}
4445

45-
public Task StartAsync(CancellationToken cancellationToken)
46+
public virtual Task StartAsync(CancellationToken cancellationToken)
4647
{
4748
_logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name);
49+
_stopped = false;
4850
_queue.RequeueRequested += OnEntityRequeue;
4951
WatchResource();
5052
return Task.CompletedTask;
5153
}
5254

53-
public Task StopAsync(CancellationToken cancellationToken)
55+
public virtual Task StopAsync(CancellationToken cancellationToken)
5456
{
5557
_logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name);
58+
_stopped = true;
5659
StopWatching();
5760
_queue.RequeueRequested -= OnEntityRequeue;
5861
_queue.Clear();
@@ -84,8 +87,11 @@ private void StopWatching()
8487

8588
private void OnClosed()
8689
{
87-
_logger.LogDebug("The server closed the connection. Trying to reconnect.");
88-
WatchResource();
90+
_logger.LogDebug("The server closed the connection.");
91+
if (!_stopped)
92+
{
93+
WatchResource();
94+
}
8995
}
9096

9197
private async void OnEntityRequeue(object? sender, (string Name, string? Namespace) queued)

test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using FluentAssertions;
22

3+
using k8s.LeaderElection;
34
using k8s.Models;
45

56
using KubeOps.Abstractions.Builder;
@@ -85,6 +86,31 @@ public void Should_Add_Finalizer_Resources()
8586
s.Lifetime == ServiceLifetime.Transient);
8687
}
8788

89+
[Fact]
90+
public void Should_Add_Leader_Elector()
91+
{
92+
var builder = new OperatorBuilder(new ServiceCollection(), new() { EnableLeaderElection = true });
93+
builder.Services.Should().Contain(s =>
94+
s.ServiceType == typeof(k8s.LeaderElection.LeaderElector) &&
95+
s.Lifetime == ServiceLifetime.Singleton);
96+
}
97+
98+
[Fact]
99+
public void Should_Add_LeaderAwareResourceWatcher()
100+
{
101+
var builder = new OperatorBuilder(new ServiceCollection(), new() { EnableLeaderElection = true });
102+
builder.AddController<TestController, V1IntegrationTestEntity>();
103+
104+
builder.Services.Should().Contain(s =>
105+
s.ServiceType == typeof(IHostedService) &&
106+
s.ImplementationType == typeof(LeaderAwareResourceWatcher<V1IntegrationTestEntity>) &&
107+
s.Lifetime == ServiceLifetime.Singleton);
108+
builder.Services.Should().NotContain(s =>
109+
s.ServiceType == typeof(IHostedService) &&
110+
s.ImplementationType == typeof(ResourceWatcher<V1IntegrationTestEntity>) &&
111+
s.Lifetime == ServiceLifetime.Singleton);
112+
}
113+
88114
private class TestController : IEntityController<V1IntegrationTestEntity>
89115
{
90116
}

test/KubeOps.Operator.Test/Events/EventPublisher.Integration.Test.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public async Task ReconcileAsync(V1IntegrationTestEntity entity)
104104

105105
if (_svc.Invocations.Count < _svc.TargetInvocationCount)
106106
{
107-
_requeue(entity, TimeSpan.FromMilliseconds(1));
107+
_requeue(entity, TimeSpan.FromMilliseconds(10));
108108
}
109109
}
110110
}

test/KubeOps.Operator.Test/IntegrationTestCollection.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ public async Task ConfigureAndStart(Action<HostApplicationBuilder> configure)
4141
}
4242

4343
var builder = Host.CreateApplicationBuilder();
44-
builder.Logging.SetMinimumLevel(LogLevel.Warning);
44+
#if DEBUG
45+
builder.Logging.SetMinimumLevel(LogLevel.Trace);
46+
#else
47+
builder.Logging.SetMinimumLevel(LogLevel.None);
48+
#endif
4549
configure(builder);
4650
_host = builder.Build();
4751
await _host.StartAsync();

0 commit comments

Comments
 (0)