Skip to content

Commit 387900b

Browse files
author
Christoph Bühler
committed
feat(leader election): support leader election via V1Lease.
This closes #22. The leader election utilizes V1Lease objects in the namespace that the operator is running in. If no object exists, the instance is elected as leader. Later, the config determines the interval in which the check occurs. When a leader does not update the "RenewTime", another instance can take leadership.
1 parent 5a09034 commit 387900b

File tree

14 files changed

+431
-29
lines changed

14 files changed

+431
-29
lines changed

src/KubeOps/Operator/Builder/OperatorBuilder.cs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
using KubeOps.Operator.Controller;
77
using KubeOps.Operator.DevOps;
88
using KubeOps.Operator.Finalizer;
9+
using KubeOps.Operator.Leader;
910
using KubeOps.Operator.Queue;
1011
using KubeOps.Operator.Serialization;
1112
using KubeOps.Operator.Services;
1213
using KubeOps.Operator.Watcher;
1314
using Microsoft.Extensions.DependencyInjection;
1415
using Microsoft.Extensions.Diagnostics.HealthChecks;
16+
using Microsoft.Rest.Serialization;
1517
using Newtonsoft.Json;
1618
using Newtonsoft.Json.Converters;
1719
using Newtonsoft.Json.Serialization;
@@ -91,23 +93,22 @@ internal IOperatorBuilder AddOperatorBase(OperatorSettings settings)
9193
// support lazy service resolution
9294
Services.AddTransient(typeof(Lazy<>), typeof(LazyService<>));
9395

94-
Services.AddTransient(
95-
_ => new JsonSerializerSettings
96-
{
97-
ContractResolver = new NamingConvention(),
98-
Converters = new List<JsonConverter>
99-
{
100-
new StringEnumConverter { NamingStrategy = new CamelCaseNamingStrategy() },
101-
},
102-
});
103-
JsonConvert.DefaultSettings = () => new JsonSerializerSettings
96+
var jsonSettings = new JsonSerializerSettings
10497
{
98+
DateFormatHandling = DateFormatHandling.IsoDateFormat,
99+
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
100+
NullValueHandling = NullValueHandling.Ignore,
101+
ReferenceLoopHandling = ReferenceLoopHandling.Serialize,
105102
ContractResolver = new NamingConvention(),
106103
Converters = new List<JsonConverter>
107104
{
108105
new StringEnumConverter { NamingStrategy = new CamelCaseNamingStrategy() },
106+
new Iso8601TimeSpanConverter(),
109107
},
108+
DateFormatString = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.ffffffK",
110109
};
110+
Services.AddTransient(_ => jsonSettings);
111+
JsonConvert.DefaultSettings = () => jsonSettings;
111112

112113
Services.AddTransient(
113114
_ => new SerializerBuilder()
@@ -126,15 +127,32 @@ internal IOperatorBuilder AddOperatorBase(OperatorSettings settings)
126127
{
127128
SerializationSettings =
128129
{
130+
Formatting = Formatting.Indented,
131+
DateFormatHandling = DateFormatHandling.IsoDateFormat,
132+
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
133+
NullValueHandling = NullValueHandling.Ignore,
134+
ReferenceLoopHandling = ReferenceLoopHandling.Serialize,
129135
ContractResolver = new NamingConvention(),
130136
Converters = new List<JsonConverter>
131-
{ new StringEnumConverter { NamingStrategy = new CamelCaseNamingStrategy() } },
137+
{
138+
new StringEnumConverter { NamingStrategy = new CamelCaseNamingStrategy() },
139+
new Iso8601TimeSpanConverter(),
140+
},
141+
DateFormatString = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.ffffffK",
132142
},
133143
DeserializationSettings =
134144
{
145+
DateFormatHandling = DateFormatHandling.IsoDateFormat,
146+
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
147+
NullValueHandling = NullValueHandling.Ignore,
148+
ReferenceLoopHandling = ReferenceLoopHandling.Serialize,
135149
ContractResolver = new NamingConvention(),
136150
Converters = new List<JsonConverter>
137-
{ new StringEnumConverter { NamingStrategy = new CamelCaseNamingStrategy() } },
151+
{
152+
new StringEnumConverter { NamingStrategy = new CamelCaseNamingStrategy() },
153+
new Iso8601TimeSpanConverter(),
154+
},
155+
DateFormatString = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.ffffffK",
138156
},
139157
});
140158

@@ -151,6 +169,10 @@ internal IOperatorBuilder AddOperatorBase(OperatorSettings settings)
151169
// Add the default controller liveness check.
152170
AddHealthCheck<ControllerLivenessCheck>();
153171

172+
// Support for leader election via V1Leases.
173+
Services.AddHostedService<LeaderElector>();
174+
Services.AddSingleton<ILeaderElection, LeaderElection>();
175+
154176
return this;
155177
}
156178
}

src/KubeOps/Operator/Client/IKubernetesClient.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public interface IKubernetesClient
4040
/// <returns>A string containing the current namespace (or a fallback of it).</returns>
4141
Task<string> GetCurrentNamespace(string downwardApiEnvName = "POD_NAMESPACE");
4242

43+
/// <summary>
44+
/// Fetch and return the actual kubernetes <see cref="VersionInfo"/> (aka. Server Version).
45+
/// </summary>
46+
/// <returns>The <see cref="VersionInfo"/> of the current server.</returns>
4347
Task<VersionInfo> GetServerVersion();
4448

4549
Task<TResource?> Get<TResource>(string name, string? @namespace = null)

src/KubeOps/Operator/Controller/ResourceControllerBase.cs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using k8s.Models;
88
using KubeOps.Operator.Client;
99
using KubeOps.Operator.Finalizer;
10+
using KubeOps.Operator.Leader;
1011
using KubeOps.Operator.Queue;
1112
using KubeOps.Operator.Services;
1213
using Microsoft.Extensions.Logging;
@@ -39,22 +40,24 @@ protected ResourceControllerBase(IResourceServices<TEntity> services)
3940

4041
protected IKubernetesClient Client => _services.Client;
4142

42-
public async Task StartAsync(CancellationToken cancellationToken)
43+
public Task StartAsync(CancellationToken cancellationToken)
4344
{
4445
_logger.LogInformation(@"Startup CRD Controller for ""{resource}"".", typeof(TEntity));
45-
46-
_services.EventQueue.ResourceEvent += OnResourceEvent;
47-
await _services.EventQueue.Start();
4846
_running = true;
47+
_services.LeaderElection.LeadershipChange += LeadershipChanged;
48+
LeadershipChanged(null, _services.LeaderElection.State);
49+
50+
return Task.CompletedTask;
4951
}
5052

5153
public Task StopAsync(CancellationToken cancellationToken)
5254
{
5355
_logger.LogInformation(@"Shutdown CRD Controller for ""{resource}"".", typeof(TEntity));
5456

55-
_services.EventQueue.Stop();
56-
_services.EventQueue.ResourceEvent -= OnResourceEvent;
57+
LeadershipChanged(null, LeaderState.None);
58+
_services.LeaderElection.LeadershipChange -= LeadershipChanged;
5759
_running = false;
60+
5861
return Task.CompletedTask;
5962
}
6063

@@ -116,6 +119,23 @@ protected Task AttachFinalizer<TFinalizer>(TEntity resource)
116119
return finalizer.Register(resource);
117120
}
118121

122+
private async void LeadershipChanged(object? sender, LeaderState state)
123+
{
124+
if (state == LeaderState.Leader)
125+
{
126+
_logger.LogInformation("This instance was elected as leader, starting event queue.");
127+
_services.EventQueue.ResourceEvent += OnResourceEvent;
128+
await _services.EventQueue.Start();
129+
130+
return;
131+
}
132+
133+
_logger.LogInformation(
134+
"This instance has either resigned from leadership, was elected candidate or is shutting down. Stopping event queue.");
135+
await _services.EventQueue.Stop();
136+
_services.EventQueue.ResourceEvent -= OnResourceEvent;
137+
}
138+
119139
private async void OnResourceEvent(object? _, (ResourceEventType Type, TEntity Resource) args)
120140
{
121141
var (type, resource) = args;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
3+
namespace KubeOps.Operator.Leader
4+
{
5+
public interface ILeaderElection
6+
{
7+
event EventHandler<LeaderState>? LeadershipChange;
8+
9+
LeaderState State { get; }
10+
11+
internal void LeadershipChanged(LeaderState state);
12+
}
13+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System;
2+
using Microsoft.Extensions.Logging;
3+
4+
namespace KubeOps.Operator.Leader
5+
{
6+
internal class LeaderElection : ILeaderElection
7+
{
8+
private readonly ILogger<LeaderElection> _logger;
9+
10+
public LeaderElection(ILogger<LeaderElection> logger)
11+
{
12+
_logger = logger;
13+
}
14+
15+
public event EventHandler<LeaderState>? LeadershipChange;
16+
17+
public LeaderState State { get; private set; } = LeaderState.None;
18+
19+
void ILeaderElection.LeadershipChanged(LeaderState state)
20+
{
21+
if (State == state)
22+
{
23+
return;
24+
}
25+
26+
_logger.LogDebug("Leadership state changed to: {state}.", state);
27+
State = state;
28+
LeadershipChange?.Invoke(this, state);
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)