Skip to content

Commit 5a37536

Browse files
tomasfabianbuehler
andauthored
test: Improved ResourceWatcher restart unit test (#527)
#482 #522 #523 #526 --------- Co-authored-by: Christoph Bühler <[email protected]>
1 parent f0867bc commit 5a37536

File tree

3 files changed

+228
-6
lines changed

3 files changed

+228
-6
lines changed

src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Reactive.Linq;
1+
using System.Reactive.Concurrency;
2+
using System.Reactive.Linq;
23
using System.Reactive.Subjects;
34
using System.Runtime.Serialization;
45
using System.Text.Json;
@@ -39,14 +40,16 @@ public ResourceWatcher(
3940
_settings = settings;
4041
_reconnectSubscription =
4142
_reconnectHandler
42-
.Select(Observable.Timer)
43+
.Select(backoff => Observable.Timer(backoff, TimeBasedScheduler))
4344
.Switch()
4445
.Retry()
4546
.Subscribe(async _ => await WatchResource(), error => _logger.LogError(error, $"There was an error while restarting the resource watcher {typeof(TEntity)}"));
4647
}
4748

4849
public IObservable<WatchEvent> WatchEvents => _watchEvents;
4950

51+
internal IScheduler TimeBasedScheduler { get; set; } = DefaultScheduler.Instance;
52+
5053
private TimeSpan DefaultBackoff => _settings.ErrorBackoffStrategy(1);
5154

5255
public Task StartAsync()

tests/KubeOps.Test/KubeOps.Test.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
<Import Project="..\..\config\CommonTests.targets" />
44

5+
<ItemGroup>
6+
<PackageReference Include="Microsoft.Reactive.Testing" Version="5.0.0" />
7+
</ItemGroup>
8+
59
<ItemGroup>
610
<ProjectReference Include="..\..\src\KubeOps.Testing\KubeOps.Testing.csproj" />
711
<ProjectReference Include="..\..\src\KubeOps\KubeOps.csproj" />

tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs

Lines changed: 219 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
using FluentAssertions;
1+
using System.Reactive.Linq;
2+
using FluentAssertions;
23
using k8s;
34
using k8s.Models;
45
using KubeOps.KubernetesClient;
56
using KubeOps.Operator;
67
using KubeOps.Operator.DevOps;
78
using KubeOps.Operator.Kubernetes;
8-
using KubeOps.Testing;
99
using Microsoft.Extensions.Logging.Abstractions;
10+
using Microsoft.Reactive.Testing;
1011
using Moq;
1112
using Prometheus;
1213
using Xunit;
@@ -23,24 +24,238 @@ public class TestResource : IKubernetesObject<V1ObjectMeta>
2324
public V1ObjectMeta Metadata { get; set; } = null!;
2425
}
2526

26-
private readonly IKubernetesClient _client = new MockKubernetesClient();
27+
private readonly Mock<IKubernetesClient> _client = new();
2728
private readonly Mock<IResourceWatcherMetrics<TestResource>> _metrics = new();
2829

30+
[Fact]
31+
public async Task Should_Restart_Watcher_On_Exception()
32+
{
33+
var settings = new OperatorSettings();
34+
35+
Action<Exception>? onError = null;
36+
37+
_client.Setup(
38+
c => c.Watch(
39+
It.IsAny<TimeSpan>(),
40+
It.IsAny<Action<WatchEventType, TestResource>>(),
41+
It.IsAny<Action<Exception>?>(),
42+
It.IsAny<Action>(),
43+
null,
44+
It.IsAny<CancellationToken>(),
45+
It.IsAny<string?>()))
46+
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?,
47+
CancellationToken, string?>(
48+
(_, _, onErrorArg, _, _, _, _) => { onError = onErrorArg; })
49+
.Returns(Task.FromResult(CreateFakeWatcher()))
50+
.Verifiable();
51+
52+
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
53+
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
54+
55+
using var resourceWatcher = new ResourceWatcher<TestResource>(
56+
_client.Object,
57+
new NullLogger<ResourceWatcher<TestResource>>(),
58+
_metrics.Object,
59+
settings);
60+
61+
var testScheduler = new TestScheduler();
62+
resourceWatcher.TimeBasedScheduler = testScheduler;
63+
64+
await resourceWatcher.StartAsync();
65+
66+
onError?.Invoke(new Exception());
67+
68+
var backoff = settings.ErrorBackoffStrategy(1);
69+
70+
testScheduler.AdvanceBy(backoff.Add(TimeSpan.FromSeconds(1)).Ticks);
71+
72+
VerifyWatch(2);
73+
}
74+
75+
[Fact]
76+
public async Task Should_Not_Throw_Overflow_Exception()
77+
{
78+
var settings = new OperatorSettings();
79+
80+
Action<Exception>? onError = null;
81+
82+
_client.Setup(
83+
c => c.Watch(
84+
It.IsAny<TimeSpan>(),
85+
It.IsAny<Action<WatchEventType, TestResource>>(),
86+
It.IsAny<Action<Exception>?>(),
87+
It.IsAny<Action>(),
88+
null,
89+
It.IsAny<CancellationToken>(),
90+
It.IsAny<string?>()))
91+
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?,
92+
CancellationToken, string?>(
93+
(_, _, onErrorArg, _, _, _, _) => { onError = onErrorArg; })
94+
.Returns(Task.FromResult(CreateFakeWatcher()))
95+
.Verifiable();
96+
97+
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
98+
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
99+
100+
using var resourceWatcher = new ResourceWatcher<TestResource>(
101+
_client.Object,
102+
new NullLogger<ResourceWatcher<TestResource>>(),
103+
_metrics.Object,
104+
settings);
105+
106+
var testScheduler = new TestScheduler();
107+
resourceWatcher.TimeBasedScheduler = testScheduler;
108+
109+
await resourceWatcher.StartAsync();
110+
111+
const int numberOfRetries = 40;
112+
for (int reconnectAttempts = 1; reconnectAttempts <= numberOfRetries; reconnectAttempts++)
113+
{
114+
onError?.Invoke(new Exception());
115+
116+
var backoff = settings.ErrorBackoffStrategy(reconnectAttempts > 39 ? 39 : reconnectAttempts);
117+
if (backoff.TotalSeconds > settings.WatcherMaxRetrySeconds)
118+
{
119+
backoff = TimeSpan.FromSeconds(settings.WatcherMaxRetrySeconds);
120+
}
121+
122+
testScheduler.AdvanceBy(backoff.Add(TimeSpan.FromSeconds(1)).Ticks);
123+
}
124+
125+
VerifyWatch(numberOfRetries+1);
126+
}
127+
29128
[Fact]
30129
public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_Restarts()
31130
{
32131
var settings = new OperatorSettings();
33132

133+
Action<Exception>? onError = null;
134+
135+
_client.Setup(c => c.Watch(It.IsAny<TimeSpan>(), It.IsAny<Action<WatchEventType, TestResource>>(), It.IsAny<Action<Exception>?>(), It.IsAny<Action>(), null, It.IsAny<CancellationToken>(), It.IsAny<string?>()))
136+
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?, CancellationToken, string?>(
137+
(_, _, onErrorArg, _, _, _, _) =>
138+
{
139+
onError = onErrorArg;
140+
})
141+
.Returns(Task.FromResult(CreateFakeWatcher()))
142+
.Verifiable();
143+
34144
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
145+
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
35146

36-
using var resourceWatcher = new ResourceWatcher<TestResource>(_client, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
147+
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
37148

38149
await resourceWatcher.StartAsync();
39150

40151
await resourceWatcher.StopAsync();
41152

42153
await resourceWatcher.StartAsync();
43154

155+
var kubernetesException = new KubernetesException(new V1Status());
156+
157+
onError?.Invoke(kubernetesException);
158+
44159
resourceWatcher.WatchEvents.Should().NotBeNull();
160+
161+
VerifyWatch(2);
162+
}
163+
164+
[Fact]
165+
public async Task Should_Publish_On_Watcher_Event()
166+
{
167+
var settings = new OperatorSettings();
168+
169+
Action<WatchEventType, TestResource> onWatcherEvent = null!;
170+
171+
_client.Setup(
172+
c => c.Watch(
173+
It.IsAny<TimeSpan>(),
174+
It.IsAny<Action<WatchEventType, TestResource>>(),
175+
It.IsAny<Action<Exception>?>(),
176+
It.IsAny<Action>(),
177+
null,
178+
It.IsAny<CancellationToken>(),
179+
It.IsAny<string?>()))
180+
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?,
181+
CancellationToken, string?>(
182+
(_, onWatcherEventArg, _, _, _, _, _) => { onWatcherEvent = onWatcherEventArg; })
183+
.Returns(Task.FromResult(CreateFakeWatcher()))
184+
.Verifiable();
185+
186+
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
187+
_metrics.Setup(c => c.WatchedEvents).Returns(Mock.Of<ICounter>());
188+
189+
using var resourceWatcher = new ResourceWatcher<TestResource>(
190+
_client.Object,
191+
new NullLogger<ResourceWatcher<TestResource>>(),
192+
_metrics.Object,
193+
settings);
194+
195+
var watchEvents = resourceWatcher.WatchEvents.Replay(1);
196+
watchEvents.Connect();
197+
198+
await resourceWatcher.StartAsync();
199+
200+
var resource = new TestResource
201+
{
202+
Metadata = new()
203+
};
204+
205+
onWatcherEvent(WatchEventType.Added, resource);
206+
207+
var watchEvent = await watchEvents.FirstAsync();
208+
209+
watchEvent.Type.Should().Be(WatchEventType.Added);
210+
watchEvent.Resource.Should().BeEquivalentTo(resource);
211+
}
212+
213+
[Fact]
214+
public async Task Should_Restart_Watcher_On_Close()
215+
{
216+
var settings = new OperatorSettings();
217+
218+
Action? onClose = null;
219+
220+
_client.Setup(c => c.Watch(It.IsAny<TimeSpan>(), It.IsAny<Action<WatchEventType, TestResource>>(), It.IsAny<Action<Exception>?>(), It.IsAny<Action>(), null, It.IsAny<CancellationToken>(), It.IsAny<string?>()))
221+
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?, CancellationToken, string?>(
222+
(_, _, _, onCloseArg, _, _, _) =>
223+
{
224+
onClose = onCloseArg;
225+
})
226+
.Returns(Task.FromResult(CreateFakeWatcher()))
227+
.Verifiable();
228+
229+
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
230+
_metrics.Setup(c => c.WatcherClosed).Returns(Mock.Of<ICounter>());
231+
232+
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
233+
234+
await resourceWatcher.StartAsync();
235+
236+
onClose?.Invoke();
237+
238+
VerifyWatch(2);
239+
}
240+
241+
private void VerifyWatch(int callTime)
242+
{
243+
_client.Verify(
244+
c => c.Watch(
245+
It.IsAny<TimeSpan>(),
246+
It.IsAny<Action<WatchEventType, TestResource>>(),
247+
It.IsAny<Action<Exception>?>(),
248+
It.IsAny<Action>(),
249+
null,
250+
It.IsAny<CancellationToken>(),
251+
It.IsAny<string?>()), Times.Exactly(callTime));
252+
}
253+
254+
private static Watcher<TestResource> CreateFakeWatcher()
255+
{
256+
return new Watcher<TestResource>(
257+
() => Task.FromResult(new StreamReader(new MemoryStream())),
258+
(_, __) => { },
259+
_ => { });
45260
}
46261
}

0 commit comments

Comments
 (0)