Skip to content

Commit e0523ca

Browse files
authored
fix: Leader election failure to restart (#783)
This fixes the following behavioral issues noted when testing a leader-aware operator with transient network issues: - In `LeaderElectionBackgroundService`, if `elector.RunUntilLeadershipLostAsync()` throws, the exception is not observed in the library and no further attempts to become the leader occur. The library now logs any unexpected exceptions and tries to become the leader again. - A leader could not stop and then subsequently start being a leader once more due to cancellation token sources not being recreated. The library now disposes and recreates the cancellation token sources as required. - `LeaderAwareResourceWatcher<TEntity>.StoppedLeading` would erroneously pass a cancelled cancellation token to `ResourceWatcher<TEntity>`. The library now passes the `IHostApplicationLifetime.ApplicationStopped` token to the `ResourceWatcher<TEntity>` - we can assume that `ApplicationStopped` is a good indication that the stop should no longer be graceful.
1 parent aa073d1 commit e0523ca

File tree

6 files changed

+174
-13
lines changed

6 files changed

+174
-13
lines changed

src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
using k8s.LeaderElection;
22

33
using Microsoft.Extensions.Hosting;
4+
using Microsoft.Extensions.Logging;
45

56
namespace KubeOps.Operator.LeaderElection;
67

78
/// <summary>
89
/// This background service connects to the API and continuously watches the leader election.
910
/// </summary>
11+
/// <param name="logger">The logger.</param>
1012
/// <param name="elector">The elector.</param>
11-
internal sealed class LeaderElectionBackgroundService(LeaderElector elector)
13+
internal sealed class LeaderElectionBackgroundService(ILogger<LeaderElectionBackgroundService> logger, LeaderElector elector)
1214
: IHostedService, IDisposable, IAsyncDisposable
1315
{
1416
private readonly CancellationTokenSource _cts = new();
1517
private bool _disposed;
18+
private Task? _leadershipTask;
1619

1720
public Task StartAsync(CancellationToken cancellationToken)
1821
{
@@ -26,7 +29,7 @@ public Task StartAsync(CancellationToken cancellationToken)
2629
// Therefore, we use Task.Run() and put the work to queue. The passed cancellation token of the StartAsync
2730
// method is not used, because it would only cancel the scheduling (which we definitely don't want to cancel).
2831
// To make this intention explicit, CancellationToken.None gets passed.
29-
_ = Task.Run(() => elector.RunUntilLeadershipLostAsync(_cts.Token), CancellationToken.None);
32+
_leadershipTask = Task.Run(RunAndTryToHoldLeadershipForeverAsync, CancellationToken.None);
3033

3134
return Task.CompletedTask;
3235
}
@@ -38,19 +41,23 @@ public void Dispose()
3841
_disposed = true;
3942
}
4043

41-
public Task StopAsync(CancellationToken cancellationToken)
44+
public async Task StopAsync(CancellationToken cancellationToken)
4245
{
4346
if (_disposed)
4447
{
45-
return Task.CompletedTask;
48+
return;
4649
}
4750

4851
#if NET8_0_OR_GREATER
49-
return _cts.CancelAsync();
52+
await _cts.CancelAsync();
5053
#else
5154
_cts.Cancel();
52-
return Task.CompletedTask;
5355
#endif
56+
57+
if (_leadershipTask is not null)
58+
{
59+
await _leadershipTask;
60+
}
5461
}
5562

5663
public async ValueTask DisposeAsync()
@@ -72,4 +79,23 @@ static async ValueTask CastAndDispose(IDisposable resource)
7279
}
7380
}
7481
}
82+
83+
private async Task RunAndTryToHoldLeadershipForeverAsync()
84+
{
85+
while (!_cts.IsCancellationRequested)
86+
{
87+
try
88+
{
89+
await elector.RunUntilLeadershipLostAsync(_cts.Token);
90+
}
91+
catch (OperationCanceledException) when (_cts.IsCancellationRequested)
92+
{
93+
// Ignore cancellation exceptions when we've been asked to stop.
94+
}
95+
catch (Exception exception)
96+
{
97+
logger.LogError(exception, "Failed to hold leadership.");
98+
}
99+
}
100+
}
75101
}

src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
using k8s;
1+
using k8s;
22
using k8s.LeaderElection;
33
using k8s.Models;
44

55
using KubeOps.Abstractions.Builder;
66
using KubeOps.KubernetesClient;
77
using KubeOps.Operator.Queue;
88

9+
using Microsoft.Extensions.Hosting;
910
using Microsoft.Extensions.Logging;
1011

1112
namespace KubeOps.Operator.Watcher;
@@ -16,21 +17,26 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
1617
TimedEntityQueue<TEntity> queue,
1718
OperatorSettings settings,
1819
IKubernetesClient client,
20+
IHostApplicationLifetime hostApplicationLifetime,
1921
LeaderElector elector)
2022
: ResourceWatcher<TEntity>(logger, provider, queue, settings, client)
2123
where TEntity : IKubernetesObject<V1ObjectMeta>
2224
{
23-
private readonly CancellationTokenSource _cts = new();
25+
private CancellationTokenSource _cts = new();
2426
private bool _disposed;
2527

26-
public override Task StartAsync(CancellationToken cancellationToken)
28+
public override async Task StartAsync(CancellationToken cancellationToken)
2729
{
2830
logger.LogDebug("Subscribe for leadership updates.");
2931

3032
elector.OnStartedLeading += StartedLeading;
3133
elector.OnStoppedLeading += StoppedLeading;
3234

33-
return elector.IsLeader() ? base.StartAsync(_cts.Token) : Task.CompletedTask;
35+
if (elector.IsLeader())
36+
{
37+
using CancellationTokenSource linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
38+
await base.StartAsync(linkedCancellationTokenSource.Token);
39+
}
3440
}
3541

3642
public override Task StopAsync(CancellationToken cancellationToken)
@@ -43,7 +49,8 @@ public override Task StopAsync(CancellationToken cancellationToken)
4349

4450
elector.OnStartedLeading -= StartedLeading;
4551
elector.OnStoppedLeading -= StoppedLeading;
46-
return Task.CompletedTask;
52+
53+
return elector.IsLeader() ? base.StopAsync(cancellationToken) : Task.CompletedTask;
4754
}
4855

4956
protected override void Dispose(bool disposing)
@@ -63,6 +70,13 @@ protected override void Dispose(bool disposing)
6370
private void StartedLeading()
6471
{
6572
logger.LogInformation("This instance started leading, starting watcher.");
73+
74+
if (_cts.IsCancellationRequested)
75+
{
76+
_cts.Dispose();
77+
_cts = new CancellationTokenSource();
78+
}
79+
6680
base.StartAsync(_cts.Token);
6781
}
6882

@@ -71,6 +85,9 @@ private void StoppedLeading()
7185
_cts.Cancel();
7286

7387
logger.LogInformation("This instance stopped leading, stopping watcher.");
74-
base.StopAsync(_cts.Token).Wait();
88+
89+
// Stop the base implementation using the 'ApplicationStopped' cancellation token.
90+
// The cancellation token should only be marked cancelled when the stop should no longer be graceful.
91+
base.StopAsync(hostApplicationLifetime.ApplicationStopped).Wait();
7592
}
7693
}

src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ internal class ResourceWatcher<TEntity>(
2828
where TEntity : IKubernetesObject<V1ObjectMeta>
2929
{
3030
private readonly ConcurrentDictionary<string, long> _entityCache = new();
31-
private readonly CancellationTokenSource _cancellationTokenSource = new();
3231

32+
private CancellationTokenSource _cancellationTokenSource = new();
3333
private uint _watcherReconnectRetries;
3434
private Task? _eventWatcher;
3535
private bool _disposed;
@@ -40,6 +40,12 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
4040
{
4141
logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name);
4242

43+
if (_cancellationTokenSource.IsCancellationRequested)
44+
{
45+
_cancellationTokenSource.Dispose();
46+
_cancellationTokenSource = new CancellationTokenSource();
47+
}
48+
4349
_eventWatcher = WatchClientEventsAsync(_cancellationTokenSource.Token);
4450

4551
logger.LogInformation("Started resource watcher for {ResourceType}.", typeof(TEntity).Name);

test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.9.2" />
88
<PackageReference Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.9.2" />
99
<PackageReference Include="Microsoft.CodeAnalysis.Workspaces.MSBuild" Version="4.9.2" />
10+
<PackageReference Include="Moq" Version="4.20.70" />
1011
</ItemGroup>
1112

1213
<ItemGroup>
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using FluentAssertions;
2+
3+
using k8s.LeaderElection;
4+
5+
using KubeOps.Operator.LeaderElection;
6+
7+
using Microsoft.Extensions.Logging;
8+
9+
using Moq;
10+
11+
namespace KubeOps.Operator.Test.LeaderElector;
12+
13+
public sealed class LeaderElectionBackgroundServiceTest
14+
{
15+
[Fact]
16+
public async Task Elector_Throws_Should_Retry()
17+
{
18+
// Arrange.
19+
var logger = Mock.Of<ILogger<LeaderElectionBackgroundService>>();
20+
21+
var electionLock = Mock.Of<ILock>();
22+
23+
var electionLockSubsequentCallEvent = new AutoResetEvent(false);
24+
bool hasElectionLockThrown = false;
25+
Mock.Get(electionLock)
26+
.Setup(electionLock => electionLock.GetAsync(It.IsAny<CancellationToken>()))
27+
.Returns<CancellationToken>(
28+
async cancellationToken =>
29+
{
30+
if (hasElectionLockThrown)
31+
{
32+
// Signal to the test that a subsequent call has been made.
33+
electionLockSubsequentCallEvent.Set();
34+
35+
// Delay returning for a long time, allowing the test to stop the background service, in turn cancelling the cancellation token.
36+
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
37+
throw new InvalidOperationException();
38+
}
39+
40+
hasElectionLockThrown = true;
41+
throw new Exception("Unit test exception");
42+
});
43+
44+
var leaderElectionConfig = new LeaderElectionConfig(electionLock);
45+
var leaderElector = new k8s.LeaderElection.LeaderElector(leaderElectionConfig);
46+
47+
var leaderElectionBackgroundService = new LeaderElectionBackgroundService(logger, leaderElector);
48+
49+
// Act / Assert.
50+
await leaderElectionBackgroundService.StartAsync(CancellationToken.None);
51+
52+
// Starting the background service should result in the lock attempt throwing, and then a subsequent attempt being made.
53+
// Wait for the subsequent event to be signalled, if we time out the test fails.
54+
electionLockSubsequentCallEvent.WaitOne(TimeSpan.FromMilliseconds(500)).Should().BeTrue();
55+
56+
await leaderElectionBackgroundService.StopAsync(CancellationToken.None);
57+
}
58+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System.Runtime.CompilerServices;
2+
3+
using k8s;
4+
using k8s.Models;
5+
6+
using KubeOps.Abstractions.Builder;
7+
using KubeOps.KubernetesClient;
8+
using KubeOps.Operator.Queue;
9+
using KubeOps.Operator.Watcher;
10+
11+
using Microsoft.Extensions.Logging;
12+
13+
using Moq;
14+
15+
namespace KubeOps.Operator.Test.Watcher;
16+
17+
public sealed class ResourceWatcherTest
18+
{
19+
[Fact]
20+
public async Task Restarting_Watcher_Should_Trigger_New_Watch()
21+
{
22+
// Arrange.
23+
var logger = Mock.Of<ILogger<ResourceWatcher<V1Pod>>>();
24+
var serviceProvider = Mock.Of<IServiceProvider>();
25+
var timedEntityQueue = new TimedEntityQueue<V1Pod>();
26+
var operatorSettings = new OperatorSettings() { Namespace = "unit-test" };
27+
var kubernetesClient = Mock.Of<IKubernetesClient>();
28+
29+
Mock.Get(kubernetesClient)
30+
.Setup(client => client.WatchAsync<V1Pod>("unit-test", null, null, true, It.IsAny<CancellationToken>()))
31+
.Returns<string?, string?, string?, bool?, CancellationToken>((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken));
32+
33+
var resourceWatcher = new ResourceWatcher<V1Pod>(logger, serviceProvider, timedEntityQueue, operatorSettings, kubernetesClient);
34+
35+
// Act.
36+
// Start and stop the watcher.
37+
await resourceWatcher.StartAsync(CancellationToken.None);
38+
await resourceWatcher.StopAsync(CancellationToken.None);
39+
40+
// Restart the watcher.
41+
await resourceWatcher.StartAsync(CancellationToken.None);
42+
43+
// Assert.
44+
Mock.Get(kubernetesClient)
45+
.Verify(client => client.WatchAsync<V1Pod>("unit-test", null, null, true, It.IsAny<CancellationToken>()), Times.Exactly(2));
46+
}
47+
48+
private static async IAsyncEnumerable<T> WaitForCancellationAsync<T>([EnumeratorCancellation] CancellationToken cancellationToken)
49+
{
50+
await Task.Delay(Timeout.Infinite, cancellationToken);
51+
yield return default!;
52+
}
53+
}

0 commit comments

Comments
 (0)