Skip to content

Commit f524069

Browse files
authored
fix(client): don't access disposed resources (#738)
This PR is ensuring that the host does not stop with an exception. Those exceptions were occuring because the `CancellationTokenSource` inside the hosted services was already disposed when `StopAsync` got called. The reason for this is that the host invokes `DisposeAsync` **before** `StopAsync`.
1 parent b94eb81 commit f524069

File tree

7 files changed

+251
-21
lines changed

7 files changed

+251
-21
lines changed

src/KubeOps.KubernetesClient/KubernetesClient.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,13 +428,11 @@ public void Dispose()
428428

429429
protected virtual void Dispose(bool disposing)
430430
{
431-
if (!disposing)
431+
if (!disposing || _disposed)
432432
{
433433
return;
434434
}
435435

436-
ThrowIfDisposed();
437-
438436
// The property is intentionally set before the underlying _client is disposed.
439437
// This ensures that even if the disposal of the client is not finished yet, that all calls to the client
440438
// are instantly failing.

src/KubeOps.Operator/LeaderElection/LeaderElectionBackgroundService.cs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ namespace KubeOps.Operator.LeaderElection;
88
/// This background service connects to the API and continuously watches the leader election.
99
/// </summary>
1010
/// <param name="elector">The elector.</param>
11-
internal sealed class LeaderElectionBackgroundService(LeaderElector elector) : IHostedService, IDisposable
11+
internal sealed class LeaderElectionBackgroundService(LeaderElector elector)
12+
: IHostedService, IDisposable, IAsyncDisposable
1213
{
1314
private readonly CancellationTokenSource _cts = new();
15+
private bool _disposed;
1416

1517
public Task StartAsync(CancellationToken cancellationToken)
1618
{
@@ -29,15 +31,45 @@ public Task StartAsync(CancellationToken cancellationToken)
2931
return Task.CompletedTask;
3032
}
3133

32-
public void Dispose() => _cts.Dispose();
34+
public void Dispose()
35+
{
36+
_cts.Dispose();
37+
elector.Dispose();
38+
_disposed = true;
39+
}
3340

34-
#if NET8_0_OR_GREATER
35-
public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync();
36-
#else
3741
public Task StopAsync(CancellationToken cancellationToken)
3842
{
43+
if (_disposed)
44+
{
45+
return Task.CompletedTask;
46+
}
47+
48+
#if NET8_0_OR_GREATER
49+
return _cts.CancelAsync();
50+
#else
3951
_cts.Cancel();
4052
return Task.CompletedTask;
41-
}
4253
#endif
54+
}
55+
56+
public async ValueTask DisposeAsync()
57+
{
58+
await CastAndDispose(_cts);
59+
await CastAndDispose(elector);
60+
61+
_disposed = true;
62+
63+
static async ValueTask CastAndDispose(IDisposable resource)
64+
{
65+
if (resource is IAsyncDisposable resourceAsyncDisposable)
66+
{
67+
await resourceAsyncDisposable.DisposeAsync();
68+
}
69+
else
70+
{
71+
resource.Dispose();
72+
}
73+
}
74+
}
4375
}

src/KubeOps.Operator/Queue/EntityRequeueBackgroundService.cs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ internal sealed class EntityRequeueBackgroundService<TEntity>(
1414
IKubernetesClient client,
1515
TimedEntityQueue<TEntity> queue,
1616
IServiceProvider provider,
17-
ILogger<EntityRequeueBackgroundService<TEntity>> logger) : IHostedService, IDisposable
17+
ILogger<EntityRequeueBackgroundService<TEntity>> logger) : IHostedService, IDisposable, IAsyncDisposable
1818
where TEntity : IKubernetesObject<V1ObjectMeta>
1919
{
2020
private readonly CancellationTokenSource _cts = new();
21+
private bool _disposed;
2122

2223
public Task StartAsync(CancellationToken cancellationToken)
2324
{
@@ -36,17 +37,50 @@ public Task StartAsync(CancellationToken cancellationToken)
3637
return Task.CompletedTask;
3738
}
3839

39-
#if NET8_0_OR_GREATER
40-
public Task StopAsync(CancellationToken cancellationToken) => _cts.CancelAsync();
41-
#else
4240
public Task StopAsync(CancellationToken cancellationToken)
4341
{
42+
if (_disposed)
43+
{
44+
return Task.CompletedTask;
45+
}
46+
47+
#if NET8_0_OR_GREATER
48+
return _cts.CancelAsync();
49+
#else
4450
_cts.Cancel();
4551
return Task.CompletedTask;
46-
}
4752
#endif
53+
}
4854

49-
public void Dispose() => _cts.Dispose();
55+
public void Dispose()
56+
{
57+
_cts.Dispose();
58+
client.Dispose();
59+
queue.Dispose();
60+
61+
_disposed = true;
62+
}
63+
64+
public async ValueTask DisposeAsync()
65+
{
66+
await CastAndDispose(_cts);
67+
await CastAndDispose(client);
68+
await CastAndDispose(queue);
69+
70+
_disposed = true;
71+
72+
static async ValueTask CastAndDispose(IDisposable resource)
73+
{
74+
if (resource is IAsyncDisposable resourceAsyncDisposable)
75+
{
76+
await resourceAsyncDisposable.DisposeAsync();
77+
}
78+
else
79+
{
80+
resource.Dispose();
81+
}
82+
}
83+
}
5084

5185
private async Task WatchAsync(CancellationToken cancellationToken)
5286
{

src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
2121
where TEntity : IKubernetesObject<V1ObjectMeta>
2222
{
2323
private readonly CancellationTokenSource _cts = new();
24+
private bool _disposed;
2425

2526
public override Task StartAsync(CancellationToken cancellationToken)
2627
{
@@ -35,14 +36,30 @@ public override Task StartAsync(CancellationToken cancellationToken)
3536
public override Task StopAsync(CancellationToken cancellationToken)
3637
{
3738
logger.LogDebug("Unsubscribe from leadership updates.");
38-
_cts.Cancel();
39-
_cts.Dispose();
39+
if (_disposed)
40+
{
41+
return Task.CompletedTask;
42+
}
4043

4144
elector.OnStartedLeading -= StartedLeading;
4245
elector.OnStoppedLeading -= StoppedLeading;
4346
return Task.CompletedTask;
4447
}
4548

49+
protected override void Dispose(bool disposing)
50+
{
51+
if (!disposing)
52+
{
53+
return;
54+
}
55+
56+
_cts.Dispose();
57+
elector.Dispose();
58+
_disposed = true;
59+
60+
base.Dispose(disposing);
61+
}
62+
4663
private void StartedLeading()
4764
{
4865
logger.LogInformation("This instance started leading, starting watcher.");

src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@ internal class ResourceWatcher<TEntity>(
2424
TimedEntityQueue<TEntity> requeue,
2525
OperatorSettings settings,
2626
IKubernetesClient client)
27-
: IHostedService
27+
: IHostedService, IAsyncDisposable, IDisposable
2828
where TEntity : IKubernetesObject<V1ObjectMeta>
2929
{
3030
private readonly ConcurrentDictionary<string, long> _entityCache = new();
3131
private readonly CancellationTokenSource _cancellationTokenSource = new();
3232

3333
private uint _watcherReconnectRetries;
34-
private Task _eventWatcher = Task.CompletedTask;
34+
private Task? _eventWatcher;
35+
private bool _disposed;
36+
37+
~ResourceWatcher()
38+
{
39+
Dispose(false);
40+
}
3541

3642
public virtual Task StartAsync(CancellationToken cancellationToken)
3743
{
@@ -46,16 +52,80 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
4652
public virtual async Task StopAsync(CancellationToken cancellationToken)
4753
{
4854
logger.LogInformation("Stopping resource watcher for {ResourceType}.", typeof(TEntity).Name);
55+
if (_disposed)
56+
{
57+
return;
58+
}
59+
4960
#if NET8_0_OR_GREATER
5061
await _cancellationTokenSource.CancelAsync();
5162
#else
5263
_cancellationTokenSource.Cancel();
5364
#endif
54-
await _eventWatcher.WaitAsync(cancellationToken);
55-
_cancellationTokenSource.Dispose();
65+
if (_eventWatcher is not null)
66+
{
67+
await _eventWatcher.WaitAsync(cancellationToken);
68+
}
69+
5670
logger.LogInformation("Stopped resource watcher for {ResourceType}.", typeof(TEntity).Name);
5771
}
5872

73+
public async ValueTask DisposeAsync()
74+
{
75+
await StopAsync(CancellationToken.None);
76+
await DisposeAsyncCore();
77+
GC.SuppressFinalize(this);
78+
}
79+
80+
public void Dispose()
81+
{
82+
Dispose(true);
83+
GC.SuppressFinalize(this);
84+
}
85+
86+
protected virtual void Dispose(bool disposing)
87+
{
88+
if (!disposing)
89+
{
90+
return;
91+
}
92+
93+
_cancellationTokenSource.Dispose();
94+
_eventWatcher?.Dispose();
95+
requeue.Dispose();
96+
client.Dispose();
97+
98+
_disposed = true;
99+
}
100+
101+
protected virtual async ValueTask DisposeAsyncCore()
102+
{
103+
if (_eventWatcher is not null)
104+
{
105+
await CastAndDispose(_eventWatcher);
106+
}
107+
108+
await CastAndDispose(_cancellationTokenSource);
109+
await CastAndDispose(requeue);
110+
await CastAndDispose(client);
111+
112+
_disposed = true;
113+
114+
return;
115+
116+
static async ValueTask CastAndDispose(IDisposable resource)
117+
{
118+
if (resource is IAsyncDisposable resourceAsyncDisposable)
119+
{
120+
await resourceAsyncDisposable.DisposeAsync();
121+
}
122+
else
123+
{
124+
resource.Dispose();
125+
}
126+
}
127+
}
128+
59129
private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
60130
{
61131
try
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using KubeOps.Abstractions.Controller;
2+
using KubeOps.Operator.Test.TestEntities;
3+
4+
using Microsoft.Extensions.Hosting;
5+
6+
namespace KubeOps.Operator.Test.HostedServices;
7+
8+
public class LeaderAwareHostedServiceDisposeIntegrationTest : HostedServiceDisposeIntegrationTest
9+
{
10+
protected override void ConfigureHost(HostApplicationBuilder builder)
11+
{
12+
builder.Services
13+
.AddKubernetesOperator(op => op.EnableLeaderElection = true)
14+
.AddController<TestController, V1OperatorIntegrationTestEntity>();
15+
}
16+
17+
private class TestController : IEntityController<V1OperatorIntegrationTestEntity>
18+
{
19+
public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
20+
Task.CompletedTask;
21+
22+
public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
23+
Task.CompletedTask;
24+
}
25+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using KubeOps.Abstractions.Controller;
2+
using KubeOps.Operator.Test.TestEntities;
3+
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.Hosting;
6+
7+
namespace KubeOps.Operator.Test.HostedServices;
8+
9+
public class HostedServiceDisposeIntegrationTest : IntegrationTestBase
10+
{
11+
[Fact]
12+
public async Task Should_Allow_DisposeAsync_Before_StopAsync()
13+
{
14+
var hostedServices = Services.GetServices<IHostedService>()
15+
.Where(service => service.GetType().Namespace!.StartsWith("KubeOps"));
16+
17+
// We need to test the inverse order, because the Host is usually disposing the resources in advance of
18+
// stopping them.
19+
foreach (IHostedService service in hostedServices)
20+
{
21+
await Assert.IsAssignableFrom<IAsyncDisposable>(service).DisposeAsync();
22+
await service.StopAsync(CancellationToken.None);
23+
}
24+
}
25+
26+
[Fact]
27+
public async Task Should_Allow_StopAsync_Before_DisposeAsync()
28+
{
29+
var hostedServices = Services.GetServices<IHostedService>()
30+
.Where(service => service.GetType().Namespace!.StartsWith("KubeOps"));
31+
32+
foreach (IHostedService service in hostedServices)
33+
{
34+
await service.StopAsync(CancellationToken.None);
35+
await Assert.IsAssignableFrom<IAsyncDisposable>(service).DisposeAsync();
36+
}
37+
}
38+
39+
protected override void ConfigureHost(HostApplicationBuilder builder)
40+
{
41+
builder.Services
42+
.AddKubernetesOperator()
43+
.AddController<TestController, V1OperatorIntegrationTestEntity>();
44+
}
45+
46+
private class TestController : IEntityController<V1OperatorIntegrationTestEntity>
47+
{
48+
public Task ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
49+
Task.CompletedTask;
50+
51+
public Task DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
52+
Task.CompletedTask;
53+
}
54+
}

0 commit comments

Comments
 (0)