Skip to content

Commit c49a3dd

Browse files
authored
Merge pull request #412 from serverlessworkflow/feat-workflow-process
Implemented the WorkflowProcessExecutor
2 parents ac7f8e9 + f1f7ef5 commit c49a3dd

File tree

10 files changed

+114
-21
lines changed

10 files changed

+114
-21
lines changed

src/core/Synapse.Core/WorkflowInstanceStatusPhase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ public static class WorkflowInstanceStatusPhase
2828
/// </summary>
2929
public const string Running = "running";
3030
/// <summary>
31-
/// Indicates that the workflow ran to completion
32-
/// </summary>
33-
public const string Completed = "completed";
34-
/// <summary>
3531
/// Indicates that the workflow's execution is waiting for user or event input
3632
/// </summary>
3733
public const string Waiting = "waiting";
3834
/// <summary>
35+
/// Indicates that the workflow ran to completion
36+
/// </summary>
37+
public const string Completed = "completed";
38+
/// <summary>
3939
/// Indicates that the workflow's execution has been cancelled
4040
/// </summary>
4141
public const string Cancelled = "cancelled";

src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
367367
Metadata = new()
368368
{
369369
Namespace = this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace,
370-
Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace}-"
370+
Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Name}-"
371371
},
372372
Spec = new()
373373
{

src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
7171
}
7272
catch(Exception ex)
7373
{
74+
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
7475
var message = ex.Message;
7576
try { if (this.Container?.StandardError != null) message = await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false); } catch { }
7677
var error = ex.ToError(this.Task.Instance.Reference);

src/runner/Synapse.Runner/Services/Executors/ExtensionTaskExecutor.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ public class ExtensionTaskExecutor(IServiceProvider serviceProvider, ILogger<Ext
2828
{
2929

3030
/// <inheritdoc/>
31-
protected override Task DoExecuteAsync(CancellationToken cancellationToken)
32-
{
33-
this.GetType();
34-
throw new NotImplementedException();
35-
}
31+
protected override Task DoExecuteAsync(CancellationToken cancellationToken) => throw new NotImplementedException();
3632

3733
}

src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,14 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
6262
var callInvoker = new DefaultCallInvoker(channel);
6363
this.GrpcClient = DynamicGrpcClient.FromDescriptorProtos(callInvoker: callInvoker, [fileDescriptor]);
6464
}
65-
catch (ErrorRaisedException ex) { await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false); }
65+
catch (ErrorRaisedException ex)
66+
{
67+
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
68+
await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false);
69+
}
6670
catch (Exception ex)
6771
{
72+
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
6873
await this.SetErrorAsync(new()
6974
{
7075
Status = ErrorStatus.Validation,

src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
6060
}
6161
catch(Exception ex)
6262
{
63+
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
6364
await this.SetErrorAsync(new()
6465
{
6566
Status = ErrorStatus.Validation,

src/runner/Synapse.Runner/Services/Executors/SwitchTaskExecutor.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
3636
var defaultCase = this.Task.Definition.Switch.FirstOrDefault(kvp => string.IsNullOrWhiteSpace(kvp.Value.When));
3737
foreach (var @case in this.Task.Definition.Switch!.Where(c => !string.IsNullOrWhiteSpace(c.Value.When)))
3838
{
39-
if (await this.Task.Workflow.Expressions.EvaluateConditionAsync(@case.Value.When!, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false))
40-
{
41-
matches.Add(@case);
42-
}
39+
if (await this.Task.Workflow.Expressions.EvaluateConditionAsync(@case.Value.When!, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)) matches.Add(@case);
4340
}
4441
if (matches.Count == 1) await this.SetResultAsync(this.Task.Input, matches.First().Value.Then, cancellationToken).ConfigureAwait(false);
4542
else if (matches.Count > 1) await this.SetErrorAsync(Error.Configuration(this.Task.Instance.Reference, $"At most one matching case is allowed, but cases {string.Join(", ", matches.Select(m => m.Key))} have been matched."), cancellationToken).ConfigureAwait(false);

src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using Neuroglia;
15+
using Neuroglia.Data.Expressions;
16+
using Neuroglia.Data.Infrastructure.ResourceOriented;
17+
using System.Security.Cryptography;
18+
using System.Text;
19+
1420
namespace Synapse.Runner.Services.Executors;
1521

1622
/// <summary>
@@ -23,19 +29,102 @@ namespace Synapse.Runner.Services.Executors;
2329
/// <param name="context">The current <see cref="ITaskExecutionContext"/></param>
2430
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
2531
/// <param name="serializer">The service used to serialize/deserialize objects to/from JSON</param>
26-
public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger<WorkflowProcessExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, ITaskExecutionContext<RunTaskDefinition> context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer)
32+
/// <param name="api">The service used to interact with the Synapse API</param>
33+
public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger<WorkflowProcessExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory,
34+
ITaskExecutionContext<RunTaskDefinition> context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, ISynapseApiClient api)
2735
: TaskExecutor<RunTaskDefinition>(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer)
2836
{
2937

38+
/// <summary>
39+
/// Gets the service used to interact with the Synapse API
40+
/// </summary>
41+
protected ISynapseApiClient Api { get; } = api;
42+
3043
/// <summary>
3144
/// Gets the definition of the shell process to run
3245
/// </summary>
3346
protected WorkflowProcessDefinition ProcessDefinition => this.Task.Definition.Run.Workflow!;
3447

3548
/// <inheritdoc/>
36-
protected override Task DoExecuteAsync(CancellationToken cancellationToken)
49+
protected override async Task DoExecuteAsync(CancellationToken cancellationToken)
3750
{
38-
throw new NotImplementedException();
51+
var hash = Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes($"{Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name)}{this.Task.Instance.Reference}"))).ToLowerInvariant();
52+
var workflowInstanceName = $"{this.ProcessDefinition.Name}-{hash}";
53+
var workflowInstanceNamespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace)!;
54+
WorkflowInstance workflowInstance;
55+
try
56+
{
57+
workflowInstance = await this.Api.WorkflowInstances.GetAsync(workflowInstanceName, workflowInstanceNamespace, cancellationToken).ConfigureAwait(false);
58+
switch (workflowInstance.Status?.Phase)
59+
{
60+
case WorkflowInstanceStatusPhase.Cancelled:
61+
await this.SetErrorAsync(new()
62+
{
63+
Type = ErrorType.Runtime,
64+
Status = ErrorStatus.Runtime,
65+
Title = ErrorTitle.Runtime,
66+
Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled"
67+
}, cancellationToken).ConfigureAwait(false);
68+
return;
69+
case WorkflowInstanceStatusPhase.Faulted:
70+
await this.SetErrorAsync(workflowInstance.Status.Error!, cancellationToken).ConfigureAwait(false);
71+
return;
72+
case WorkflowInstanceStatusPhase.Completed:
73+
var output = string.IsNullOrWhiteSpace(workflowInstance.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(workflowInstance.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
74+
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
75+
return;
76+
}
77+
}
78+
catch
79+
{
80+
var workflow = await this.Api.Workflows.GetAsync(this.ProcessDefinition.Name, this.ProcessDefinition.Namespace, cancellationToken).ConfigureAwait(false);
81+
var workflowDefinition = this.ProcessDefinition.Version == "latest"
82+
? workflow.Spec.Versions.Last()
83+
: workflow.Spec.Versions.Get(this.ProcessDefinition.Version) ?? throw new NullReferenceException($"Failed to find version '{this.ProcessDefinition.Version}' of workflow '{workflow.GetQualifiedName()}'");
84+
var input = await this.Task.Workflow.Expressions.EvaluateAsync<EquatableDictionary<string, object>>(this.ProcessDefinition.Input ?? new(), this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken: cancellationToken).ConfigureAwait(false);
85+
workflowInstance = new WorkflowInstance()
86+
{
87+
Metadata = new()
88+
{
89+
Namespace = workflowInstanceNamespace,
90+
Name = workflowInstanceName
91+
},
92+
Spec = new()
93+
{
94+
Definition = new()
95+
{
96+
Namespace = this.ProcessDefinition.Namespace,
97+
Name = this.ProcessDefinition.Name,
98+
Version = this.ProcessDefinition.Version
99+
},
100+
Input = input
101+
}
102+
};
103+
workflowInstance = await this.Api.WorkflowInstances.CreateAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
104+
}
105+
var watchEvents = await this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
106+
await foreach(var watchEvent in watchEvents)
107+
{
108+
switch (watchEvent.Resource.Status?.Phase)
109+
{
110+
case WorkflowInstanceStatusPhase.Cancelled:
111+
await this.SetErrorAsync(new()
112+
{
113+
Type = ErrorType.Runtime,
114+
Status = ErrorStatus.Runtime,
115+
Title = ErrorTitle.Runtime,
116+
Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled"
117+
}, cancellationToken).ConfigureAwait(false);
118+
break;
119+
case WorkflowInstanceStatusPhase.Faulted:
120+
await this.SetErrorAsync(workflowInstance.Status!.Error!, cancellationToken).ConfigureAwait(false);
121+
return;
122+
case WorkflowInstanceStatusPhase.Completed:
123+
var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
124+
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
125+
return;
126+
}
127+
}
39128
}
40129

41130
}

src/runner/Synapse.Runner/Services/TaskExecutor.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken =
123123
}
124124
catch(HttpRequestException ex)
125125
{
126+
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
126127
await this.SetErrorAsync(new Error()
127128
{
128129
Type = ErrorType.Communication,
@@ -134,6 +135,7 @@ await this.SetErrorAsync(new Error()
134135
}
135136
catch(Exception ex)
136137
{
138+
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
137139
await this.SetErrorAsync(new Error()
138140
{
139141
Type = ErrorType.Runtime,
@@ -198,6 +200,7 @@ await this.SetErrorAsync(new()
198200
catch (OperationCanceledException) { }
199201
catch (HttpRequestException ex)
200202
{
203+
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
201204
await this.SetErrorAsync(new Error()
202205
{
203206
Type = ErrorType.Communication,
@@ -209,6 +212,7 @@ await this.SetErrorAsync(new Error()
209212
}
210213
catch (Exception ex)
211214
{
215+
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
212216
await this.SetErrorAsync(new Error()
213217
{
214218
Type = ErrorType.Runtime,

src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ namespace Synapse.Runner.Services;
2929
/// <param name="services">The current <see cref="IServiceProvider"/></param>
3030
/// <param name="expressionEvaluator">The service used to evaluate runtime expressions</param>
3131
/// <param name="jsonSerializer">The service used to serialize/deserialize objects to/from JSON</param>
32-
/// <param name="cloudFlowsApi">The service used to interact with the Synapse API</param>
32+
/// <param name="api">The service used to interact with the Synapse API</param>
3333
/// <param name="options">The service used to access the current <see cref="RunnerOptions"/></param>
3434
/// <param name="definition">The <see cref="WorkflowDefinition"/> of the <see cref="WorkflowInstance"/> to execute</param>
3535
/// <param name="instance">The <see cref="WorkflowInstance"/> to execute</param>
36-
public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient cloudFlowsApi, IOptions<RunnerOptions> options, WorkflowDefinition definition, WorkflowInstance instance)
36+
public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient api, IOptions<RunnerOptions> options, WorkflowDefinition definition, WorkflowInstance instance)
3737
: IWorkflowExecutionContext
3838
{
3939

@@ -51,7 +51,7 @@ public class WorkflowExecutionContext(IServiceProvider services, IExpressionEval
5151
/// <summary>
5252
/// Gets the service used to interact with the Synapse API
5353
/// </summary>
54-
protected ISynapseApiClient Api { get; } = cloudFlowsApi;
54+
protected ISynapseApiClient Api { get; } = api;
5555

5656
/// <summary>
5757
/// Gets the current <see cref="RunnerOptions"/>

0 commit comments

Comments
 (0)