Skip to content

Commit bb6a89d

Browse files
author
Christoph Bühler
committed
fix(queue): delayed enqueues (requeue) will refetch the resource to avoid conflict
1 parent 6d18dea commit bb6a89d

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

src/KubeOps/Operator/Client/KubernetesClient.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public async Task UpdateStatus<TStatus>(IStatus<TStatus> resource)
154154
{
155155
if (!(resource is IKubernetesObject<V1ObjectMeta> kubernetesObject))
156156
{
157-
throw new ArgumentException("Resource is not a propper kubernetes object");
157+
throw new ArgumentException("Resource is not a proper kubernetes object");
158158
}
159159

160160
var crd = kubernetesObject.CreateResourceDefinition();
@@ -176,7 +176,10 @@ public async Task UpdateStatus<TStatus>(IStatus<TStatus> resource)
176176
if (result?.ToObject(resource.GetType()) is IKubernetesObject<V1ObjectMeta> parsed)
177177
{
178178
kubernetesObject.Metadata.ResourceVersion = parsed.Metadata.ResourceVersion;
179+
return;
179180
}
181+
182+
throw new ArgumentException("Could not parse result");
180183
}
181184

182185
public Task Delete<TResource>(TResource resource)

src/KubeOps/Operator/Queue/ResourceEventQueue.cs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
using k8s;
88
using k8s.Models;
99
using KubeOps.Operator.Caching;
10+
using KubeOps.Operator.Client;
1011
using KubeOps.Operator.Errors;
1112
using KubeOps.Operator.Watcher;
1213
using Microsoft.Extensions.Logging;
1314

1415
namespace KubeOps.Operator.Queue
1516
{
1617
internal class ResourceEventQueue<TEntity> : IResourceEventQueue<TEntity>
17-
where TEntity : IKubernetesObject<V1ObjectMeta>
18+
where TEntity : class, IKubernetesObject<V1ObjectMeta>
1819
{
1920
// TODO: Make configurable
2021
private const int QueueLimit = 512;
@@ -24,6 +25,7 @@ internal class ResourceEventQueue<TEntity> : IResourceEventQueue<TEntity>
2425

2526
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
2627
private readonly ILogger<ResourceEventQueue<TEntity>> _logger;
28+
private readonly IKubernetesClient _client;
2729
private readonly IResourceCache<TEntity> _cache;
2830
private readonly IResourceWatcher<TEntity> _watcher;
2931

@@ -39,10 +41,12 @@ internal class ResourceEventQueue<TEntity> : IResourceEventQueue<TEntity>
3941

4042
public ResourceEventQueue(
4143
ILogger<ResourceEventQueue<TEntity>> logger,
44+
IKubernetesClient client,
4245
IResourceCache<TEntity> cache,
4346
IResourceWatcher<TEntity> watcher)
4447
{
4548
_logger = logger;
49+
_client = client;
4650
_cache = cache;
4751
_watcher = watcher;
4852
}
@@ -131,17 +135,20 @@ public async Task Enqueue(TEntity resource, TimeSpan? enqueueDelay = null)
131135
_semaphore.Release();
132136
}
133137

134-
var cachedResource = _cache.Get(delayedResource.Metadata.Uid);
135-
if (cachedResource == null)
138+
var newResource = await _client.Get<TEntity>(
139+
delayedResource.Metadata.Name,
140+
delayedResource.Metadata.NamespaceProperty);
141+
142+
if (newResource == null)
136143
{
137144
_logger.LogDebug(
138-
@"Resource ""{kind}/{name}"" was not present in the cache anymore. Don't execute delayed timer.",
139-
delayedResource.Kind,
140-
delayedResource.Metadata.Name);
145+
@"Resource ""{kind}/{name}"" for enqueued event was not present anymore.",
146+
resource.Kind,
147+
resource.Metadata.Name);
141148
return;
142149
}
143150

144-
await Enqueue(cachedResource);
151+
await Enqueue(newResource);
145152
});
146153
_delayedEnqueue.Add(resource.Metadata.Uid, timer);
147154

0 commit comments

Comments
 (0)