Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions src/ES.Kubernetes.Reflector/Core/ConfigMapMirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,18 @@

namespace ES.Kubernetes.Reflector.Core;

public class ConfigMapMirror(ILogger<ConfigMapMirror> logger, IServiceProvider serviceProvider)
: ResourceMirror<V1ConfigMap>(logger, serviceProvider)
public class ConfigMapMirror(ILogger<ConfigMapMirror> logger, IKubernetes kubernetesClient)
: ResourceMirror<V1ConfigMap>(logger, kubernetesClient)
{
private readonly IServiceProvider _serviceProvider = serviceProvider;

protected override async Task<V1ConfigMap[]> OnResourceWithNameList(string itemRefName)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
return (await client.CoreV1.ListConfigMapForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
return (await KubernetesClient.CoreV1.ListConfigMapForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
.Items
.ToArray();
}

protected override async Task OnResourceApplyPatch(V1Patch patch, KubeRef refId)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
await client.CoreV1.PatchNamespacedConfigMapAsync(patch, refId.Name, refId.Namespace);
}
protected override async Task OnResourceApplyPatch(V1Patch patch, KubeRef refId)
=> await KubernetesClient.CoreV1.PatchNamespacedConfigMapAsync(patch, refId.Name, refId.Namespace);

protected override Task OnResourceConfigurePatch(V1ConfigMap source, JsonPatchDocument<V1ConfigMap> patchDoc)
{
Expand All @@ -33,10 +27,7 @@ protected override Task OnResourceConfigurePatch(V1ConfigMap source, JsonPatchDo
}

protected override async Task OnResourceCreate(V1ConfigMap item, string ns)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
await client.CoreV1.CreateNamespacedConfigMapAsync(item, ns);
}
=> await KubernetesClient.CoreV1.CreateNamespacedConfigMapAsync(item, ns);

protected override Task<V1ConfigMap> OnResourceClone(V1ConfigMap sourceResource)
{
Expand All @@ -50,14 +41,8 @@ protected override Task<V1ConfigMap> OnResourceClone(V1ConfigMap sourceResource)
}

protected override async Task OnResourceDelete(KubeRef resourceId)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
await client.CoreV1.DeleteNamespacedConfigMapAsync(resourceId.Name, resourceId.Namespace);
}
=> await KubernetesClient.CoreV1.DeleteNamespacedConfigMapAsync(resourceId.Name, resourceId.Namespace);

protected override async Task<V1ConfigMap> OnResourceGet(KubeRef refId)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
return await client.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace);
}
=> await KubernetesClient.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace);
}
9 changes: 4 additions & 5 deletions src/ES.Kubernetes.Reflector/Core/ConfigMapWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ namespace ES.Kubernetes.Reflector.Core;
public class ConfigMapWatcher(
ILogger<ConfigMapWatcher> logger,
IMediator mediator,
IServiceProvider serviceProvider,
IKubernetes kubernetesClient,
IOptionsMonitor<ReflectorOptions> options)
: WatcherBackgroundService<V1ConfigMap, V1ConfigMapList>(logger, mediator, serviceProvider, options)
: WatcherBackgroundService<V1ConfigMap, V1ConfigMapList>(logger, mediator, options)
{
protected override Task<HttpOperationResponse<V1ConfigMapList>> OnGetWatcher(IKubernetes client,
CancellationToken cancellationToken)
protected override Task<HttpOperationResponse<V1ConfigMapList>> OnGetWatcher(CancellationToken cancellationToken)
{
return client.CoreV1.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true,
return kubernetesClient.CoreV1.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true,
timeoutSeconds: WatcherTimeout,
cancellationToken: cancellationToken);
}
Expand Down
6 changes: 3 additions & 3 deletions src/ES.Kubernetes.Reflector/Core/Mirroring/ResourceMirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace ES.Kubernetes.Reflector.Core.Mirroring;

public abstract class ResourceMirror<TResource>(ILogger logger, IServiceProvider serviceProvider) :
public abstract class ResourceMirror<TResource>(ILogger logger, IKubernetes kubernetesClient) :
INotificationHandler<WatcherEvent>,
INotificationHandler<WatcherClosed>
where TResource : class, IKubernetesObject<V1ObjectMeta>
Expand All @@ -28,6 +28,7 @@ public abstract class ResourceMirror<TResource>(ILogger logger, IServiceProvider
private readonly ConcurrentDictionary<KubeRef, bool> _notFoundCache = new();
private readonly ConcurrentDictionary<KubeRef, ReflectorProperties> _propertiesCache = new();
protected readonly ILogger Logger = logger;
protected readonly IKubernetes KubernetesClient = kubernetesClient;


/// <summary>
Expand Down Expand Up @@ -324,8 +325,7 @@ private async Task AutoReflectionForSource(KubeRef resourceRef, TResource? resou
var autoReflectionList = _autoReflectionCache.GetOrAdd(resourceRef, _ => new List<KubeRef>());

var matches = await OnResourceWithNameList(resourceRef.Name);
using var client = serviceProvider.GetRequiredService<IKubernetes>();
var namespaces = (await client.CoreV1.ListNamespaceAsync(cancellationToken: cancellationToken)).Items;
var namespaces = (await KubernetesClient.CoreV1.ListNamespaceAsync(cancellationToken: cancellationToken)).Items;

foreach (var match in matches)
{
Expand Down
9 changes: 4 additions & 5 deletions src/ES.Kubernetes.Reflector/Core/NamespaceWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ namespace ES.Kubernetes.Reflector.Core;
public class NamespaceWatcher(
ILogger<NamespaceWatcher> logger,
IMediator mediator,
IServiceProvider serviceProvider,
IKubernetes kubernetesClient,
IOptionsMonitor<ReflectorOptions> options)
: WatcherBackgroundService<V1Namespace, V1NamespaceList>(logger, mediator, serviceProvider, options)
: WatcherBackgroundService<V1Namespace, V1NamespaceList>(logger, mediator, options)
{
protected override Task<HttpOperationResponse<V1NamespaceList>> OnGetWatcher(IKubernetes client,
CancellationToken cancellationToken)
protected override Task<HttpOperationResponse<V1NamespaceList>> OnGetWatcher(CancellationToken cancellationToken)
{
return client.CoreV1.ListNamespaceWithHttpMessagesAsync(watch: true, timeoutSeconds: WatcherTimeout,
return kubernetesClient.CoreV1.ListNamespaceWithHttpMessagesAsync(watch: true, timeoutSeconds: WatcherTimeout,
cancellationToken: cancellationToken);
}
}
29 changes: 7 additions & 22 deletions src/ES.Kubernetes.Reflector/Core/SecretMirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,18 @@

namespace ES.Kubernetes.Reflector.Core;

public class SecretMirror(ILogger<SecretMirror> logger, IServiceProvider serviceProvider)
: ResourceMirror<V1Secret>(logger, serviceProvider)
public class SecretMirror(ILogger<SecretMirror> logger, IKubernetes kubernetesClient)
: ResourceMirror<V1Secret>(logger, kubernetesClient)
{
private readonly IServiceProvider _serviceProvider = serviceProvider;

protected override async Task<V1Secret[]> OnResourceWithNameList(string itemRefName)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
return (await client.CoreV1.ListSecretForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
return (await KubernetesClient.CoreV1.ListSecretForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
.Items
.ToArray();
}

protected override async Task OnResourceApplyPatch(V1Patch patch, KubeRef refId)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
await client.CoreV1.PatchNamespacedSecretWithHttpMessagesAsync(patch, refId.Name, refId.Namespace);
}
=> await KubernetesClient.CoreV1.PatchNamespacedSecretWithHttpMessagesAsync(patch, refId.Name, refId.Namespace);

protected override Task OnResourceConfigurePatch(V1Secret source, JsonPatchDocument<V1Secret> patchDoc)
{
Expand All @@ -32,10 +26,7 @@ protected override Task OnResourceConfigurePatch(V1Secret source, JsonPatchDocum
}

protected override async Task OnResourceCreate(V1Secret item, string ns)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
await client.CoreV1.CreateNamespacedSecretAsync(item, ns);
}
=> await KubernetesClient.CoreV1.CreateNamespacedSecretAsync(item, ns);

protected override Task<V1Secret> OnResourceClone(V1Secret sourceResource)
{
Expand All @@ -49,16 +40,10 @@ protected override Task<V1Secret> OnResourceClone(V1Secret sourceResource)
}

protected override async Task OnResourceDelete(KubeRef resourceId)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
await client.CoreV1.DeleteNamespacedSecretAsync(resourceId.Name, resourceId.Namespace);
}
=> await KubernetesClient.CoreV1.DeleteNamespacedSecretAsync(resourceId.Name, resourceId.Namespace);

protected override async Task<V1Secret> OnResourceGet(KubeRef refId)
{
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
return await client.CoreV1.ReadNamespacedSecretAsync(refId.Name, refId.Namespace);
}
=> await KubernetesClient.CoreV1.ReadNamespacedSecretAsync(refId.Name, refId.Namespace);

protected override Task<bool> OnResourceIgnoreCheck(V1Secret item)
{
Expand Down
9 changes: 4 additions & 5 deletions src/ES.Kubernetes.Reflector/Core/SecretWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ namespace ES.Kubernetes.Reflector.Core;
public class SecretWatcher(
ILogger<SecretWatcher> logger,
IMediator mediator,
IServiceProvider serviceProvider,
IKubernetes kubernetesClient,
IOptionsMonitor<ReflectorOptions> options)
: WatcherBackgroundService<V1Secret, V1SecretList>(logger, mediator, serviceProvider, options)
: WatcherBackgroundService<V1Secret, V1SecretList>(logger, mediator, options)
{
protected override Task<HttpOperationResponse<V1SecretList>> OnGetWatcher(IKubernetes client,
CancellationToken cancellationToken)
protected override Task<HttpOperationResponse<V1SecretList>> OnGetWatcher(CancellationToken cancellationToken)
{
return client.CoreV1.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true,
return kubernetesClient.CoreV1.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true,
timeoutSeconds: WatcherTimeout,
cancellationToken: cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace ES.Kubernetes.Reflector.Core.Watchers;
public abstract class WatcherBackgroundService<TResource, TResourceList>(
ILogger logger,
IMediator mediator,
IServiceProvider serviceProvider,
IOptionsMonitor<ReflectorOptions> options)
: BackgroundService
where TResource : IKubernetesObject<V1ObjectMeta>
Expand All @@ -27,8 +26,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var sessionStopwatch = new Stopwatch();
while (!stoppingToken.IsCancellationRequested)
{
await using var scope = serviceProvider.CreateAsyncScope();

var sessionFaulted = false;
sessionStopwatch.Restart();

Expand All @@ -40,9 +37,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

using var absoluteTimeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(WatcherTimeout + 3));
using var cancellationCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, absoluteTimeoutCts.Token);
using var client = scope.ServiceProvider.GetRequiredService<IKubernetes>();

using var watcher = OnGetWatcher(client, stoppingToken);
using var watcher = OnGetWatcher(stoppingToken);
var watchList = watcher.WatchAsync<TResource, TResourceList>(cancellationToken: cancellationCts.Token);

await foreach (var (type, item) in watchList)
Expand Down Expand Up @@ -79,6 +75,5 @@ await Mediator.Publish(new WatcherClosed
}
}

protected abstract Task<HttpOperationResponse<TResourceList>> OnGetWatcher(IKubernetes client,
CancellationToken cancellationToken);
protected abstract Task<HttpOperationResponse<TResourceList>> OnGetWatcher(CancellationToken cancellationToken);
}
8 changes: 2 additions & 6 deletions src/ES.Kubernetes.Reflector/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

builder.Services.Configure<ReflectorOptions>(builder.Configuration.GetSection(nameof(ES.Kubernetes.Reflector)));

builder.Services.AddTransient(s =>
builder.Services.AddSingleton(s =>
{
var reflectorOptions = s.GetRequiredService<IOptions<ReflectorOptions>>();

Expand All @@ -35,17 +35,13 @@
});


builder.Services.AddTransient<IKubernetes>(s =>
builder.Services.AddSingleton<IKubernetes>(s =>
new Kubernetes(s.GetRequiredService<KubernetesClientConfiguration>()));

builder.Services.AddHostedService<NamespaceWatcher>();
builder.Services.AddHostedService<SecretWatcher>();
builder.Services.AddHostedService<ConfigMapWatcher>();

builder.Services.AddSingleton<SecretMirror>();
builder.Services.AddSingleton<ConfigMapMirror>();


var app = builder.Build();
app.Ignite();
await app.RunAsync();
Expand Down