Skip to content

Commit 24c3ec1

Browse files
authored
fix: dont dispose the resource watcher when serialization exceptions happen (#534)
This PR prevents the disposal of the resource watcher during JSON serialization exceptions. Fixes #477.
1 parent a3bc7c2 commit 24c3ec1

File tree

2 files changed

+85
-44
lines changed

2 files changed

+85
-44
lines changed

src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,17 @@ private async void RestartWatcher()
155155

156156
private void OnException(Exception e)
157157
{
158+
switch (e)
159+
{
160+
case SerializationException when
161+
e.InnerException is JsonException &&
162+
e.InnerException.Message.Contains("The input does not contain any JSON tokens"):
163+
_logger.LogDebug(
164+
@"The watcher received an empty response for resource ""{resource}"".",
165+
typeof(TEntity));
166+
return;
167+
}
168+
158169
var backoff = DefaultBackoff;
159170

160171
try
@@ -174,13 +185,6 @@ private void OnException(Exception e)
174185
typeof(TEntity));
175186
WatchResource().ConfigureAwait(false);
176187
return;
177-
case SerializationException when
178-
e.InnerException is JsonException &&
179-
e.InnerException.Message.Contains("The input does not contain any JSON tokens"):
180-
_logger.LogDebug(
181-
@"The watcher received an empty response for resource ""{resource}"".",
182-
typeof(TEntity));
183-
return;
184188
}
185189

186190
++_reconnectAttempts;

tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs

Lines changed: 74 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System.Reactive.Linq;
2+
using System.Runtime.Serialization;
3+
using System.Text.Json;
24
using FluentAssertions;
35
using k8s;
46
using k8s.Models;
@@ -32,22 +34,7 @@ public async Task Should_Restart_Watcher_On_Exception()
3234
{
3335
var settings = new OperatorSettings();
3436

35-
Action<Exception>? onError = null;
36-
37-
_client.Setup(
38-
c => c.Watch(
39-
It.IsAny<TimeSpan>(),
40-
It.IsAny<Action<WatchEventType, TestResource>>(),
41-
It.IsAny<Action<Exception>?>(),
42-
It.IsAny<Action>(),
43-
null,
44-
It.IsAny<CancellationToken>(),
45-
It.IsAny<string?>()))
46-
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?,
47-
CancellationToken, string?>(
48-
(_, _, onErrorArg, _, _, _, _) => { onError = onErrorArg; })
49-
.Returns(Task.FromResult(CreateFakeWatcher()))
50-
.Verifiable();
37+
Func<Action<Exception>?> getOnError = SetupKubernetesClientWatch();
5138

5239
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
5340
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
@@ -63,7 +50,7 @@ public async Task Should_Restart_Watcher_On_Exception()
6350

6451
await resourceWatcher.StartAsync();
6552

66-
onError?.Invoke(new Exception());
53+
getOnError()?.Invoke(new Exception());
6754

6855
var backoff = settings.ErrorBackoffStrategy(1);
6956

@@ -72,11 +59,8 @@ public async Task Should_Restart_Watcher_On_Exception()
7259
VerifyWatch(2);
7360
}
7461

75-
[Fact]
76-
public async Task Should_Not_Throw_Overflow_Exception()
62+
private Func<Action<Exception>?> SetupKubernetesClientWatch()
7763
{
78-
var settings = new OperatorSettings();
79-
8064
Action<Exception>? onError = null;
8165

8266
_client.Setup(
@@ -88,12 +72,25 @@ public async Task Should_Not_Throw_Overflow_Exception()
8872
null,
8973
It.IsAny<CancellationToken>(),
9074
It.IsAny<string?>()))
91-
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?,
92-
CancellationToken, string?>(
93-
(_, _, onErrorArg, _, _, _, _) => { onError = onErrorArg; })
75+
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?, CancellationToken, string?>(
76+
(_, _, onErrorArg, _, _, _, _) =>
77+
{
78+
onError = onErrorArg;
79+
})
9480
.Returns(Task.FromResult(CreateFakeWatcher()))
9581
.Verifiable();
9682

83+
84+
return () => onError;
85+
}
86+
87+
[Fact]
88+
public async Task Should_Not_Throw_Overflow_Exception()
89+
{
90+
var settings = new OperatorSettings();
91+
92+
Func<Action<Exception>?> getOnError = SetupKubernetesClientWatch();
93+
9794
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
9895
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
9996

@@ -111,7 +108,7 @@ public async Task Should_Not_Throw_Overflow_Exception()
111108
const int numberOfRetries = 40;
112109
for (int reconnectAttempts = 1; reconnectAttempts <= numberOfRetries; reconnectAttempts++)
113110
{
114-
onError?.Invoke(new Exception());
111+
getOnError()?.Invoke(new Exception());
115112

116113
var backoff = settings.ErrorBackoffStrategy(reconnectAttempts > 39 ? 39 : reconnectAttempts);
117114
if (backoff.TotalSeconds > settings.WatcherMaxRetrySeconds)
@@ -122,24 +119,15 @@ public async Task Should_Not_Throw_Overflow_Exception()
122119
testScheduler.AdvanceBy(backoff.Add(TimeSpan.FromSeconds(1)).Ticks);
123120
}
124121

125-
VerifyWatch(numberOfRetries+1);
122+
VerifyWatch(numberOfRetries + 1);
126123
}
127124

128125
[Fact]
129126
public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_Restarts()
130127
{
131128
var settings = new OperatorSettings();
132129

133-
Action<Exception>? onError = null;
134-
135-
_client.Setup(c => c.Watch(It.IsAny<TimeSpan>(), It.IsAny<Action<WatchEventType, TestResource>>(), It.IsAny<Action<Exception>?>(), It.IsAny<Action>(), null, It.IsAny<CancellationToken>(), It.IsAny<string?>()))
136-
.Callback<TimeSpan, Action<WatchEventType, TestResource>, Action<Exception>?, Action?, string?, CancellationToken, string?>(
137-
(_, _, onErrorArg, _, _, _, _) =>
138-
{
139-
onError = onErrorArg;
140-
})
141-
.Returns(Task.FromResult(CreateFakeWatcher()))
142-
.Verifiable();
130+
Func<Action<Exception>?> getOnError = SetupKubernetesClientWatch();
143131

144132
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
145133
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
@@ -154,13 +142,62 @@ public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_
154142

155143
var kubernetesException = new KubernetesException(new V1Status());
156144

157-
onError?.Invoke(kubernetesException);
145+
getOnError()?.Invoke(kubernetesException);
158146

159147
resourceWatcher.WatchEvents.Should().NotBeNull();
160148

161149
VerifyWatch(2);
162150
}
163151

152+
[Fact]
153+
public async Task Should_Not_Restart_On_Serialization_Exception()
154+
{
155+
var settings = new OperatorSettings();
156+
157+
Func<Action<Exception>?> getOnError = SetupKubernetesClientWatch();
158+
159+
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
160+
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
161+
162+
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
163+
164+
await resourceWatcher.StartAsync();
165+
166+
var serializationException = new SerializationException(string.Empty, new JsonException("The input does not contain any JSON tokens"));
167+
168+
getOnError()?.Invoke(serializationException);
169+
170+
resourceWatcher.WatchEvents.Should().NotBeNull();
171+
172+
VerifyWatch(1);
173+
}
174+
175+
[Fact]
176+
public async Task Should_Be_Restarted_After_TaskCanceledException_IOException()
177+
{
178+
var settings = new OperatorSettings();
179+
180+
Func<Action<Exception>?> getOnError = SetupKubernetesClientWatch();
181+
182+
SetupResourceWatcherMetrics();
183+
184+
using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
185+
186+
await resourceWatcher.StartAsync();
187+
188+
var serializationException = new TaskCanceledException(string.Empty, new IOException($@"Either the server or the client did close the connection on watcher for resource ""{typeof(TestResource)}"". Restart."));
189+
190+
getOnError()?.Invoke(serializationException);
191+
192+
VerifyWatch(2);
193+
}
194+
195+
private void SetupResourceWatcherMetrics()
196+
{
197+
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
198+
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());
199+
}
200+
164201
[Fact]
165202
public async Task Should_Publish_On_Watcher_Event()
166203
{

0 commit comments

Comments
 (0)