diff --git a/src/operator/Synapse.Operator/Program.cs b/src/operator/Synapse.Operator/Program.cs index a0f9dd2e0..1d0c971e6 100644 --- a/src/operator/Synapse.Operator/Program.cs +++ b/src/operator/Synapse.Operator/Program.cs @@ -62,6 +62,7 @@ services.AddScoped(); services.AddScoped>(provider => provider.GetRequiredService()); + services.AddScoped(provider => provider.GetRequiredService()); services.AddScoped(); services.AddScoped>(provider => provider.GetRequiredService()); diff --git a/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs b/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs index 46f55de2e..09cd2f472 100644 --- a/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs +++ b/src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs @@ -14,7 +14,7 @@ namespace Synapse.Operator.Services; /// -/// Defines the fundamentals of the service used to access the current Synapse Operator +/// Defines the fundamentals of a service used to access the current Synapse Operator /// public interface IOperatorController : IHostedService @@ -25,4 +25,4 @@ public interface IOperatorController /// IResourceMonitor Operator { get; } -} \ No newline at end of file +} diff --git a/src/operator/Synapse.Operator/Services/Interfaces/IWorkflowController.cs b/src/operator/Synapse.Operator/Services/Interfaces/IWorkflowController.cs new file mode 100644 index 000000000..65c576065 --- /dev/null +++ b/src/operator/Synapse.Operator/Services/Interfaces/IWorkflowController.cs @@ -0,0 +1,27 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Operator.Services; + +/// +/// Defines the fundamentals of a service used to access all monitored workflows +/// +public interface IWorkflowController +{ + + /// + /// Gets a dictionary containing all monitored workflows + /// + IReadOnlyDictionary Workflows { get; } + +} \ No newline at end of file diff --git a/src/operator/Synapse.Operator/Services/WorkflowController.cs b/src/operator/Synapse.Operator/Services/WorkflowController.cs index 940c73bcc..3d42c017f 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowController.cs @@ -25,7 +25,7 @@ namespace Synapse.Operator.Services; /// The current /// The service used to access the current public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository resources, IOptions operatorOptions, IOperatorController operatorAccessor) - : ResourceController(loggerFactory, controllerOptions, resources) + : ResourceController(loggerFactory, controllerOptions, resources), IWorkflowController { /// @@ -48,6 +48,9 @@ public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory /// protected ConcurrentDictionary Schedulers { get; } = []; + /// + public IReadOnlyDictionary Workflows => this.Resources.AsReadOnly(); + /// public override async Task StartAsync(CancellationToken cancellationToken) { diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs index 667122e0a..37c0ee1aa 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data.Infrastructure.ResourceOriented; using Neuroglia.Data.Infrastructure.Services; namespace Synapse.Operator.Services; @@ -23,8 +24,9 @@ namespace Synapse.Operator.Services; /// The service used to access the current /// The service used to manage s /// The service used to access the current +/// The service used to access all monitored s /// The used to manage s -public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository documents) +public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository documents) : ResourceController(loggerFactory, controllerOptions, repository) { @@ -38,6 +40,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge /// protected IResourceMonitor Operator => operatorController.Operator; + /// + /// Gets a dictionary containing all monitored s + /// + protected IReadOnlyDictionary Workflows => workflowController.Workflows; + /// /// Gets the used to manage s /// @@ -87,7 +94,8 @@ protected virtual async Task CreateWorkflowInstanceHand protected virtual async Task TryClaimAsync(WorkflowInstance resource, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(resource); - if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName(); + var isClaimable = this.IsWorkflowInstanceClaimable(resource); + if (isClaimable.HasValue) return isClaimable.Value; try { var originalResource = resource.Clone(); @@ -112,7 +120,8 @@ protected virtual async Task TryClaimAsync(WorkflowInstance resource, Canc protected virtual async Task TryReleaseAsync(WorkflowInstance resource, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(resource); - if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName(); + var isClaimable = this.IsWorkflowInstanceClaimable(resource); + if (isClaimable.HasValue) return isClaimable.Value; try { var originalResource = resource.Clone(); @@ -205,6 +214,20 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn /// A new awaitable protected virtual Task OnResourceSelectorChangedAsync(IDictionary? selector) => this.ReconcileAsync(this.CancellationTokenSource.Token); + /// + /// Determines whether or not the specified can be claimed by the current + /// + /// The to check + /// A boolean indicating whether or not the specified can be claimed by the current + protected virtual bool? IsWorkflowInstanceClaimable(WorkflowInstance workflowInstance) + { + ArgumentNullException.ThrowIfNull(workflowInstance); + if (workflowInstance.Metadata.Labels != null && workflowInstance.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName(); + if (this.Workflows.TryGetValue(this.GetResourceCacheKey(workflowInstance.Spec.Definition.Name, workflowInstance.Spec.Definition.Namespace), out var workflow) && workflow != null + && workflow.Metadata.Labels != null && workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName(); + return null; + } + /// protected override async ValueTask DisposeAsync(bool disposing) {