Skip to content

Commit 3db17a4

Browse files
authored
Merge pull request #432 from serverlessworkflow/feat-resource-validation
Added new `IResourceValidator` services, used to validate `Workflow` and `Correlation` resources
2 parents aaf7ba5 + e688cb7 commit 3db17a4

File tree

8 files changed

+159
-9
lines changed

8 files changed

+159
-9
lines changed

src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,15 @@ public static IServiceCollection AddSynapse(this IServiceCollection services, IC
6161
services.AddSingleton<ITextDocumentRepository<string>, RedisTextDocumentRepository<string>>();
6262
services.AddSingleton<ITextDocumentRepository>(provider => provider.GetRequiredService<ITextDocumentRepository<string>>());
6363

64+
services.AddSingleton<ISchemaHandlerProvider, SchemaHandlerProvider>();
65+
services.AddSingleton<ISchemaHandler, JsonSchemaHandler>();
66+
6467
services.AddScoped<IResourceRepository, ResourceRepository>();
6568
services.AddScoped<IAdmissionControl, AdmissionControl>();
6669
services.AddScoped<IVersionControl, VersionControl>();
6770
services.AddScoped<IResourceMutator, WorkflowInstanceMutator>();
71+
services.AddScoped<IResourceValidator, WorkflowInstanceValidator>();
72+
services.AddScoped<IResourceValidator, CorrelationValidator>();
6873
services.AddSingleton<IPatchHandler, JsonMergePatchHandler>();
6974
services.AddSingleton<IPatchHandler, JsonPatchHandler>();
7075
services.AddSingleton<IPatchHandler, JsonStrategicMergePatchHandler>();
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
15+
using Neuroglia.Data.Infrastructure.ResourceOriented;
16+
using Synapse.Resources;
17+
using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
18+
using ServerlessWorkflow.Sdk;
19+
using Neuroglia;
20+
21+
namespace Synapse.Core.Infrastructure.Services;
22+
23+
/// <summary>
24+
/// Represents the service used to validate <see cref="Correlation"/>s
25+
/// </summary>
26+
/// <param name="resources">The service used to manage resources</param>
27+
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
28+
public class CorrelationValidator(IResourceRepository resources, ISchemaHandlerProvider schemaHandlerProvider)
29+
: IResourceValidator
30+
{
31+
32+
/// <summary>
33+
/// Gets the service used to manage resources
34+
/// </summary>
35+
protected IResourceRepository Resources { get; } = resources;
36+
37+
/// <summary>
38+
/// Gets the service used to provide <see cref="ISchemaHandler"/> implementations
39+
/// </summary>
40+
protected ISchemaHandlerProvider SchemaHandlerProvider { get; } = schemaHandlerProvider;
41+
42+
/// <inheritdoc/>
43+
public virtual bool AppliesTo(Operation operation, string group, string version, string plural, string? @namespace = null) => operation == Operation.Create && group == Correlation.ResourceDefinition.Group && version == Correlation.ResourceDefinition.Version && plural == Correlation.ResourceDefinition.Plural;
44+
45+
/// <inheritdoc/>
46+
public virtual async Task<AdmissionReviewResponse> ValidateAsync(AdmissionReviewRequest context, CancellationToken cancellationToken = default)
47+
{
48+
ArgumentNullException.ThrowIfNull(context);
49+
var correlation = context.UpdatedState.ConvertTo<Correlation>()!;
50+
switch (correlation.Spec.Outcome.Type)
51+
{
52+
case CorrelationOutcomeType.Start:
53+
if (correlation.Spec.Outcome.Start == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The '/spec/outcome/start' property must be set when the correlation outcome type has been set to '{CorrelationOutcomeType.Start}'", new("/spec/outcome/start", UriKind.Relative)));
54+
var workflow = await this.Resources.GetAsync<Workflow>(correlation.Spec.Outcome.Start.Workflow.Name, correlation.Spec.Outcome.Start.Workflow.Namespace, cancellationToken).ConfigureAwait(false);
55+
if (workflow == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow '{correlation.Spec.Outcome.Start.Workflow.Name}.{correlation.Spec.Outcome.Start.Workflow.Namespace}'", new("/spec/outcome/start/workflow", UriKind.Relative)));
56+
var workflowDefinition = workflow.Spec.Versions.Get(correlation.Spec.Outcome.Start.Workflow.Version);
57+
if (workflowDefinition == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find version '{correlation.Spec.Outcome.Start.Workflow.Version}' of workflow '{correlation.Spec.Outcome.Start.Workflow.Name}.{correlation.Spec.Outcome.Start.Workflow.Namespace}'", new("/spec/outcome/start/workflow/version", UriKind.Relative)));
58+
if (workflowDefinition.Input?.Schema != null)
59+
{
60+
var schemaHandler = this.SchemaHandlerProvider.GetHandler(workflowDefinition.Input.Schema.Format) ?? throw new ArgumentNullException($"Failed to find an handler that supports the specified schema format '{workflowDefinition.Input.Schema.Format}'");
61+
var validationResult = await schemaHandler.ValidateAsync(correlation.Spec.Outcome.Start.Input ?? new Dictionary<string, object>(), workflowDefinition.Input.Schema, cancellationToken).ConfigureAwait(false);
62+
if (!validationResult.IsSuccess()) return new(context.Uid, false, null, new(ErrorType.Validation, ErrorTitle.Validation, ErrorStatus.Validation, $"Failed to validate the correlation outcome workflow input:\n{string.Join('\n', validationResult.Errors?.FirstOrDefault()?.Errors?.Select(e => $"- {e.Key}: {e.Value.First()}") ?? [])}", new("/spec/outcome/start/input", UriKind.Relative)));
63+
}
64+
break;
65+
case CorrelationOutcomeType.Correlate:
66+
if (correlation.Spec.Outcome.Correlate == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The '/spec/outcome/correlate' property must be set when the correlation outcome type has been set to '{CorrelationOutcomeType.Correlate}'", new("/spec/outcome/correlate", UriKind.Relative)));
67+
var components = correlation.Spec.Outcome.Correlate.Instance.Split('.', StringSplitOptions.RemoveEmptyEntries);
68+
if (components.Length != 2) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The specified value '{correlation.Spec.Outcome.Correlate.Instance}' is not a valid workflow instance qualified name ({{name}}.{{namespace}})", new("/spec/outcome/correlate/instance", UriKind.Relative)));
69+
var name = components[0];
70+
var @namespace = components[1];
71+
var workflowInstance = await this.Resources.GetAsync<WorkflowInstance>(name, @namespace, cancellationToken).ConfigureAwait(false);
72+
if (workflowInstance == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow instance '{correlation.Spec.Outcome.Correlate.Instance}'", new("/spec/outcome/correlate/instance", UriKind.Relative)));
73+
var task = workflowInstance.Status?.Tasks?.FirstOrDefault(t => t.Reference.OriginalString == correlation.Spec.Outcome.Correlate.Task);
74+
if (task == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the task '{correlation.Spec.Outcome.Correlate.Task}' in workflow instance '{correlation.Spec.Outcome.Correlate.Instance}'", new("/spec/outcome/correlate/task", UriKind.Relative)));
75+
break;
76+
default:
77+
return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The specified correlation outcome type '{correlation.Spec.Outcome.Type}' is not supported", new("/spec/outcome/type", UriKind.Relative)));
78+
}
79+
return new(context.Uid, true);
80+
}
81+
82+
}

src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceMutator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,4 @@ public virtual Task<AdmissionReviewResponse> MutateAsync(AdmissionReviewRequest
4040
return Task.FromResult(new AdmissionReviewResponse(context.Uid, true, new(PatchType.JsonPatch, patch)));
4141
}
4242

43-
}
43+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
15+
using Neuroglia.Data.Infrastructure.ResourceOriented;
16+
using Synapse.Resources;
17+
using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
18+
using ServerlessWorkflow.Sdk;
19+
using Neuroglia;
20+
21+
namespace Synapse.Core.Infrastructure.Services;
22+
23+
/// <summary>
24+
/// Represents the service used to validate <see cref="WorkflowInstance"/>s
25+
/// </summary>
26+
/// <param name="resources">The service used to manage resources</param>
27+
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
28+
public class WorkflowInstanceValidator(IResourceRepository resources, ISchemaHandlerProvider schemaHandlerProvider)
29+
: IResourceValidator
30+
{
31+
32+
/// <summary>
33+
/// Gets the service used to manage resources
34+
/// </summary>
35+
protected IResourceRepository Resources { get; } = resources;
36+
37+
/// <summary>
38+
/// Gets the service used to provide <see cref="ISchemaHandler"/> implementations
39+
/// </summary>
40+
protected ISchemaHandlerProvider SchemaHandlerProvider { get; } = schemaHandlerProvider;
41+
42+
/// <inheritdoc/>
43+
public virtual bool AppliesTo(Operation operation, string group, string version, string plural, string? @namespace = null) => operation == Operation.Create && group == WorkflowInstance.ResourceDefinition.Group && version == WorkflowInstance.ResourceDefinition.Version && plural == WorkflowInstance.ResourceDefinition.Plural;
44+
45+
/// <inheritdoc/>
46+
public virtual async Task<AdmissionReviewResponse> ValidateAsync(AdmissionReviewRequest context, CancellationToken cancellationToken = default)
47+
{
48+
ArgumentNullException.ThrowIfNull(context);
49+
var workflowInstance = context.UpdatedState.ConvertTo<WorkflowInstance>()!;
50+
var workflow = await this.Resources.GetAsync<Workflow>(workflowInstance.Spec.Definition.Name, workflowInstance.Spec.Definition.Namespace, cancellationToken).ConfigureAwait(false);
51+
if (workflow == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow '{workflowInstance.Spec.Definition.Name}.{workflowInstance.Spec.Definition.Namespace}'", new("/spec/definition", UriKind.Relative)));
52+
var workflowDefinition = workflow.Spec.Versions.Get(workflowInstance.Spec.Definition.Version);
53+
if (workflowDefinition == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find version '{workflowInstance.Spec.Definition.Version}' of workflow '{workflowInstance.Spec.Definition.Name}.{workflowInstance.Spec.Definition.Namespace}'", new("/spec/definition/version", UriKind.Relative)));
54+
if (workflowDefinition.Input?.Schema != null)
55+
{
56+
var schemaHandler = this.SchemaHandlerProvider.GetHandler(workflowDefinition.Input.Schema.Format) ?? throw new ArgumentNullException($"Failed to find an handler that supports the specified schema format '{workflowDefinition.Input.Schema.Format}'");
57+
var validationResult = await schemaHandler.ValidateAsync(workflowInstance.Spec.Input ?? [], workflowDefinition.Input.Schema, cancellationToken).ConfigureAwait(false);
58+
if (!validationResult.IsSuccess()) return new(context.Uid, false, null, new(ErrorType.Validation, ErrorTitle.Validation, ErrorStatus.Validation, $"Failed to validate the workflow instance's input:\n{string.Join('\n', validationResult.Errors?.FirstOrDefault()?.Errors?.Select(e => $"- {e.Key}: {e.Value.First()}") ?? [])}", new("/spec/input", UriKind.Relative)));
59+
}
60+
return new(context.Uid, true);
61+
}
62+
63+
}

src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
7171
}
7272
catch(Exception ex)
7373
{
74-
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
74+
this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
7575
var message = ex.Message;
7676
try { if (this.Container?.StandardError != null) message = await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false); } catch { }
7777
var error = ex.ToError(this.Task.Instance.Reference);

src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
6464
}
6565
catch (ErrorRaisedException ex)
6666
{
67-
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
67+
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
6868
await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false);
6969
}
7070
catch (Exception ex)
7171
{
72-
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
72+
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
7373
await this.SetErrorAsync(new()
7474
{
7575
Status = ErrorStatus.Validation,

src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
6060
}
6161
catch(Exception ex)
6262
{
63-
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
63+
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
6464
await this.SetErrorAsync(new()
6565
{
6666
Status = ErrorStatus.Validation,

src/runner/Synapse.Runner/Services/TaskExecutor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken =
123123
}
124124
catch(HttpRequestException ex)
125125
{
126-
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
126+
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
127127
await this.SetErrorAsync(new Error()
128128
{
129129
Type = ErrorType.Communication,
@@ -135,7 +135,7 @@ await this.SetErrorAsync(new Error()
135135
}
136136
catch(Exception ex)
137137
{
138-
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
138+
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
139139
await this.SetErrorAsync(new Error()
140140
{
141141
Type = ErrorType.Runtime,
@@ -200,7 +200,7 @@ await this.SetErrorAsync(new()
200200
catch (OperationCanceledException) { }
201201
catch (HttpRequestException ex)
202202
{
203-
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
203+
this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
204204
await this.SetErrorAsync(new Error()
205205
{
206206
Type = ErrorType.Communication,
@@ -212,7 +212,7 @@ await this.SetErrorAsync(new Error()
212212
}
213213
catch (Exception ex)
214214
{
215-
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
215+
this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
216216
await this.SetErrorAsync(new Error()
217217
{
218218
Type = ErrorType.Runtime,

0 commit comments

Comments
 (0)