Skip to content

Commit a64cc15

Browse files
rgramaRazvan Grama
andauthored
feat: Resource Watcher with Labels (#866)
This feature extends the ResourceWatcher functionality with support for watching only resources which match the label restrictions. --------- Co-authored-by: Razvan Grama <[email protected]>
1 parent 79f5533 commit a64cc15

File tree

8 files changed

+140
-2
lines changed

8 files changed

+140
-2
lines changed

src/KubeOps.Abstractions/Builder/IOperatorBuilder.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using k8s.Models;
33

44
using KubeOps.Abstractions.Controller;
5+
using KubeOps.Abstractions.Entities;
56
using KubeOps.Abstractions.Finalizer;
67

78
using Microsoft.Extensions.DependencyInjection;
@@ -29,6 +30,19 @@ IOperatorBuilder AddController<TImplementation, TEntity>()
2930
where TImplementation : class, IEntityController<TEntity>
3031
where TEntity : IKubernetesObject<V1ObjectMeta>;
3132

33+
/// <summary>
34+
/// Add a controller implementation for a specific entity to the operator.
35+
/// The metadata for the entity must be added as well.
36+
/// </summary>
37+
/// <typeparam name="TImplementation">Implementation type of the controller.</typeparam>
38+
/// <typeparam name="TEntity">Entity type.</typeparam>
39+
/// <typeparam name="TLabelSelector">Label Selector type.</typeparam>
40+
/// <returns>The builder for chaining.</returns>
41+
IOperatorBuilder AddController<TImplementation, TEntity, TLabelSelector>()
42+
where TImplementation : class, IEntityController<TEntity>
43+
where TEntity : IKubernetesObject<V1ObjectMeta>
44+
where TLabelSelector : class, IEntityLabelSelector<TEntity>;
45+
3246
/// <summary>
3347
/// Add a finalizer implementation for a specific entity.
3448
/// This adds the implementation as a transient service and registers
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using k8s;
2+
using k8s.Models;
3+
4+
namespace KubeOps.Abstractions.Entities;
5+
6+
public class DefaultEntityLabelSelector<TEntity> : IEntityLabelSelector<TEntity>
7+
where TEntity : IKubernetesObject<V1ObjectMeta>
8+
{
9+
public ValueTask<string?> GetLabelSelectorAsync(CancellationToken cancellationToken) => ValueTask.FromResult<string?>(null);
10+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using k8s;
2+
using k8s.Models;
3+
4+
namespace KubeOps.Abstractions.Entities;
5+
6+
// This is the same pattern used by Microsoft on ILogger<T>.
7+
// An alternative would be to use a KeyedSingleton when registering this however that's only valid from .NET 8 and above.
8+
// Other methods are far less elegant
9+
#pragma warning disable S2326
10+
public interface IEntityLabelSelector<TEntity>
11+
where TEntity : IKubernetesObject<V1ObjectMeta>
12+
{
13+
ValueTask<string?> GetLabelSelectorAsync(CancellationToken cancellationToken);
14+
}
15+
#pragma warning restore S2326

src/KubeOps.Operator/Builder/OperatorBuilder.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using KubeOps.Abstractions.Builder;
55
using KubeOps.Abstractions.Controller;
6+
using KubeOps.Abstractions.Entities;
67
using KubeOps.Abstractions.Events;
78
using KubeOps.Abstractions.Finalizer;
89
using KubeOps.Abstractions.Queue;
@@ -54,6 +55,31 @@ public IOperatorBuilder AddController<TImplementation, TEntity>()
5455
return this;
5556
}
5657

58+
public IOperatorBuilder AddController<TImplementation, TEntity, TLabelSelector>()
59+
where TImplementation : class, IEntityController<TEntity>
60+
where TEntity : IKubernetesObject<V1ObjectMeta>
61+
where TLabelSelector : class, IEntityLabelSelector<TEntity>
62+
{
63+
Services.AddHostedService<EntityRequeueBackgroundService<TEntity>>();
64+
Services.TryAddScoped<IEntityController<TEntity>, TImplementation>();
65+
Services.TryAddSingleton(new TimedEntityQueue<TEntity>());
66+
Services.TryAddTransient<IEntityRequeueFactory, KubeOpsEntityRequeueFactory>();
67+
Services.TryAddTransient<EntityRequeue<TEntity>>(services =>
68+
services.GetRequiredService<IEntityRequeueFactory>().Create<TEntity>());
69+
Services.TryAddSingleton<IEntityLabelSelector<TEntity>, TLabelSelector>();
70+
71+
if (_settings.EnableLeaderElection)
72+
{
73+
Services.AddHostedService<LeaderAwareResourceWatcher<TEntity>>();
74+
}
75+
else
76+
{
77+
Services.AddHostedService<ResourceWatcher<TEntity>>();
78+
}
79+
80+
return this;
81+
}
82+
5783
public IOperatorBuilder AddFinalizer<TImplementation, TEntity>(string identifier)
5884
where TImplementation : class, IEntityFinalizer<TEntity>
5985
where TEntity : IKubernetesObject<V1ObjectMeta>
@@ -91,6 +117,8 @@ private void AddOperatorBase()
91117
Services.TryAddTransient<EventPublisher>(
92118
services => services.GetRequiredService<IEventPublisherFactory>().Create());
93119

120+
Services.AddSingleton(typeof(IEntityLabelSelector<>), typeof(DefaultEntityLabelSelector<>));
121+
94122
if (_settings.EnableLeaderElection)
95123
{
96124
Services.AddLeaderElection();

src/KubeOps.Operator/Watcher/LeaderAwareResourceWatcher{TEntity}.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using k8s.Models;
44

55
using KubeOps.Abstractions.Builder;
6+
using KubeOps.Abstractions.Entities;
67
using KubeOps.KubernetesClient;
78
using KubeOps.Operator.Queue;
89

@@ -16,10 +17,11 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
1617
IServiceProvider provider,
1718
TimedEntityQueue<TEntity> queue,
1819
OperatorSettings settings,
20+
IEntityLabelSelector<TEntity> labelSelector,
1921
IKubernetesClient client,
2022
IHostApplicationLifetime hostApplicationLifetime,
2123
LeaderElector elector)
22-
: ResourceWatcher<TEntity>(logger, provider, queue, settings, client)
24+
: ResourceWatcher<TEntity>(logger, provider, queue, settings, labelSelector, client)
2325
where TEntity : IKubernetesObject<V1ObjectMeta>
2426
{
2527
private CancellationTokenSource _cts = new();

src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
using KubeOps.Abstractions.Builder;
1010
using KubeOps.Abstractions.Controller;
11+
using KubeOps.Abstractions.Entities;
1112
using KubeOps.Abstractions.Finalizer;
1213
using KubeOps.KubernetesClient;
1314
using KubeOps.Operator.Queue;
@@ -23,6 +24,7 @@ internal class ResourceWatcher<TEntity>(
2324
IServiceProvider provider,
2425
TimedEntityQueue<TEntity> requeue,
2526
OperatorSettings settings,
27+
IEntityLabelSelector<TEntity> labelSelector,
2628
IKubernetesClient client)
2729
: IHostedService, IAsyncDisposable, IDisposable
2830
where TEntity : IKubernetesObject<V1ObjectMeta>
@@ -139,6 +141,7 @@ private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
139141
await foreach ((WatchEventType type, TEntity entity) in client.WatchAsync<TEntity>(
140142
settings.Namespace,
141143
resourceVersion: currentVersion,
144+
labelSelector: await labelSelector.GetLabelSelectorAsync(stoppingToken),
142145
allowWatchBookmarks: true,
143146
cancellationToken: stoppingToken))
144147
{

test/KubeOps.Operator.Test/Builder/OperatorBuilder.Test.cs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@
22

33
using KubeOps.Abstractions.Builder;
44
using KubeOps.Abstractions.Controller;
5+
using KubeOps.Abstractions.Entities;
56
using KubeOps.Abstractions.Events;
67
using KubeOps.Abstractions.Finalizer;
78
using KubeOps.Abstractions.Queue;
9+
using KubeOps.KubernetesClient.LabelSelectors;
810
using KubeOps.Operator.Builder;
911
using KubeOps.Operator.Finalizer;
1012
using KubeOps.Operator.Queue;
1113
using KubeOps.Operator.Test.TestEntities;
1214
using KubeOps.Operator.Watcher;
1315

1416
using Microsoft.Extensions.DependencyInjection;
17+
using Microsoft.Extensions.DependencyInjection.Extensions;
1518
using Microsoft.Extensions.Hosting;
1619

1720
namespace KubeOps.Operator.Test.Builder;
@@ -29,6 +32,29 @@ public void Should_Add_Default_Resources()
2932
_builder.Services.Should().Contain(s =>
3033
s.ServiceType == typeof(EventPublisher) &&
3134
s.Lifetime == ServiceLifetime.Transient);
35+
_builder.Services.Should().Contain(s =>
36+
s.ServiceType == typeof(IEntityLabelSelector<>) &&
37+
s.ImplementationType == typeof(DefaultEntityLabelSelector<>) &&
38+
s.Lifetime == ServiceLifetime.Singleton);
39+
}
40+
41+
[Fact]
42+
public void Should_Use_Specific_EntityLabelSelector_Implementation()
43+
{
44+
// Arrange
45+
var services = new ServiceCollection();
46+
47+
// Register the default and specific implementations
48+
services.AddSingleton(typeof(IEntityLabelSelector<>), typeof(DefaultEntityLabelSelector<>));
49+
services.TryAddSingleton<IEntityLabelSelector<V1OperatorIntegrationTestEntity>, TestLabelSelector>();
50+
51+
var serviceProvider = services.BuildServiceProvider();
52+
53+
// Act
54+
var resolvedService = serviceProvider.GetRequiredService<IEntityLabelSelector<V1OperatorIntegrationTestEntity>>();
55+
56+
// Assert
57+
Assert.IsType<TestLabelSelector>(resolvedService);
3258
}
3359

3460
[Fact]
@@ -52,6 +78,31 @@ public void Should_Add_Controller_Resources()
5278
s.Lifetime == ServiceLifetime.Transient);
5379
}
5480

81+
[Fact]
82+
public void Should_Add_Controller_Resources_With_Label_Selector()
83+
{
84+
_builder.AddController<TestController, V1OperatorIntegrationTestEntity, TestLabelSelector>();
85+
86+
_builder.Services.Should().Contain(s =>
87+
s.ServiceType == typeof(IEntityController<V1OperatorIntegrationTestEntity>) &&
88+
s.ImplementationType == typeof(TestController) &&
89+
s.Lifetime == ServiceLifetime.Scoped);
90+
_builder.Services.Should().Contain(s =>
91+
s.ServiceType == typeof(IHostedService) &&
92+
s.ImplementationType == typeof(ResourceWatcher<V1OperatorIntegrationTestEntity>) &&
93+
s.Lifetime == ServiceLifetime.Singleton);
94+
_builder.Services.Should().Contain(s =>
95+
s.ServiceType == typeof(TimedEntityQueue<V1OperatorIntegrationTestEntity>) &&
96+
s.Lifetime == ServiceLifetime.Singleton);
97+
_builder.Services.Should().Contain(s =>
98+
s.ServiceType == typeof(EntityRequeue<V1OperatorIntegrationTestEntity>) &&
99+
s.Lifetime == ServiceLifetime.Transient);
100+
_builder.Services.Should().Contain(s =>
101+
s.ServiceType == typeof(IEntityLabelSelector<V1OperatorIntegrationTestEntity>) &&
102+
s.ImplementationType == typeof(TestLabelSelector) &&
103+
s.Lifetime == ServiceLifetime.Singleton);
104+
}
105+
55106
[Fact]
56107
public void Should_Add_Finalizer_Resources()
57108
{
@@ -105,4 +156,17 @@ private class TestFinalizer : IEntityFinalizer<V1OperatorIntegrationTestEntity>
105156
public Task FinalizeAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) =>
106157
Task.CompletedTask;
107158
}
159+
160+
private class TestLabelSelector : IEntityLabelSelector<V1OperatorIntegrationTestEntity>
161+
{
162+
public ValueTask<string?> GetLabelSelectorAsync(CancellationToken cancellationToken)
163+
{
164+
var labelSelectors = new LabelSelector[]
165+
{
166+
new EqualsSelector("label", "value")
167+
};
168+
169+
return ValueTask.FromResult<string?>(labelSelectors.ToExpression());
170+
}
171+
}
108172
}

test/KubeOps.Operator.Test/Watcher/ResourceWatcher{TEntity}.Test.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using k8s.Models;
55

66
using KubeOps.Abstractions.Builder;
7+
using KubeOps.Abstractions.Entities;
78
using KubeOps.KubernetesClient;
89
using KubeOps.Operator.Queue;
910
using KubeOps.Operator.Watcher;
@@ -25,12 +26,13 @@ public async Task Restarting_Watcher_Should_Trigger_New_Watch()
2526
var timedEntityQueue = new TimedEntityQueue<V1Pod>();
2627
var operatorSettings = new OperatorSettings() { Namespace = "unit-test" };
2728
var kubernetesClient = Mock.Of<IKubernetesClient>();
29+
var labelSelector = new DefaultEntityLabelSelector<V1Pod>();
2830

2931
Mock.Get(kubernetesClient)
3032
.Setup(client => client.WatchAsync<V1Pod>("unit-test", null, null, true, It.IsAny<CancellationToken>()))
3133
.Returns<string?, string?, string?, bool?, CancellationToken>((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken));
3234

33-
var resourceWatcher = new ResourceWatcher<V1Pod>(logger, serviceProvider, timedEntityQueue, operatorSettings, kubernetesClient);
35+
var resourceWatcher = new ResourceWatcher<V1Pod>(logger, serviceProvider, timedEntityQueue, operatorSettings, labelSelector, kubernetesClient);
3436

3537
// Act.
3638
// Start and stop the watcher.

0 commit comments

Comments
 (0)