Skip to content

Commit 0edac7e

Browse files
authored
feat: Resource Event De-duplication (#394)
When an event for a given resource is added to the queue, all enqueued events (those currently waiting for their "Delay" to elapse) for that resource are unsubscribed from and are thus ignored. For example, if I were to queue a "NotModified" event for a resource with a delay of 10 seconds, then 5 seconds later queue a "Deleted" event, the "Deleted" event would process immediately, and the "NotModified" event would be ignored and not processed. This has no effect on events which are currently in the "UpdateResourceData" or "HandleEvent" steps, as those are already past the de-duplication step.
1 parent 54c2e37 commit 0edac7e

19 files changed

+713
-374
lines changed

src/KubeOps.Testing/KubernetesOperatorFactory.cs

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using DotnetKubernetesClient;
55
using k8s;
66
using k8s.Models;
7-
using KubeOps.Operator.Builder;
87
using KubeOps.Operator.Controller;
98
using KubeOps.Operator.Kubernetes;
109
using KubeOps.Operator.Leadership;
@@ -55,20 +54,24 @@ public void Run()
5554
var server = Server;
5655
}
5756

58-
public Task EnqueueEvent<TEntity>(ResourceEventType type, TEntity resource)
57+
public Task EnqueueEvent<TEntity>(ResourceEventType type, TEntity resource, int attempt = 0, TimeSpan? delay = null)
5958
where TEntity : class, IKubernetesObject<V1ObjectMeta>
6059
{
61-
var controller = GetMockController<TEntity>();
60+
var queue = Services.GetService<IEventQueue<TEntity>>();
6261

63-
return controller?.EnqueueEvent(type, resource) ?? Task.CompletedTask;
62+
queue?.EnqueueLocal(new ResourceEvent<TEntity>(type, resource, attempt, delay));
63+
64+
return Task.CompletedTask;
6465
}
6566

6667
public Task EnqueueFinalization<TEntity>(TEntity resource)
6768
where TEntity : class, IKubernetesObject<V1ObjectMeta>
6869
{
69-
var controller = GetMockController<TEntity>();
70+
var queue = Services.GetService<IEventQueue<TEntity>>();
71+
72+
queue?.EnqueueLocal(new ResourceEvent<TEntity>(ResourceEventType.Finalizing, resource));
7073

71-
return controller?.EnqueueFinalization(resource) ?? Task.CompletedTask;
74+
return Task.CompletedTask;
7275
}
7376

7477
/// <summary>
@@ -90,37 +93,25 @@ protected override IHostBuilder CreateHostBuilder() =>
9093
protected override void ConfigureWebHost(IWebHostBuilder builder)
9194
{
9295
base.ConfigureWebHost(builder);
93-
if (_solutionRelativeContentRoot != null)
94-
{
95-
builder.UseSolutionRelativeContentRoot(_solutionRelativeContentRoot);
96-
}
97-
9896
builder.ConfigureTestServices(
9997
services =>
10098
{
10199
var elector = services.First(
102100
d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(LeaderElector));
103101
services.Remove(elector);
104102

105-
services.RemoveAll(typeof(IKubernetesClient));
103+
services.RemoveAll<IKubernetesClient>();
106104
services.AddSingleton<IKubernetesClient, MockKubernetesClient>();
107105

108-
services.RemoveAll<Func<IComponentRegistrar.ControllerRegistration, IManagedResourceController>>();
109-
services.AddSingleton(
110-
s => (Func<IComponentRegistrar.ControllerRegistration, IManagedResourceController>)(r =>
111-
(IManagedResourceController)ActivatorUtilities.CreateInstance(
112-
s,
113-
typeof(MockManagedResourceController<>).MakeGenericType(r.EntityType),
114-
r)));
106+
services.RemoveAll(typeof(IEventQueue<>));
107+
services.AddSingleton(typeof(IEventQueue<>), typeof(MockEventQueue<>));
115108
});
109+
if (_solutionRelativeContentRoot != null)
110+
{
111+
builder.UseSolutionRelativeContentRoot(_solutionRelativeContentRoot);
112+
}
113+
116114
builder.ConfigureLogging(logging => logging.ClearProviders());
117115
}
118-
119-
private MockManagedResourceController<TEntity>? GetMockController<TEntity>()
120-
where TEntity : class, IKubernetesObject<V1ObjectMeta> =>
121-
Services.GetRequiredService<IControllerInstanceBuilder>()
122-
.BuildControllers<TEntity>()
123-
.OfType<MockManagedResourceController<TEntity>>()
124-
.FirstOrDefault();
125116
}
126117
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Reactive.Linq;
3+
using System.Reactive.Subjects;
4+
using System.Threading.Tasks;
5+
using k8s;
6+
using k8s.Models;
7+
using KubeOps.Operator.Controller;
8+
9+
namespace KubeOps.Testing;
10+
11+
internal class MockEventQueue<TEntity> : IEventQueue<TEntity>
12+
where TEntity : class, IKubernetesObject<V1ObjectMeta>
13+
{
14+
private readonly Subject<ResourceEvent<TEntity>> _localEvents;
15+
16+
public MockEventQueue()
17+
{
18+
_localEvents = new Subject<ResourceEvent<TEntity>>();
19+
20+
Events = _localEvents;
21+
}
22+
23+
public IObservable<ResourceEvent<TEntity>> Events { get; }
24+
25+
public Task StartAsync(Action<ResourceEvent<TEntity>> onWatcherEvent)
26+
{
27+
return Task.CompletedTask;
28+
}
29+
30+
public Task StopAsync()
31+
{
32+
return Task.CompletedTask;
33+
}
34+
35+
public void EnqueueLocal(ResourceEvent<TEntity> resourceEvent) => _localEvents.OnNext(resourceEvent);
36+
}

src/KubeOps.Testing/MockManagedResourceController{TResource}.cs

Lines changed: 0 additions & 42 deletions
This file was deleted.

src/KubeOps/Operator/Builder/OperatorBuilder.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,12 @@ internal IOperatorBuilder AddOperatorBase(OperatorSettings settings)
199199

200200
Services.AddTransient<EntitySerializer>();
201201

202-
Services.AddTransient<IKubernetesClient, KubernetesClient>();
203-
Services.AddTransient<IEventManager, EventManager>();
202+
Services.AddScoped<IKubernetesClient, KubernetesClient>();
203+
Services.AddScoped<IEventManager, EventManager>();
204204

205-
Services.AddTransient(typeof(ResourceCache<>));
206-
Services.AddTransient(typeof(ResourceWatcher<>));
205+
Services.AddScoped(typeof(ResourceCache<>));
206+
Services.AddScoped(typeof(ResourceWatcher<>));
207+
Services.AddScoped(typeof(IEventQueue<>), typeof(EventQueue<>));
207208

208209
// Support all the metrics
209210
Services.AddSingleton(typeof(ResourceWatcherMetrics<>));
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Collections.Generic;
2+
using k8s;
3+
using k8s.Models;
4+
5+
namespace KubeOps.Operator.Caching;
6+
7+
internal interface IResourceCache<TEntity>
8+
where TEntity : IKubernetesObject<V1ObjectMeta>
9+
{
10+
TEntity Get(string id);
11+
12+
TEntity Upsert(TEntity resource, out CacheComparisonResult result);
13+
14+
void Fill(IEnumerable<TEntity> resources);
15+
16+
void Remove(TEntity resource);
17+
18+
void Clear();
19+
}

src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace KubeOps.Operator.Caching;
1212

13-
internal class ResourceCache<TEntity>
13+
internal class ResourceCache<TEntity> : IResourceCache<TEntity>
1414
where TEntity : IKubernetesObject<V1ObjectMeta>
1515
{
1616
private const string Finalizers = "Metadata.Finalizers";

0 commit comments

Comments
 (0)