Skip to content

Commit 29959c5

Browse files
author
Christoph Bühler
committed
refactor(error handler): use better centralized exponential backoff handler
1 parent c4b5244 commit 29959c5

File tree

6 files changed

+148
-73
lines changed

6 files changed

+148
-73
lines changed

src/KubeOps/Operator/Commands/Generators/CrdGenerator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ private static V1JSONSchemaProps MapType(Type type)
150150
// this description is on the class
151151
props.Description = type.GetCustomAttributes<DisplayAttribute>(true).FirstOrDefault()?.Description;
152152

153-
// TODO: validator attributes
154-
155153
if (type == typeof(V1ObjectMeta))
156154
{
157155
// TODO(check): is this correct? should metadata be filtered?
@@ -211,6 +209,8 @@ private static V1JSONSchemaProps MapType(Type type)
211209
props.Nullable = true;
212210
}
213211

212+
// TODO: validator attributes
213+
214214
return props;
215215
}
216216

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Timer = System.Timers.Timer;
5+
6+
namespace KubeOps.Operator.Errors
7+
{
8+
/// <summary>
/// Schedules retry callbacks with an exponentially growing delay.
/// Each call to <see cref="Retry"/> increases the delay (2^n seconds, capped at
/// <see cref="MaxRetrySeconds"/>, plus up to one second of random jitter) until the
/// counter is reset via <see cref="Reset"/> or a timed reset elapses.
/// </summary>
internal class ExponentialBackoffHandler : IDisposable
{
    private const double MaxRetrySeconds = 64;
    private readonly Action? _retryHandler;
    private readonly Func<Task>? _asyncRetryHandler;
    private readonly Random _rnd = new Random();

    // Guards swaps of the two timer fields. Timer callbacks run on other threads
    // than the callers of Retry/Reset, so the fields are shared state.
    private readonly object _gate = new object();

    private Timer? _retryTimer;
    private Timer? _resetTimer;

    // -1 means "no retry yet": the first Retry() increments to 0, i.e. 2^0 = 1 second.
    private int _tryCount = -1;

    /// <summary>Raised when a scheduled retry elapses, before the constructor-supplied handlers run.</summary>
    public event EventHandler? RetryHandler;

    /// <summary>Creates a handler that only notifies through the <see cref="RetryHandler"/> event.</summary>
    public ExponentialBackoffHandler()
    {
    }

    /// <summary>Creates a handler that invokes <paramref name="retryHandler"/> when a retry elapses.</summary>
    /// <param name="retryHandler">Synchronous callback executed on the timer thread.</param>
    public ExponentialBackoffHandler(Action retryHandler)
    {
        _retryHandler = retryHandler;
    }

    /// <summary>Creates a handler that starts <paramref name="asyncRetryHandler"/> when a retry elapses.</summary>
    /// <param name="asyncRetryHandler">
    /// Asynchronous callback. It is started but not awaited (fire and forget), so it is
    /// responsible for handling its own exceptions.
    /// </param>
    public ExponentialBackoffHandler(Func<Task> asyncRetryHandler)
    {
        _asyncRetryHandler = asyncRetryHandler;
    }

    /// <summary>
    /// Schedules the next retry and returns the delay after which it will fire.
    /// A retry that is still pending is cancelled and replaced.
    /// </summary>
    /// <param name="resetTimer">
    /// When given, the retry counter is reset automatically after this time span unless
    /// <see cref="Retry"/> is called again before it elapses. When omitted, any pending
    /// timed reset is cancelled.
    /// </param>
    /// <returns>The backoff delay of the scheduled retry.</returns>
    public TimeSpan Retry(TimeSpan? resetTimer = null)
    {
        if (resetTimer.HasValue)
        {
            // TimedReset replaces (and disposes) a pending reset timer.
            TimedReset(resetTimer.Value);
        }
        else
        {
            Replace(ref _resetTimer, null);
        }

        var span = ExponentialBackoff(Interlocked.Increment(ref _tryCount));

        // One-shot: without AutoReset = false the timer would fire again whenever
        // the handlers take longer than the interval.
        var timer = new Timer(span.TotalMilliseconds) { AutoReset = false };
        timer.Elapsed += (_, __) =>
        {
            // Release exactly the timer that fired *before* running the handlers.
            // The handlers may call Retry() themselves; disposing "whatever is in the
            // field" afterwards would cancel that freshly scheduled retry. If this
            // timer was already replaced or cancelled, the callback is stale: skip it.
            if (!TryRelease(ref _retryTimer, timer))
            {
                return;
            }

            RetryHandler?.Invoke(this, EventArgs.Empty);
            _retryHandler?.Invoke();
            _asyncRetryHandler?.Invoke();
        };
        Replace(ref _retryTimer, timer);

        return span;
    }

    /// <summary>Cancels pending retry/reset timers and resets the retry counter.</summary>
    public void Reset()
    {
        Replace(ref _resetTimer, null);
        Replace(ref _retryTimer, null);
        Interlocked.Exchange(ref _tryCount, -1);
    }

    /// <summary>
    /// Resets the retry counter after the given time span. A previously scheduled
    /// timed reset is cancelled and replaced.
    /// </summary>
    /// <param name="resetTimer">Delay after which the counter is reset.</param>
    public void TimedReset(TimeSpan resetTimer)
    {
        var timer = new Timer(resetTimer.TotalMilliseconds) { AutoReset = false };
        timer.Elapsed += (_, __) =>
        {
            // Only reset if this timer is still the active one (see Retry()).
            if (TryRelease(ref _resetTimer, timer))
            {
                Interlocked.Exchange(ref _tryCount, -1);
            }
        };
        Replace(ref _resetTimer, timer);
    }

    /// <summary>Cancels all timers and removes every <see cref="RetryHandler"/> subscriber.</summary>
    public void Dispose()
    {
        Reset();
        RetryHandler = null;
    }

    private static void DisposeTimer(Timer? timer)
    {
        timer?.Stop();
        timer?.Dispose();
    }

    // Disposes the timer currently stored in the slot, stores the replacement and starts it.
    private void Replace(ref Timer? slot, Timer? replacement)
    {
        lock (_gate)
        {
            DisposeTimer(slot);
            slot = replacement;
            replacement?.Start();
        }
    }

    // Disposes the fired timer and clears the slot if it still holds that timer.
    // Returns false when the timer was replaced or cancelled in the meantime.
    private bool TryRelease(ref Timer? slot, Timer fired)
    {
        bool isCurrent;
        lock (_gate)
        {
            isCurrent = ReferenceEquals(slot, fired);
            if (isCurrent)
            {
                slot = null;
            }
        }

        fired.Dispose();
        return isCurrent;
    }

    // 2^retryCount seconds, capped at MaxRetrySeconds, plus 0-999 ms of jitter.
    private TimeSpan ExponentialBackoff(int retryCount) => TimeSpan
        .FromSeconds(Math.Min(Math.Pow(2, retryCount), MaxRetrySeconds))
        .Add(TimeSpan.FromMilliseconds(_rnd.Next(0, 1000)));
}
96+
}

src/KubeOps/Operator/Queue/ResourceEventQueue.cs

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using k8s;
88
using k8s.Models;
99
using KubeOps.Operator.Caching;
10+
using KubeOps.Operator.Errors;
1011
using KubeOps.Operator.Watcher;
1112
using Microsoft.Extensions.Logging;
1213

@@ -17,22 +18,20 @@ internal class ResourceEventQueue<TEntity> : IResourceEventQueue<TEntity>
1718
{
1819
// TODO: Make configurable
1920
private const int QueueLimit = 512;
20-
private const double MaxRetrySeconds = 64;
2121

2222
private readonly Channel<(ResourceEventType type, TEntity resource)> _queue =
2323
Channel.CreateBounded<(ResourceEventType type, TEntity resource)>(QueueLimit);
2424

2525
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
26-
private readonly Random _rnd = new Random();
2726
private readonly ILogger<ResourceEventQueue<TEntity>> _logger;
2827
private readonly IResourceCache<TEntity> _cache;
2928
private readonly IResourceWatcher<TEntity> _watcher;
3029

3130
private readonly IDictionary<string, ResourceTimer<TEntity>> _delayedEnqueue =
3231
new ConcurrentDictionary<string, ResourceTimer<TEntity>>();
3332

34-
private readonly IDictionary<string, int> _erroredEventsCounter =
35-
new ConcurrentDictionary<string, int>();
33+
private readonly ConcurrentDictionary<string, ExponentialBackoffHandler> _errorHandlers =
34+
new ConcurrentDictionary<string, ExponentialBackoffHandler>();
3635

3736
private CancellationTokenSource? _cancellation;
3837

@@ -70,7 +69,13 @@ public async Task Stop()
7069
timer.Destroy();
7170
}
7271

72+
foreach (var errorBackoff in _errorHandlers.Values)
73+
{
74+
errorBackoff.Dispose();
75+
}
76+
7377
_delayedEnqueue.Clear();
78+
_errorHandlers.Clear();
7479
}
7580

7681
public void Dispose()
@@ -93,6 +98,11 @@ public void Dispose()
9398
{
9499
ResourceEvent -= (EventHandler<(ResourceEventType type, TEntity resource)>) handler;
95100
}
101+
102+
foreach (var errorBackoff in _errorHandlers.Values)
103+
{
104+
errorBackoff.Dispose();
105+
}
96106
}
97107

98108
public async Task Enqueue(TEntity resource, TimeSpan? enqueueDelay = null)
@@ -172,40 +182,38 @@ public async Task Enqueue(TEntity resource, TimeSpan? enqueueDelay = null)
172182

173183
public void EnqueueErrored(ResourceEventType type, TEntity resource)
174184
{
175-
if (!_erroredEventsCounter.ContainsKey(resource.Metadata.Uid))
176-
{
177-
_erroredEventsCounter[resource.Metadata.Uid] = 0;
178-
}
179-
else
180-
{
181-
_erroredEventsCounter[resource.Metadata.Uid]++;
182-
}
185+
var handler = _errorHandlers.GetOrAdd(
186+
resource.Metadata.Uid,
187+
_ =>
188+
{
189+
return new ExponentialBackoffHandler(
190+
async () =>
191+
{
192+
_logger.LogTrace(
193+
@"Backoff (error) requeue timer elapsed for ""{kind}/{name}"".",
194+
resource.Kind,
195+
resource.Metadata.Name);
196+
await EnqueueEvent(type, resource);
197+
});
198+
});
199+
183200

184-
var backoff = ExponentialBackoff(_erroredEventsCounter[resource.Metadata.Uid]);
201+
var backoff = handler.Retry();
185202
_logger.LogDebug(
186203
@"Requeue event ""{eventType}"" with backoff ""{backoff}"" for resource ""{kind}/{name}"".",
187204
type,
188205
backoff,
189206
resource.Kind,
190207
resource.Metadata.Name);
191-
192-
var timer = new ResourceTimer<TEntity>(
193-
resource,
194-
backoff,
195-
async delayedResource =>
196-
{
197-
_logger.LogTrace(
198-
@"Backoff (error) requeue timer elapsed for ""{kind}/{name}"".",
199-
delayedResource.Kind,
200-
delayedResource.Metadata.Name);
201-
_delayedEnqueue.Remove(delayedResource.Metadata.Uid);
202-
await EnqueueEvent(type, delayedResource);
203-
});
204-
_delayedEnqueue.Add(resource.Metadata.Uid, timer);
205-
timer.Start();
206208
}
207209

208-
public void ClearError(TEntity resource) => _erroredEventsCounter.Remove(resource.Metadata.Uid);
210+
public void ClearError(TEntity resource)
211+
{
212+
if (_errorHandlers.Remove(resource.Metadata.Uid, out var handler))
213+
{
214+
handler.Dispose();
215+
}
216+
}
209217

210218
private async void OnWatcherEvent(object? _, (WatchEventType type, TEntity resource) args)
211219
{
@@ -292,9 +300,5 @@ await _queue.Reader.WaitToReadAsync(_cancellation.Token))
292300
ResourceEvent?.Invoke(this, message);
293301
}
294302
}
295-
296-
private TimeSpan ExponentialBackoff(int retryCount) => TimeSpan
297-
.FromSeconds(Math.Min(Math.Pow(2, retryCount), MaxRetrySeconds))
298-
.Add(TimeSpan.FromMilliseconds(_rnd.Next(0, 1000)));
299303
}
300304
}

src/KubeOps/Operator/Watcher/ResourceWatcher.cs

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,28 @@
44
using k8s;
55
using k8s.Models;
66
using KubeOps.Operator.Client;
7+
using KubeOps.Operator.Errors;
78
using Microsoft.Extensions.Logging;
8-
using Timer = System.Timers.Timer;
99

1010
namespace KubeOps.Operator.Watcher
1111
{
1212
internal class ResourceWatcher<TEntity> : IResourceWatcher<TEntity>
1313
where TEntity : IKubernetesObject<V1ObjectMeta>
1414
{
15-
private const double MaxRetrySeconds = 64;
16-
17-
private int _errorCount;
18-
1915
private readonly ILogger<ResourceWatcher<TEntity>> _logger;
2016
private readonly IKubernetesClient _client;
17+
private readonly ExponentialBackoffHandler _reconnectHandler;
2118

22-
private readonly Random _rnd = new Random();
2319
private CancellationTokenSource? _cancellation;
2420
private Watcher<TEntity>? _watcher;
2521

26-
private Timer? _reconnectTimer;
27-
private Timer? _resetErrCountTimer;
28-
2922
public event EventHandler<(WatchEventType type, TEntity resource)>? WatcherEvent;
3023

3124
public ResourceWatcher(ILogger<ResourceWatcher<TEntity>> logger, IKubernetesClient client)
3225
{
3326
_logger = logger;
3427
_client = client;
28+
_reconnectHandler = new ExponentialBackoffHandler(async () => await WatchResource());
3529
}
3630

3731
public Task Start()
@@ -59,8 +53,7 @@ public void Dispose()
5953
WatcherEvent -= (EventHandler<(WatchEventType type, TEntity resource)>) handler;
6054
}
6155

62-
_reconnectTimer?.Dispose();
63-
_resetErrCountTimer?.Dispose();
56+
_reconnectHandler.Dispose();
6457
_cancellation?.Dispose();
6558
_watcher?.Dispose();
6659
_logger.LogTrace(@"Disposed resource watcher for type ""{type}"".", typeof(TEntity));
@@ -81,19 +74,6 @@ private async Task WatchResource()
8174
}
8275
}
8376

84-
_resetErrCountTimer = new Timer(TimeSpan.FromSeconds(10).TotalMilliseconds);
85-
_resetErrCountTimer.Elapsed += (_, __) =>
86-
{
87-
_logger.LogTrace("Reset error count in resource watcher.");
88-
_errorCount = 0;
89-
_resetErrCountTimer.Dispose();
90-
_resetErrCountTimer = null;
91-
_reconnectTimer?.Stop();
92-
_reconnectTimer?.Dispose();
93-
_reconnectTimer = null;
94-
};
95-
_resetErrCountTimer.Start();
96-
9777
_cancellation = new CancellationTokenSource();
9878
// TODO: namespaced resources
9979
_watcher = await _client.Watch<TEntity>(
@@ -144,15 +124,8 @@ private void OnException(Exception e)
144124
_watcher?.Dispose();
145125
_watcher = null;
146126

147-
_logger.LogInformation("Trying to reconnect with exponential backoff.");
148-
_resetErrCountTimer?.Stop();
149-
_resetErrCountTimer?.Dispose();
150-
_resetErrCountTimer = null;
151-
_reconnectTimer?.Stop();
152-
_reconnectTimer?.Dispose();
153-
_reconnectTimer = new Timer(ExponentialBackoff(++_errorCount).TotalMilliseconds);
154-
_reconnectTimer.Elapsed += (_, __) => RestartWatcher();
155-
_reconnectTimer.Start();
127+
var backoff = _reconnectHandler.Retry(TimeSpan.FromSeconds(5));
128+
_logger.LogInformation("Trying to reconnect with exponential backoff {backoff}.", backoff);
156129
}
157130

158131
private void OnClose()
@@ -163,9 +136,5 @@ private void OnClose()
163136
RestartWatcher();
164137
}
165138
}
166-
167-
private TimeSpan ExponentialBackoff(int retryCount) => TimeSpan
168-
.FromSeconds(Math.Min(Math.Pow(2, retryCount), MaxRetrySeconds))
169-
.Add(TimeSpan.FromMilliseconds(_rnd.Next(0, 1000)));
170139
}
171140
}

tests/KubeOps.TestOperator/appsettings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"Logging": {
33
"LogLevel": {
4-
"Default": "Debug",
4+
"Default": "Trace",
55
"System": "Information",
66
"Microsoft": "Information"
77
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
apiVersion: testing.dev/v1
2+
kind: TestEntity
3+
metadata:
4+
name: my-test-entity
5+
spec:
6+
spec: string

0 commit comments

Comments
 (0)