Skip to content

Commit dd890bb

Browse files
authored
Resilient event watcher (#60)
1 parent 2a621c6 commit dd890bb

File tree

4 files changed

+51
-15
lines changed

4 files changed

+51
-15
lines changed

src/K8sOperator.NET/EventWatcher.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,33 @@ public async Task Start(CancellationToken cancellationToken)
5353
_cancellationToken = cancellationToken;
5454
_isRunning = true;
5555

56-
var response = Client.ListAsync<T>(LabelSelector, cancellationToken);
56+
Logger.BeginWatch(Crd.PluralName, LabelSelector);
5757

58-
await foreach (var (type, item) in response.WatchAsync<T, object>(OnError, cancellationToken))
58+
while (_isRunning && !_cancellationToken.IsCancellationRequested)
5959
{
60-
OnEvent(type, item);
60+
try
61+
{
62+
var response = Client.ListAsync<T>(LabelSelector, cancellationToken);
63+
64+
await foreach (var (type, item) in response.WatchAsync<T, object>(OnError, cancellationToken))
65+
{
66+
OnEvent(type, item);
67+
}
68+
}
69+
catch (TaskCanceledException)
70+
{
71+
Logger.WatcherError("Task was canceled.");
72+
}
73+
catch (OperationCanceledException)
74+
{
75+
Logger.WatcherError("Operation was canceled restarting...");
76+
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
77+
}
78+
catch (HttpOperationException ex)
79+
{
80+
Logger.WatcherError($"Http Error: {ex.Response.Content}, restarting...");
81+
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
82+
}
6183
}
6284

6385
Logger.EndWatch(Crd.PluralName, LabelSelector);
@@ -76,8 +98,7 @@ private void OnEvent(WatchEventType eventType, T customResource)
7698
var exception = t.Exception.Flatten().InnerException;
7799
Logger.ProcessEventError(exception, eventType, customResource);
78100
}
79-
})
80-
;
101+
});
81102
}
82103

83104
private async Task ProccessEventAsync(WatchEventType eventType, T resource)
@@ -233,7 +254,7 @@ private void OnError(Exception exception)
233254
{
234255
if (_isRunning)
235256
{
236-
Logger.LogError(exception, "Watcher error");
257+
Logger.WatcherError(exception.Message);
237258
}
238259
}
239260
}

src/K8sOperator.NET/Extensions/LoggingExtensions.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ internal static partial class LoggingExtensions
3636
[LoggerMessage(
3737
EventId = 4,
3838
Level = LogLevel.Information,
39-
Message = "Begin watch {ns}/{plural} {labelselector}"
39+
Message = "Begin watch {plural} {labelselector}"
4040
)]
41-
public static partial void BeginWatch(this ILogger logger, string ns, string plural, string labelselector);
41+
public static partial void BeginWatch(this ILogger logger, string plural, string labelselector);
4242

4343
[LoggerMessage(
4444
EventId = 5,
@@ -181,4 +181,17 @@ internal static partial class LoggingExtensions
181181
Message = "End Error {resource}")]
182182
public static partial void EndError(this ILogger logger, CustomResource resource);
183183

184+
[LoggerMessage(
185+
EventId = 28,
186+
Level = LogLevel.Information,
187+
Message = "Watcher Error {message}")]
188+
public static partial void WatcherError(this ILogger logger, string message);
189+
190+
[LoggerMessage(
191+
EventId = 29,
192+
Level = LogLevel.Information,
193+
Message = "ListAsync {ns}/{plural} {labelselector}"
194+
)]
195+
public static partial void ListAsync(this ILogger logger, string ns, string plural, string labelselector);
196+
184197
}

src/K8sOperator.NET/KubernetesClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public Task<HttpOperationResponse<object>> ListAsync<T>(string labelSelector, Ca
2828
{
2929
var info = typeof(T).GetCustomAttribute<KubernetesEntityAttribute>()!;
3030

31-
Logger.BeginWatch(Namespace, info.PluralName, labelSelector);
31+
Logger.ListAsync(Namespace, info.PluralName, labelSelector);
3232

3333
var response = Client.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync(
3434
info.Group,
@@ -98,7 +98,7 @@ public Task<HttpOperationResponse<object>> ListAsync<T>(string labelSelector, Ca
9898
{
9999
var info = typeof(T).GetCustomAttribute<KubernetesEntityAttribute>()!;
100100

101-
Logger.BeginWatch("cluster-wide", info.PluralName, labelSelector);
101+
Logger.ListAsync("cluster-wide", info.PluralName, labelSelector);
102102

103103
var response = Client.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync(
104104
info.Group,

test/K8sOperator.NET.Tests/EventWatcherTests.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ private static Watcher<T>.WatchEvent CreateEvent<T>(WatchEventType type, T item)
5252

5353
private readonly ITestOutputHelper _testOutput;
5454
private readonly Controller<TestResource> _controller = Substitute.For<Controller<TestResource>>();
55+
private readonly CancellationTokenSource _tokenSource;
5556
private readonly ILoggerFactory _loggerFactory = Substitute.For<ILoggerFactory>();
5657
private readonly ILogger _logger = Substitute.For<ILogger>();
5758
private readonly List<object> _metadata;
5859

5960
public EventWatcherTests(ITestOutputHelper testOutput)
6061
{
62+
_tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2));
6163
_testOutput = testOutput;
6264
_loggerFactory.CreateLogger(Arg.Any<string>()).Returns(_logger);
6365
_metadata = [
@@ -71,8 +73,8 @@ public EventWatcherTests(ITestOutputHelper testOutput)
7173
[Fact]
7274
public async Task Start_Should_StartWatchAndLogStart()
7375
{
74-
var cancellationToken = new CancellationTokenSource().Token;
75-
76+
var cancellationToken = _tokenSource.Token;
77+
7678
using ( var server = new MockKubeApiServer(_testOutput, endpoints =>
7779
{
7880
endpoints.MapListNamespacedCustomObjectWithHttpMessagesAsync<TestResource>();
@@ -90,7 +92,7 @@ public async Task Start_Should_StartWatchAndLogStart()
9092
[Fact]
9193
public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync()
9294
{
93-
var cancellationToken = new CancellationTokenSource().Token;
95+
var cancellationToken = _tokenSource.Token;
9496

9597
using (var server = new MockKubeApiServer(_testOutput, endpoints =>
9698
{
@@ -112,7 +114,7 @@ public async Task OnEvent_Should_HandleAddedEventAndCallAddOrModifyAsync()
112114
[Fact]
113115
public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync()
114116
{
115-
var cancellationToken = new CancellationTokenSource().Token;
117+
var cancellationToken = _tokenSource.Token;
116118

117119
using (var server = new MockKubeApiServer(_testOutput, endpoints =>
118120
{
@@ -134,7 +136,7 @@ public async Task OnEvent_Should_HandleDeletedEventAndCallDeleteAsync()
134136
[Fact]
135137
public async Task HandleFinalizeAsync_Should_CallFinalizeAndRemoveFinalizer()
136138
{
137-
var cancellationToken = new CancellationTokenSource().Token;
139+
var cancellationToken = _tokenSource.Token;
138140

139141
using (var server = new MockKubeApiServer(_testOutput, endpoints =>
140142
{

0 commit comments

Comments
 (0)