diff --git a/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs b/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs index 4f5514bd..e859889f 100644 --- a/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs +++ b/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using k8s; using k8s.Models; using Neuroglia.Serialization.Yaml; @@ -108,7 +109,7 @@ public static V1PodTemplateSpec LoadPodTemplate() var templateFilePath = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Kubernetes.Pod); if (string.IsNullOrWhiteSpace(templateFilePath) || !File.Exists(templateFilePath)) return DefaultPodTemplate; var yaml = File.ReadAllText(templateFilePath); - return YamlSerializer.Default.Deserialize(yaml)!; + return KubernetesYaml.Deserialize(yaml); } } diff --git a/src/operator/Synapse.Operator/Program.cs b/src/operator/Synapse.Operator/Program.cs index 1d0c971e..4681cf5a 100644 --- a/src/operator/Synapse.Operator/Program.cs +++ b/src/operator/Synapse.Operator/Program.cs @@ -28,11 +28,6 @@ .ConfigureServices((context, services) => { services.Configure(context.Configuration); - services.AddSingleton(provider => - { - var options = provider.GetRequiredService>().CurrentValue; - return Options.Create(options.Runner); - }); services.AddLogging(builder => { builder.AddSimpleConsole(options => @@ -67,9 +62,13 @@ services.AddScoped(); services.AddScoped>(provider => provider.GetRequiredService()); - services.AddHostedService(); + services.AddSingleton(); + services.AddHostedService(provider => provider.GetRequiredService()); + services.AddSingleton(); + services.AddHostedService(provider => provider.GetRequiredService()); + services.AddSingleton>(provider => provider.GetRequiredService()); }); using var app = builder.Build(); -await app.RunAsync(); +await app.RunAsync(); \ No newline at end of file diff --git a/src/operator/Synapse.Operator/Services/OperatorApplication.cs b/src/operator/Synapse.Operator/Services/OperatorApplication.cs index 86c2dab7..0418051c 100644 --- a/src/operator/Synapse.Operator/Services/OperatorApplication.cs +++ b/src/operator/Synapse.Operator/Services/OperatorApplication.cs @@ -20,16 +20,17 @@ internal class OperatorApplication(IServiceProvider serviceProvider) readonly IServiceScope _scope = serviceProvider.CreateScope(); IServiceProvider ServiceProvider => this._scope.ServiceProvider; - OperatorController _operatorController = null!; WorkflowController _workflowController = null!; WorkflowInstanceController _workflowInstanceController = null!; + internal OperatorController OperatorController { get; private set; } = null!; + public async Task StartAsync(CancellationToken cancellationToken) { - this._operatorController = this.ServiceProvider.GetRequiredService(); + this.OperatorController = this.ServiceProvider.GetRequiredService(); this._workflowController = this.ServiceProvider.GetRequiredService(); this._workflowInstanceController = this.ServiceProvider.GetRequiredService(); - await this._operatorController.StartAsync(cancellationToken).ConfigureAwait(false); + await this.OperatorController.StartAsync(cancellationToken).ConfigureAwait(false); await Task.WhenAll( [ this._workflowController.StartAsync(cancellationToken), @@ -41,7 +42,7 @@ public async Task StopAsync(CancellationToken cancellationToken) { await Task.WhenAll( [ - this._operatorController.StopAsync(cancellationToken), + this.OperatorController.StopAsync(cancellationToken), this._workflowController.StopAsync(cancellationToken), this._workflowInstanceController.StopAsync(cancellationToken) ]).ConfigureAwait(false); @@ -49,4 +50,4 @@ await Task.WhenAll( void IDisposable.Dispose() => this._scope.Dispose(); -} \ No newline at end of file +} diff --git a/src/operator/Synapse.Operator/Services/OperatorController.cs b/src/operator/Synapse.Operator/Services/OperatorController.cs index 0392e416..2c6e493d 100644 --- a/src/operator/Synapse.Operator/Services/OperatorController.cs +++ b/src/operator/Synapse.Operator/Services/OperatorController.cs @@ -18,8 +18,7 @@ namespace Synapse.Operator.Services; /// /// The service used to manage s /// The current -/// The current -public class OperatorController(IResourceRepository repository, IOptionsMonitor options, IOptionsMonitor runnerOptions) +public class OperatorController(IResourceRepository repository, IOptionsMonitor options) : IOperatorController { @@ -33,11 +32,6 @@ public class OperatorController(IResourceRepository repository, IOptionsMonitor< /// protected OperatorOptions Options => options.CurrentValue; - /// - /// Gets the current - /// - protected RunnerConfiguration RunnerOptions => runnerOptions.CurrentValue; - /// public IResourceMonitor Operator { get; protected set; } = null!; @@ -90,9 +84,6 @@ protected virtual async Task SetOperatorStatusPhaseAsync(string phase, Cancellat protected virtual void OnOperatorSpecChanged() { this.Options.Runner = this.Operator.Resource.Spec.Runner; - this.RunnerOptions.Api = this.Options.Runner.Api; - this.RunnerOptions.Runtime = this.Options.Runner.Runtime; - this.RunnerOptions.Certificates = this.Options.Runner.Certificates; } /// diff --git a/src/operator/Synapse.Operator/Services/RunnerConfigurationMonitor.cs b/src/operator/Synapse.Operator/Services/RunnerConfigurationMonitor.cs new file mode 100644 index 00000000..566aa552 --- /dev/null +++ b/src/operator/Synapse.Operator/Services/RunnerConfigurationMonitor.cs @@ -0,0 +1,85 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Operator.Services; + +/// +/// Represents the service used to monitor the operator's . +/// +/// The service used to monitor the current operator resource +internal sealed class RunnerConfigurationMonitor(OperatorApplication application) + : IHostedService, IOptionsMonitor +{ + + ConcurrentHashSet _changeTokenSources = []; + IDisposable? _subscription; + IResourceMonitor _operator = null!; + + /// + public RunnerConfiguration CurrentValue => _operator.Resource.Spec.Runner; + + /// + public Task StartAsync(CancellationToken cancellationToken) + { + _operator = application.OperatorController.Operator; + _operator.Where(e => e.Type == ResourceWatchEventType.Updated).Select(e => e.Resource.Spec.Runner).Distinct().Subscribe(OnConfigurationChanged); + return Task.CompletedTask; + } + + /// + public Task StopAsync(CancellationToken cancellationToken) + { + _subscription?.Dispose(); + return Task.CompletedTask; + } + + /// + public RunnerConfiguration Get(string? name) => CurrentValue; + + void OnConfigurationChanged(RunnerConfiguration configuration) + { + foreach (var changeTokenSource in _changeTokenSources) changeTokenSource.Invoke(configuration, null); + } + + IDisposable? IOptionsMonitor.OnChange(Action listener) + { + var changeTokenSource = new RunnerConfigurationChangeTokenSource(listener); + changeTokenSource.OnDisposed += OnChangeTokenSourceDisposed; + _changeTokenSources.Add(changeTokenSource); + return changeTokenSource; + } + + void OnChangeTokenSourceDisposed(object? sender, EventArgs e) + { + if (sender is not RunnerConfigurationChangeTokenSource changeTokenSource) return; + changeTokenSource.OnDisposed -= OnChangeTokenSourceDisposed; + _changeTokenSources.Remove(changeTokenSource); + } + + class RunnerConfigurationChangeTokenSource(Action listener) + : IDisposable + { + + public event EventHandler? OnDisposed; + + public void Invoke(RunnerConfiguration configuration, string? name) => listener(configuration, name); + + public void Dispose() + { + OnDisposed?.Invoke(this, EventArgs.Empty); + GC.SuppressFinalize(this); + } + + } + +} \ No newline at end of file diff --git a/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs b/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs index c20fbb57..bb94fb04 100644 --- a/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs +++ b/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs @@ -25,8 +25,8 @@ namespace Synapse.Runtime.Docker.Services; /// The current /// The service used to create s /// The current -/// The service used to access the current -public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptions runner) +/// The service used to access the current +public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptionsMonitor runnerConfigurationMonitor) : WorkflowRuntimeBase(loggerFactory) { @@ -41,9 +41,14 @@ public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory logg protected IHostEnvironment Environment { get; } = environment; /// - /// Gets the current + /// Gets the service used to access the current /// - protected RunnerConfiguration Runner => runner.Value; + protected IOptionsMonitor RunnerConfigurationMonitor { get; } = runnerConfigurationMonitor; + + /// + /// Gets the current + /// + protected RunnerConfiguration RunnerConfiguration => RunnerConfigurationMonitor.CurrentValue; /// /// Gets the service used to interact with the Docker API @@ -62,9 +67,9 @@ public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory logg /// A new awaitable protected virtual Task InitializeAsync(CancellationToken cancellationToken = default) { - if (this.Runner.Runtime.Docker == null) throw new NullReferenceException($"Failed to initialize the Docker Runtime because the operator is not configured to use Docker as a runtime"); - var dockerConfiguration = new DockerClientConfiguration(this.Runner.Runtime.Docker.Api.Endpoint); - this.Docker = dockerConfiguration.CreateClient(string.IsNullOrWhiteSpace(this.Runner.Runtime.Docker.Api.Version) ? null : System.Version.Parse(this.Runner.Runtime.Docker.Api.Version!)); + if (this.RunnerConfiguration.Runtime.Docker == null) throw new NullReferenceException($"Failed to initialize the Docker Runtime because the operator is not configured to use Docker as a runtime"); + var dockerConfiguration = new DockerClientConfiguration(this.RunnerConfiguration.Runtime.Docker.Api.Endpoint); + this.Docker = dockerConfiguration.CreateClient(string.IsNullOrWhiteSpace(this.RunnerConfiguration.Runtime.Docker.Api.Version) ? null : System.Version.Parse(this.RunnerConfiguration.Runtime.Docker.Api.Version!)); return Task.CompletedTask; } @@ -78,7 +83,7 @@ public override async Task CreateProcessAsync(Workflow workflo { this.Logger.LogDebug("Creating a new Docker container for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName()); if (this.Docker == null) await this.InitializeAsync(cancellationToken).ConfigureAwait(false); - var container = this.Runner.Runtime.Docker!.ContainerTemplate.Clone()!; + var container = this.RunnerConfiguration.Runtime.Docker!.ContainerTemplate.Clone()!; try { await this.Docker!.Images.InspectImageAsync(container.Image, cancellationToken).ConfigureAwait(false); @@ -93,24 +98,24 @@ public override async Task CreateProcessAsync(Workflow workflo } container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace, workflowInstance.GetNamespace()!); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name, $"{workflowInstance.GetName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}"); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Runner.Api.Uri.OriginalString); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.Runner.ContainerPlatform); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.Runner.PublishLifecycleEvents ?? true).ToString()); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.Runner.Runtime.Docker.Secrets.MountPath); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.RunnerConfiguration.Api.Uri.OriginalString); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.RunnerConfiguration.ContainerPlatform); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.RunnerConfiguration.PublishLifecycleEvents ?? true).ToString()); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.RunnerConfiguration.Runtime.Docker.Secrets.MountPath); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Name, serviceAccount.GetQualifiedName()); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Key, serviceAccount.Spec.Key); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Workflow.Instance, workflowInstance.GetQualifiedName()); container.SetEnvironmentVariable("DOCKER_HOST", "unix:///var/run/docker.sock"); container.User = "root"; - if (this.Runner.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); - var hostConfig = this.Runner.Runtime.Docker.HostConfig?.Clone()! ?? new(); - if (!Directory.Exists(this.Runner.Runtime.Docker.Secrets.Directory)) Directory.CreateDirectory(this.Runner.Runtime.Docker.Secrets.Directory); + if (this.RunnerConfiguration.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); + var hostConfig = this.RunnerConfiguration.Runtime.Docker.HostConfig?.Clone()! ?? new(); + if (!Directory.Exists(this.RunnerConfiguration.Runtime.Docker.Secrets.Directory)) Directory.CreateDirectory(this.RunnerConfiguration.Runtime.Docker.Secrets.Directory); hostConfig.Mounts ??= []; hostConfig.Mounts.Insert(0, new() { Type = "bind", - Source = this.Runner.Runtime.Docker.Secrets.Directory, - Target = this.Runner.Runtime.Docker.Secrets.MountPath + Source = this.RunnerConfiguration.Runtime.Docker.Secrets.Directory, + Target = this.RunnerConfiguration.Runtime.Docker.Secrets.MountPath }); hostConfig.Mounts.Insert(1, new() { @@ -128,7 +133,7 @@ public override async Task CreateProcessAsync(Workflow workflo HostConfig = hostConfig }; var result = await this.Docker!.Containers.CreateContainerAsync(parameters, cancellationToken).ConfigureAwait(false); - if (this.Environment.RunsInDocker()) await this.Docker.Networks.ConnectNetworkAsync(this.Runner.Runtime.Docker.Network, new NetworkConnectParameters() { Container = result.ID }, cancellationToken); + if (this.Environment.RunsInDocker()) await this.Docker.Networks.ConnectNetworkAsync(this.RunnerConfiguration.Runtime.Docker.Network, new NetworkConnectParameters() { Container = result.ID }, cancellationToken); if (result.Warnings.Count > 0) this.Logger.LogWarning("Warnings have been raised during container creation: {warnings}", string.Join(System.Environment.NewLine, result.Warnings)); var process = ActivatorUtilities.CreateInstance(this.ServiceProvider, this.Docker!, result.ID); this.Processes.TryAdd(process.Id, process); diff --git a/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs b/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs index 95990cbb..c821e630 100644 --- a/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs +++ b/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs @@ -30,8 +30,8 @@ namespace Synapse.Runtime.Kubernetes.Services; /// The current /// The service used to create s /// The current -/// The service used to access the current -public class KubernetesRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptions runner) +/// The service used to access the current +public class KubernetesRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptionsMonitor runnerConfigurationMonitor) : WorkflowRuntimeBase(loggerFactory) { @@ -46,9 +46,14 @@ public class KubernetesRuntime(IServiceProvider serviceProvider, ILoggerFactory protected IHostEnvironment Environment { get; } = environment; /// - /// Gets the current + /// Gets the service used to access the current /// - protected RunnerConfiguration Runner => runner.Value; + protected IOptionsMonitor RunnerConfigurationMonitor { get; } = runnerConfigurationMonitor; + + /// + /// Gets the current + /// + protected RunnerConfiguration RunnerConfiguration => RunnerConfigurationMonitor.CurrentValue; /// /// Gets the service used to interact with the Kubernetes API @@ -67,12 +72,12 @@ public class KubernetesRuntime(IServiceProvider serviceProvider, ILoggerFactory /// A new awaitable protected virtual async Task InitializeAsync(CancellationToken cancellationToken = default) { - if (this.Runner.Runtime.Kubernetes == null) throw new NullReferenceException($"Failed to initialize the Kubernetes Runtime because the operator is not configured to use Kubernetes as a runtime"); + if (this.RunnerConfiguration.Runtime.Kubernetes == null) throw new NullReferenceException($"Failed to initialize the Kubernetes Runtime because the operator is not configured to use Kubernetes as a runtime"); var configuration = Environment.RunsInKubernetes() ? KubernetesClientConfiguration.InClusterConfig() - : (string.IsNullOrWhiteSpace(this.Runner.Runtime.Kubernetes.Kubeconfig) + : (string.IsNullOrWhiteSpace(this.RunnerConfiguration.Runtime.Kubernetes.Kubeconfig) ? KubernetesClientConfiguration.BuildDefaultConfig() - : await KubernetesClientConfiguration.BuildConfigFromConfigFileAsync(new FileInfo(this.Runner.Runtime.Kubernetes.Kubeconfig)).ConfigureAwait(false)); + : await KubernetesClientConfiguration.BuildConfigFromConfigFileAsync(new FileInfo(this.RunnerConfiguration.Runtime.Kubernetes.Kubeconfig)).ConfigureAwait(false)); this.Kubernetes = new k8s.Kubernetes(configuration); } @@ -87,16 +92,16 @@ public override async Task CreateProcessAsync(Workflow workflo this.Logger.LogDebug("Creating a new Kubernetes job for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName()); if (this.Kubernetes == null) await this.InitializeAsync(cancellationToken).ConfigureAwait(false); var workflowDefinition = workflow.Spec.Versions.Get(workflowInstance.Spec.Definition.Version) ?? throw new NullReferenceException($"Failed to find version '{workflowInstance.Spec.Definition.Version}' of workflow '{workflow.GetQualifiedName()}'"); - var pod = this.Runner.Runtime.Kubernetes!.PodTemplate.Clone()!; + var pod = this.RunnerConfiguration.Runtime.Kubernetes!.PodTemplate.Clone()!; pod.Metadata ??= new(); pod.Metadata.Name = $"{workflowInstance.GetQualifiedName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}"; - if (!string.IsNullOrWhiteSpace(this.Runner.Runtime.Kubernetes.Namespace)) pod.Metadata.NamespaceProperty = this.Runner.Runtime.Kubernetes.Namespace; + if (!string.IsNullOrWhiteSpace(this.RunnerConfiguration.Runtime.Kubernetes.Namespace)) pod.Metadata.NamespaceProperty = this.RunnerConfiguration.Runtime.Kubernetes.Namespace; if (pod.Spec == null || pod.Spec.Containers == null || !pod.Spec.Containers.Any()) throw new InvalidOperationException("The configured Kubernetes runtime pod template is not valid"); var volumeMounts = new List(); pod.Spec.Volumes ??= []; if (workflowDefinition.Use?.Secrets?.Count > 0) { - var secretsVolume = new V1Volume(this.Runner.Runtime.Kubernetes.Secrets.VolumeName) + var secretsVolume = new V1Volume(this.RunnerConfiguration.Runtime.Kubernetes.Secrets.VolumeName) { Projected = new() { @@ -104,7 +109,7 @@ public override async Task CreateProcessAsync(Workflow workflo } }; pod.Spec.Volumes.Add(secretsVolume); - var secretsVolumeMount = new V1VolumeMount(this.Runner.Runtime.Kubernetes.Secrets.MountPath, secretsVolume.Name, readOnlyProperty: true); + var secretsVolumeMount = new V1VolumeMount(this.RunnerConfiguration.Runtime.Kubernetes.Secrets.MountPath, secretsVolume.Name, readOnlyProperty: true); volumeMounts.Add(secretsVolumeMount); foreach (var secret in workflowDefinition.Use.Secrets) { @@ -123,17 +128,17 @@ public override async Task CreateProcessAsync(Workflow workflo container.Env ??= []; container.Env.Add(new(SynapseDefaults.EnvironmentVariables.Runner.Namespace, valueFrom: new(fieldRef: new("metadata.namespace")))); container.Env.Add(new(SynapseDefaults.EnvironmentVariables.Runner.Name, valueFrom: new(fieldRef: new("metadata.name")))); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Runner.Api.Uri.OriginalString); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.Runner.ContainerPlatform); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.Runner.PublishLifecycleEvents ?? true).ToString()); - container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.Runner.Runtime.Kubernetes.Secrets.MountPath); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.RunnerConfiguration.Api.Uri.OriginalString); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.RunnerConfiguration.ContainerPlatform); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.RunnerConfiguration.PublishLifecycleEvents ?? true).ToString()); + container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.RunnerConfiguration.Runtime.Kubernetes.Secrets.MountPath); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Name, serviceAccount.GetQualifiedName()); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Key, serviceAccount.Spec.Key); container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Workflow.Instance, workflowInstance.GetQualifiedName()); - if (this.Runner.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); + if (this.RunnerConfiguration.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); container.VolumeMounts = volumeMounts; } - if (this.Runner.ContainerPlatform == ContainerPlatform.Kubernetes) pod.Spec.ServiceAccountName = this.Runner.Runtime.Kubernetes.ServiceAccount; + if (this.RunnerConfiguration.ContainerPlatform == ContainerPlatform.Kubernetes) pod.Spec.ServiceAccountName = this.RunnerConfiguration.Runtime.Kubernetes.ServiceAccount; var process = ActivatorUtilities.CreateInstance(this.ServiceProvider, this.Kubernetes!, pod); this.Processes.AddOrUpdate(process.Id, _ => process, (key, current) => { @@ -162,8 +167,8 @@ public override async Task DeleteProcessAsync(string processId, CancellationToke Logger.LogDebug("Deleting the Kubernetes process with id '{processId}'...", processId); var components = processId.Split('.', StringSplitOptions.RemoveEmptyEntries); if (components.Length < 2) throw new ArgumentException($"The specified value '{processId}' is not valid Kubernetes process id", nameof(processId)); - var name = components[0]; - var @namespace = components[1]; + var name = string.Join('.', components.Take(components.Length - 1)); + var @namespace = components[^1]; if (Processes.TryGetValue(processId, out var process)) { try diff --git a/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs b/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs index 7309c26b..92cd11dd 100644 --- a/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs +++ b/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs @@ -24,8 +24,8 @@ namespace Synapse.Runtime.Services; /// The service used to create s /// The current /// The service used to create s -/// The service used to access the current -public class NativeRuntime(ILoggerFactory loggerFactory, IHostEnvironment environment, IHttpClientFactory httpClientFactory, IOptionsMonitor options) +/// The service used to access the current +public class NativeRuntime(ILoggerFactory loggerFactory, IHostEnvironment environment, IHttpClientFactory httpClientFactory, IOptionsMonitor runnerConfigurationMonitor) : WorkflowRuntimeBase(loggerFactory) { @@ -40,9 +40,14 @@ public class NativeRuntime(ILoggerFactory loggerFactory, IHostEnvironment enviro protected HttpClient HttpClient { get; } = httpClientFactory.CreateClient(); /// - /// Gets the current + /// Gets the service used to access the current /// - protected RunnerConfiguration Options => options.CurrentValue; + protected IOptionsMonitor RunnerConfigurationMonitor { get; } = runnerConfigurationMonitor; + + /// + /// Gets the current + /// + protected RunnerConfiguration RunnerConfiguration => RunnerConfigurationMonitor.CurrentValue; /// /// Gets a containing all known worker processes @@ -55,16 +60,16 @@ public override Task CreateProcessAsync(Workflow workflow, Wor ArgumentNullException.ThrowIfNull(workflow); ArgumentNullException.ThrowIfNull(workflowInstance); ArgumentNullException.ThrowIfNull(serviceAccount); - if (this.Options.Runtime.Native == null) throw new NullReferenceException("The native runtime must be configured"); - var fileName = this.Options.Runtime.Native.Executable; + if (this.RunnerConfiguration.Runtime.Native == null) throw new NullReferenceException("The native runtime must be configured"); + var fileName = this.RunnerConfiguration.Runtime.Native.Executable; var args = string.Empty; if (this.Environment.IsDevelopment()) args += "--debug"; - var filePath = Path.Combine(this.Options.Runtime.Native.Directory, fileName); + var filePath = Path.Combine(this.RunnerConfiguration.Runtime.Native.Directory, fileName); var startInfo = new ProcessStartInfo() { FileName = filePath, Arguments = args, - WorkingDirectory = this.Options.Runtime.Native.Directory, + WorkingDirectory = this.RunnerConfiguration.Runtime.Native.Directory, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true, @@ -72,14 +77,14 @@ public override Task CreateProcessAsync(Workflow workflow, Wor }; startInfo.Environment[SynapseDefaults.EnvironmentVariables.Runner.Namespace] = workflowInstance.GetNamespace()!; startInfo.Environment[SynapseDefaults.EnvironmentVariables.Runner.Name] = $"{workflowInstance.GetName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}"; - startInfo.Environment[SynapseDefaults.EnvironmentVariables.Api.Uri] = this.Options.Api.Uri.OriginalString; - startInfo.Environment[SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform] = this.Options.ContainerPlatform; - startInfo.Environment[SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents] = (this.Options.PublishLifecycleEvents ?? true).ToString(); - startInfo.Environment[SynapseDefaults.EnvironmentVariables.Secrets.Directory] = this.Options.Runtime.Native.SecretsDirectory; + startInfo.Environment[SynapseDefaults.EnvironmentVariables.Api.Uri] = this.RunnerConfiguration.Api.Uri.OriginalString; + startInfo.Environment[SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform] = this.RunnerConfiguration.ContainerPlatform; + startInfo.Environment[SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents] = (this.RunnerConfiguration.PublishLifecycleEvents ?? true).ToString(); + startInfo.Environment[SynapseDefaults.EnvironmentVariables.Secrets.Directory] = this.RunnerConfiguration.Runtime.Native.SecretsDirectory; startInfo.Environment[SynapseDefaults.EnvironmentVariables.ServiceAccount.Name] = serviceAccount.GetQualifiedName(); startInfo.Environment[SynapseDefaults.EnvironmentVariables.ServiceAccount.Key] = serviceAccount.Spec.Key; startInfo.Environment[SynapseDefaults.EnvironmentVariables.Workflow.Instance] = workflowInstance.GetQualifiedName(); - if (this.Options.Certificates?.Validate == false) startInfo.Environment.Add(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); + if (this.RunnerConfiguration.Certificates?.Validate == false) startInfo.Environment.Add(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true"); var process = new Process() { StartInfo = startInfo,