Skip to content

Commit 4283cf6

Browse files
slacki123buehler
andcommitted
fix(watcher): queues not polling when status updated (#712)
With reference to #707 This fixes the behaviour of requeues Co-authored-by: Christoph Bühler <[email protected]>
1 parent 9a5a6d5 commit 4283cf6

File tree

4 files changed

+83
-15
lines changed

4 files changed

+83
-15
lines changed

src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ private async void OnEntityRequeue(object? sender, (string Name, string? Namespa
111111
return;
112112
}
113113

114-
_entityCache.TryRemove(entity.Uid(), out _);
115114
await ReconcileModification(entity);
116115
}
117116

@@ -173,16 +172,32 @@ private async void OnEvent(WatchEventType type, TEntity entity)
173172
entity.Name(),
174173
_lastResourceVersion);
175174

176-
_queue.RemoveIfQueued(entity);
177-
178175
try
179176
{
180177
switch (type)
181178
{
182-
case WatchEventType.Added or WatchEventType.Modified:
179+
case WatchEventType.Added:
180+
_entityCache.TryAdd(entity.Uid(), entity.Generation() ?? 0);
181+
await ReconcileModification(entity);
182+
break;
183+
case WatchEventType.Modified:
183184
switch (entity)
184185
{
185186
case { Metadata.DeletionTimestamp: null }:
187+
_entityCache.TryGetValue(entity.Uid(), out var cachedGeneration);
188+
189+
// Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
190+
if (entity.Generation() <= cachedGeneration)
191+
{
192+
_logger.LogDebug(
193+
"""Entity "{kind}/{name}" modification did not modify generation. Skip event.""",
194+
entity.Kind,
195+
entity.Name());
196+
return;
197+
}
198+
199+
// update cached generation since generation now changed
200+
_entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, cachedGeneration);
186201
await ReconcileModification(entity);
187202
break;
188203
case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }:
@@ -216,31 +231,25 @@ private async void OnEvent(WatchEventType type, TEntity entity)
216231

217232
private async Task ReconcileModification(TEntity entity)
218233
{
219-
var latestGeneration = _entityCache.GetOrAdd(entity.Uid(), 0);
220-
if (entity.Generation() <= latestGeneration)
221-
{
222-
_logger.LogDebug(
223-
"""Entity "{kind}/{name}" modification did not modify generation. Skip event.""",
224-
entity.Kind,
225-
entity.Name());
226-
return;
227-
}
228-
229-
_entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, latestGeneration);
234+
// Re-queue should requested in the controller reconcile method. Invalidate any existing queues.
235+
_queue.RemoveIfQueued(entity);
230236
await using var scope = _provider.CreateAsyncScope();
231237
var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>();
232238
await controller.ReconcileAsync(entity);
233239
}
234240

235241
private async Task ReconcileDeletion(TEntity entity)
236242
{
243+
_queue.RemoveIfQueued(entity);
244+
_entityCache.TryRemove(entity.Uid(), out _);
237245
await using var scope = _provider.CreateAsyncScope();
238246
var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>();
239247
await controller.DeletedAsync(entity);
240248
}
241249

242250
private async Task ReconcileFinalizer(TEntity entity)
243251
{
252+
_queue.RemoveIfQueued(entity);
244253
var pendingFinalizer = entity.Finalizers();
245254
if (_finalizers.Value.Find(reg =>
246255
reg.EntityType == entity.GetType() && pendingFinalizer.Contains(reg.Identifier)) is not

test/KubeOps.Operator.Test/Controller/CancelEntityRequeue.Integration.Test.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ public async Task Should_Cancel_Requeue_If_New_Event_Fires()
3434
Services.GetRequiredService<TimedEntityQueue<V1OperatorIntegrationTestEntity>>().Count.Should().Be(0);
3535
}
3636

37+
[Fact]
38+
public async Task Should_Not_Affect_Queues_If_Only_Status_Updated()
39+
{
40+
_mock.TargetInvocationCount = 1;
41+
var result = await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity", "username", _ns.Namespace));
42+
result.Status.Status = "changed";
43+
await _client.UpdateStatusAsync(result);
44+
await _mock.WaitForInvocations;
45+
46+
_mock.Invocations.Count.Should().Be(1);
47+
Services.GetRequiredService<TimedEntityQueue<V1OperatorIntegrationTestEntity>>().Count.Should().Be(1);
48+
49+
}
50+
3751
public override async Task InitializeAsync()
3852
{
3953
await base.InitializeAsync();

test/KubeOps.Operator.Test/Controller/EntityController.Integration.Test.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,27 @@ void Check(int idx, string username)
5757
}
5858
}
5959

60+
[Fact]
61+
public async Task Should_Not_Call_Reconcile_When_Only_Entity_Status_Changed()
62+
{
63+
_mock.TargetInvocationCount = 1;
64+
65+
var result =
66+
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity", "username", _ns.Namespace));
67+
result.Status.Status = "changed";
68+
// Update or UpdateStatus do not call Reconcile
69+
await _client.UpdateAsync(result);
70+
await _client.UpdateStatusAsync(result);
71+
await _mock.WaitForInvocations;
72+
73+
_mock.Invocations.Count.Should().Be(1);
74+
75+
(string method, V1OperatorIntegrationTestEntity entity) = _mock.Invocations.Single();
76+
method.Should().Be("ReconcileAsync");
77+
entity.Should().BeOfType<V1OperatorIntegrationTestEntity>();
78+
entity.Spec.Username.Should().Be("username");
79+
}
80+
6081
[Fact]
6182
public async Task Should_Call_Delete_For_Deleted_Entity()
6283
{

test/KubeOps.Operator.Test/Controller/EntityRequeue.Integration.Test.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,30 @@ public async Task Should_Requeue_Entity_And_Reconcile()
3535
_mock.Invocations.Count.Should().Be(5);
3636
}
3737

38+
[Fact]
39+
public async Task Should_Separately_And_Reliably_Requeue_And_Reconcile_Multiple_Entities_In_Parallel()
40+
{
41+
_mock.TargetInvocationCount = 100;
42+
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity1", "username", _ns.Namespace));
43+
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity2", "username", _ns.Namespace));
44+
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity3", "username", _ns.Namespace));
45+
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity4", "username", _ns.Namespace));
46+
await _mock.WaitForInvocations;
47+
48+
// Expecting invocations, but since in parallel, there is a possibility to for target hit while other are in flight.
49+
_mock.Invocations.Count.Should().BeGreaterOrEqualTo(100).And.BeLessThan(105);
50+
var invocationsGroupedById = _mock.Invocations.GroupBy(item => item.Entity.Metadata.Uid).ToList();
51+
invocationsGroupedById.Count.Should().Be(4);
52+
var invocationDistributions = invocationsGroupedById
53+
.Select(g => (double)g.Count() / _mock.Invocations.Count * 100)
54+
.ToList();
55+
invocationDistributions
56+
.All(p => p is >= 15 and <= 35) // Check that invocations are reasonably distributed
57+
.Should()
58+
.BeTrue($"each entity invocation proportion should be within the specified range of total invocations, " +
59+
$"but instead the distributions were: '{string.Join(", ", invocationDistributions)}'");
60+
}
61+
3862
public override async Task InitializeAsync()
3963
{
4064
await base.InitializeAsync();

0 commit comments

Comments
 (0)