1111// See the License for the specific language governing permissions and
1212// limitations under the License.
1313
14+ using Neuroglia ;
15+ using Neuroglia . Data . Expressions ;
16+ using Neuroglia . Data . Infrastructure . ResourceOriented ;
17+ using System . Security . Cryptography ;
18+ using System . Text ;
19+
1420namespace Synapse . Runner . Services . Executors ;
1521
1622/// <summary>
@@ -23,19 +29,102 @@ namespace Synapse.Runner.Services.Executors;
2329/// <param name="context">The current <see cref="ITaskExecutionContext"/></param>
2430/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
2531/// <param name="serializer">The service used to serialize/deserialize objects to/from JSON</param>
26- public class WorkflowProcessExecutor ( IServiceProvider serviceProvider , ILogger < WorkflowProcessExecutor > logger , ITaskExecutionContextFactory executionContextFactory , ITaskExecutorFactory executorFactory , ITaskExecutionContext < RunTaskDefinition > context , ISchemaHandlerProvider schemaHandlerProvider , IJsonSerializer serializer )
32+ /// <param name="api">The service used to interact with the Synapse API</param>
33+ public class WorkflowProcessExecutor ( IServiceProvider serviceProvider , ILogger < WorkflowProcessExecutor > logger , ITaskExecutionContextFactory executionContextFactory , ITaskExecutorFactory executorFactory ,
34+ ITaskExecutionContext < RunTaskDefinition > context , ISchemaHandlerProvider schemaHandlerProvider , IJsonSerializer serializer , ISynapseApiClient api )
2735 : TaskExecutor < RunTaskDefinition > ( serviceProvider , logger , executionContextFactory , executorFactory , context , schemaHandlerProvider , serializer )
2836{
2937
38+ /// <summary>
39+ /// Gets the service used to interact with the Synapse API
40+ /// </summary>
41+ protected ISynapseApiClient Api { get ; } = api ;
42+
3043 /// <summary>
3144 /// Gets the definition of the shell process to run
3245 /// </summary>
3346 protected WorkflowProcessDefinition ProcessDefinition => this . Task . Definition . Run . Workflow ! ;
3447
3548 /// <inheritdoc/>
36- protected override Task DoExecuteAsync ( CancellationToken cancellationToken )
49+ protected override async Task DoExecuteAsync ( CancellationToken cancellationToken )
3750 {
38- throw new NotImplementedException ( ) ;
51+ var hash = Convert . ToHexString ( MD5 . HashData ( Encoding . UTF8 . GetBytes ( $ "{ Environment . GetEnvironmentVariable ( SynapseDefaults . EnvironmentVariables . Runner . Name ) } { this . Task . Instance . Reference } ") ) ) . ToLowerInvariant ( ) ;
52+ var workflowInstanceName = $ "{ this . ProcessDefinition . Name } -{ hash } ";
53+ var workflowInstanceNamespace = Environment . GetEnvironmentVariable ( SynapseDefaults . EnvironmentVariables . Runner . Namespace ) ! ;
54+ WorkflowInstance workflowInstance ;
55+ try
56+ {
57+ workflowInstance = await this . Api . WorkflowInstances . GetAsync ( workflowInstanceName , workflowInstanceNamespace , cancellationToken ) . ConfigureAwait ( false ) ;
58+ switch ( workflowInstance . Status ? . Phase )
59+ {
60+ case WorkflowInstanceStatusPhase . Cancelled :
61+ await this . SetErrorAsync ( new ( )
62+ {
63+ Type = ErrorType . Runtime ,
64+ Status = ErrorStatus . Runtime ,
65+ Title = ErrorTitle . Runtime ,
66+ Detail = $ "The execution of workflow instance '{ workflowInstance . GetQualifiedName ( ) } ' has been cancelled"
67+ } , cancellationToken ) . ConfigureAwait ( false ) ;
68+ return ;
69+ case WorkflowInstanceStatusPhase . Faulted :
70+ await this . SetErrorAsync ( workflowInstance . Status . Error ! , cancellationToken ) . ConfigureAwait ( false ) ;
71+ return ;
72+ case WorkflowInstanceStatusPhase . Completed :
73+ var output = string . IsNullOrWhiteSpace ( workflowInstance . Status ? . OutputReference ) ? null : ( await this . Api . Documents . GetAsync ( workflowInstance . Status . OutputReference , cancellationToken ) . ConfigureAwait ( false ) ) . Content ;
74+ await this . SetResultAsync ( output , this . Task . Definition . Then , cancellationToken ) . ConfigureAwait ( false ) ;
75+ return ;
76+ }
77+ }
78+ catch
79+ {
80+ var workflow = await this . Api . Workflows . GetAsync ( this . ProcessDefinition . Name , this . ProcessDefinition . Namespace , cancellationToken ) . ConfigureAwait ( false ) ;
81+ var workflowDefinition = this . ProcessDefinition . Version == "latest"
82+ ? workflow . Spec . Versions . Last ( )
83+ : workflow . Spec . Versions . Get ( this . ProcessDefinition . Version ) ?? throw new NullReferenceException ( $ "Failed to find version '{ this . ProcessDefinition . Version } ' of workflow '{ workflow . GetQualifiedName ( ) } '") ;
84+ var input = await this . Task . Workflow . Expressions . EvaluateAsync < EquatableDictionary < string , object > > ( this . ProcessDefinition . Input ?? new ( ) , this . Task . Input , this . GetExpressionEvaluationArguments ( ) , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
85+ workflowInstance = new WorkflowInstance ( )
86+ {
87+ Metadata = new ( )
88+ {
89+ Namespace = workflowInstanceNamespace ,
90+ Name = workflowInstanceName
91+ } ,
92+ Spec = new ( )
93+ {
94+ Definition = new ( )
95+ {
96+ Namespace = this . ProcessDefinition . Namespace ,
97+ Name = this . ProcessDefinition . Name ,
98+ Version = this . ProcessDefinition . Version
99+ } ,
100+ Input = input
101+ }
102+ } ;
103+ workflowInstance = await this . Api . WorkflowInstances . CreateAsync ( workflowInstance , cancellationToken ) . ConfigureAwait ( false ) ;
104+ }
105+ var watchEvents = await this . Api . WorkflowInstances . MonitorAsync ( workflowInstance . GetName ( ) , workflowInstance . GetNamespace ( ) ! , cancellationToken ) . ConfigureAwait ( false ) ;
106+ await foreach ( var watchEvent in watchEvents )
107+ {
108+ switch ( watchEvent . Resource . Status ? . Phase )
109+ {
110+ case WorkflowInstanceStatusPhase . Cancelled :
111+ await this . SetErrorAsync ( new ( )
112+ {
113+ Type = ErrorType . Runtime ,
114+ Status = ErrorStatus . Runtime ,
115+ Title = ErrorTitle . Runtime ,
116+ Detail = $ "The execution of workflow instance '{ workflowInstance . GetQualifiedName ( ) } ' has been cancelled"
117+ } , cancellationToken ) . ConfigureAwait ( false ) ;
118+ break ;
119+ case WorkflowInstanceStatusPhase . Faulted :
120+ await this . SetErrorAsync ( workflowInstance . Status ! . Error ! , cancellationToken ) . ConfigureAwait ( false ) ;
121+ return ;
122+ case WorkflowInstanceStatusPhase . Completed :
123+ var output = string . IsNullOrWhiteSpace ( watchEvent . Resource . Status ? . OutputReference ) ? null : ( await this . Api . Documents . GetAsync ( watchEvent . Resource . Status . OutputReference , cancellationToken ) . ConfigureAwait ( false ) ) . Content ;
124+ await this . SetResultAsync ( output , this . Task . Definition . Then , cancellationToken ) . ConfigureAwait ( false ) ;
125+ return ;
126+ }
127+ }
39128 }
40129
41130}
0 commit comments