Skip to content

Commit 44785ed

Browse files
authored
Merge pull request #526 from serverlessworkflow/fix-workflow-operator-routing
Fixed the `WorkflowInstanceController` not to claim instances of workflows explicitly assigned to other operators
2 parents a1bc5c4 + dc67b91 commit 44785ed

File tree

5 files changed

+60
-6
lines changed

5 files changed

+60
-6
lines changed

src/operator/Synapse.Operator/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262

6363
services.AddScoped<WorkflowController>();
6464
services.AddScoped<IResourceController<Workflow>>(provider => provider.GetRequiredService<WorkflowController>());
65+
services.AddScoped<IWorkflowController>(provider => provider.GetRequiredService<WorkflowController>());
6566

6667
services.AddScoped<WorkflowInstanceController>();
6768
services.AddScoped<IResourceController<WorkflowInstance>>(provider => provider.GetRequiredService<WorkflowInstanceController>());

src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
namespace Synapse.Operator.Services;
1515

1616
/// <summary>
17-
/// Defines the fundamentals of the service used to access the current Synapse Operator
17+
/// Defines the fundamentals of a service used to access the current Synapse Operator
1818
/// </summary>
1919
public interface IOperatorController
2020
: IHostedService
@@ -25,4 +25,4 @@ public interface IOperatorController
2525
/// </summary>
2626
IResourceMonitor<Resources.Operator> Operator { get; }
2727

28-
}
28+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Operator.Services;
15+
16+
/// <summary>
17+
/// Defines the fundamentals of a service used to access all monitored workflows
18+
/// </summary>
19+
public interface IWorkflowController
20+
{
21+
22+
/// <summary>
23+
/// Gets a dictionary containing all monitored workflows
24+
/// </summary>
25+
IReadOnlyDictionary<string, Workflow> Workflows { get; }
26+
27+
}

src/operator/Synapse.Operator/Services/WorkflowController.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace Synapse.Operator.Services;
2525
/// <param name="operatorOptions">The current <see cref="Configuration.OperatorOptions"/></param>
2626
/// <param name="operatorAccessor">The service used to access the current <see cref="Resources.Operator"/></param>
2727
public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<Workflow>> controllerOptions, IResourceRepository resources, IOptions<OperatorOptions> operatorOptions, IOperatorController operatorAccessor)
28-
: ResourceController<Workflow>(loggerFactory, controllerOptions, resources)
28+
: ResourceController<Workflow>(loggerFactory, controllerOptions, resources), IWorkflowController
2929
{
3030

3131
/// <summary>
@@ -48,6 +48,9 @@ public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory
4848
/// </summary>
4949
protected ConcurrentDictionary<string, WorkflowScheduler> Schedulers { get; } = [];
5050

51+
/// <inheritdoc/>
52+
public IReadOnlyDictionary<string, Workflow> Workflows => this.Resources.AsReadOnly();
53+
5154
/// <inheritdoc/>
5255
public override async Task StartAsync(CancellationToken cancellationToken)
5356
{

src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using Neuroglia.Data.Infrastructure.ResourceOriented;
1415
using Neuroglia.Data.Infrastructure.Services;
1516

1617
namespace Synapse.Operator.Services;
@@ -23,8 +24,9 @@ namespace Synapse.Operator.Services;
2324
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
2425
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
2526
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
27+
/// <param name="workflowController">The service used to access all monitored <see cref="Workflow"/>s</param>
2628
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
27-
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository<Document, string> documents)
29+
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository<Document, string> documents)
2830
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
2931
{
3032

@@ -38,6 +40,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
3840
/// </summary>
3941
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;
4042

43+
/// <summary>
44+
/// Gets a dictionary containing all monitored <see cref="Workflow"/>s
45+
/// </summary>
46+
protected IReadOnlyDictionary<string, Workflow> Workflows => workflowController.Workflows;
47+
4148
/// <summary>
4249
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
4350
/// </summary>
@@ -87,7 +94,8 @@ protected virtual async Task<WorkflowInstanceHandler> CreateWorkflowInstanceHand
8794
protected virtual async Task<bool> TryClaimAsync(WorkflowInstance resource, CancellationToken cancellationToken)
8895
{
8996
ArgumentNullException.ThrowIfNull(resource);
90-
if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
97+
var isClaimable = this.IsWorkflowInstanceClaimable(resource);
98+
if (isClaimable.HasValue) return isClaimable.Value;
9199
try
92100
{
93101
var originalResource = resource.Clone();
@@ -112,7 +120,8 @@ protected virtual async Task<bool> TryClaimAsync(WorkflowInstance resource, Canc
112120
protected virtual async Task<bool> TryReleaseAsync(WorkflowInstance resource, CancellationToken cancellationToken)
113121
{
114122
ArgumentNullException.ThrowIfNull(resource);
115-
if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
123+
var isClaimable = this.IsWorkflowInstanceClaimable(resource);
124+
if (isClaimable.HasValue) return isClaimable.Value;
116125
try
117126
{
118127
var originalResource = resource.Clone();
@@ -205,6 +214,20 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
205214
/// <returns>A new awaitable <see cref="Task"/></returns>
206215
protected virtual Task OnResourceSelectorChangedAsync(IDictionary<string, string>? selector) => this.ReconcileAsync(this.CancellationTokenSource.Token);
207216

217+
/// <summary>
218+
/// Determines whether or not the specified <see cref="WorkflowInstance"/> can be claimed by the current <see cref="Resources.Operator"/>
219+
/// </summary>
220+
/// <param name="workflowInstance">The <see cref="WorkflowInstance"/> to check</param>
221+
/// <returns>A boolean indicating whether or not the specified <see cref="WorkflowInstance"/> can be claimed by the current <see cref="Resources.Operator"/></returns>
222+
protected virtual bool? IsWorkflowInstanceClaimable(WorkflowInstance workflowInstance)
223+
{
224+
ArgumentNullException.ThrowIfNull(workflowInstance);
225+
if (workflowInstance.Metadata.Labels != null && workflowInstance.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
226+
if (this.Workflows.TryGetValue(this.GetResourceCacheKey(workflowInstance.Spec.Definition.Name, workflowInstance.Spec.Definition.Namespace), out var workflow) && workflow != null
227+
&& workflow.Metadata.Labels != null && workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
228+
return null;
229+
}
230+
208231
/// <inheritdoc/>
209232
protected override async ValueTask DisposeAsync(bool disposing)
210233
{

0 commit comments

Comments
 (0)