Skip to content

Commit a5c9b99

Browse files
authored
Merge pull request #539 from serverlessworkflow/feat-workflow-instance-process-deletion
Updated the `WorkflowInstanceController` to delete a workflow instance's process, if any, upon the deletion of its declaring resource
2 parents 9a1c63f + 9f35ac7 commit a5c9b99

File tree

10 files changed

+221
-45
lines changed

10 files changed

+221
-45
lines changed

src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ public record KubernetesRuntimeConfiguration
2424
{
2525

2626
/// <summary>
27-
/// Gets the default worker <see cref="V1Pod"/>
27+
/// Gets the default worker <see cref="V1PodTemplateSpec"/>
2828
/// </summary>
29-
public static readonly V1Pod DefaultPodTemplate = new()
29+
public static readonly V1PodTemplateSpec DefaultPodTemplate = new()
3030
{
3131
Metadata = new(),
3232
Spec = new()
@@ -79,7 +79,7 @@ public KubernetesRuntimeConfiguration()
7979
/// Gets/sets the template to use to create runner containers
8080
/// </summary>
8181
[DataMember(Order = 2, Name = "podTemplate"), JsonPropertyOrder(2), JsonPropertyName("podTemplate"), YamlMember(Order = 2, Alias = "podTemplate")]
82-
public virtual V1Pod PodTemplate { get; set; } = LoadPodTemplate();
82+
public virtual V1PodTemplateSpec PodTemplate { get; set; } = LoadPodTemplate();
8383

8484
/// <summary>
8585
/// Gets/sets the configuration of the secrets used by the Kubernetes runtime
@@ -103,12 +103,12 @@ public KubernetesRuntimeConfiguration()
103103
/// Loads the runner container template
104104
/// </summary>
105105
/// <returns>The runner container template</returns>
106-
public static V1Pod LoadPodTemplate()
106+
public static V1PodTemplateSpec LoadPodTemplate()
107107
{
108108
var templateFilePath = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Kubernetes.Pod);
109109
if (string.IsNullOrWhiteSpace(templateFilePath) || !File.Exists(templateFilePath)) return DefaultPodTemplate;
110110
var yaml = File.ReadAllText(templateFilePath);
111-
return YamlSerializer.Default.Deserialize<V1Pod>(yaml)!;
111+
return YamlSerializer.Default.Deserialize<V1PodTemplateSpec>(yaml)!;
112112
}
113113

114114
}

src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,54 +26,60 @@ public record WorkflowInstanceStatus
2626
[DataMember(Order = 1, Name = "phase"), JsonPropertyName("phase"), JsonPropertyOrder(1), YamlMember(Alias = "phase", Order = 1)]
2727
public virtual string? Phase { get; set; }
2828

29+
/// <summary>
30+
/// Gets/sets the unique identifier of the process that is executing the workflow instance, if applicable
31+
/// </summary>
32+
[DataMember(Name = "processId", Order = 2), JsonPropertyName("processId"), JsonPropertyOrder(2), YamlMember(Alias = "processId", Order = 2)]
33+
public virtual string? ProcessId { get; set; }
34+
2935
/// <summary>
3036
/// Gets/sets the date and time the workflow instance has been started at, if applicable
3137
/// </summary>
32-
[DataMember(Name = "startedAt", Order = 2), JsonPropertyName("startedAt"), JsonPropertyOrder(2), YamlMember(Alias = "startedAt", Order = 2)]
38+
[DataMember(Name = "startedAt", Order = 3), JsonPropertyName("startedAt"), JsonPropertyOrder(3), YamlMember(Alias = "startedAt", Order = 3)]
3339
public virtual DateTimeOffset? StartedAt { get; set; }
3440

3541
/// <summary>
3642
/// Gets/sets the date and time the workflow instance has ended, if applicable
3743
/// </summary>
38-
[DataMember(Name = "endedAt", Order = 3), JsonPropertyName("endedAt"), JsonPropertyOrder(3), YamlMember(Alias = "endedAt", Order = 3)]
44+
[DataMember(Name = "endedAt", Order = 4), JsonPropertyName("endedAt"), JsonPropertyOrder(4), YamlMember(Alias = "endedAt", Order = 4)]
3945
public virtual DateTimeOffset? EndedAt { get; set; }
4046

4147
/// <summary>
4248
/// Gets/sets a list containing the tasks that are being performed -or already have been performed- by the workflow
4349
/// </summary>
44-
[DataMember(Order = 4, Name = "tasks"), JsonPropertyName("tasks"), JsonPropertyOrder(4), YamlMember(Alias = "tasks", Order = 4)]
50+
[DataMember(Order = 5, Name = "tasks"), JsonPropertyName("tasks"), JsonPropertyOrder(5), YamlMember(Alias = "tasks", Order = 5)]
4551
public virtual EquatableList<TaskInstance>? Tasks { get; set; }
4652

4753
/// <summary>
4854
/// Gets/sets a list that contains the workflow's runs, if any
4955
/// </summary>
50-
[DataMember(Order = 5, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(5), YamlMember(Alias = "runs", Order = 5)]
56+
[DataMember(Order = 6, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(6), YamlMember(Alias = "runs", Order = 6)]
5157
public virtual EquatableList<WorkflowRun>? Runs { get; set; }
5258

5359
/// <summary>
5460
/// Gets/sets a name/context map that contains the workflow's pending correlations
5561
/// </summary>
56-
[DataMember(Order = 6, Name = "correlation"), JsonPropertyName("correlation"), JsonPropertyOrder(6), YamlMember(Alias = "correlation", Order = 6)]
62+
[DataMember(Order = 7, Name = "correlation"), JsonPropertyName("correlation"), JsonPropertyOrder(7), YamlMember(Alias = "correlation", Order = 7)]
5763
public virtual WorkflowInstanceCorrelationStatus? Correlation { get; set; }
5864

5965
/// <summary>
6066
/// Gets/sets the error, if any, that has occurred during the workflow's execution
6167
/// </summary>
62-
[DataMember(Name = "error", Order = 7), JsonPropertyName("error"), JsonPropertyOrder(7), YamlMember(Alias = "error", Order = 7)]
68+
[DataMember(Name = "error", Order = 8), JsonPropertyName("error"), JsonPropertyOrder(8), YamlMember(Alias = "error", Order = 8)]
6369
public virtual Error? Error { get; set; }
6470

6571
/// <summary>
6672
/// Gets/sets a reference to the workflow's context data, if any
6773
/// </summary>
6874
[Required, MinLength(1)]
69-
[DataMember(Order = 8, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(8), YamlMember(Alias = "contextReference", Order = 8)]
75+
[DataMember(Order = 9, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(9), YamlMember(Alias = "contextReference", Order = 9)]
7076
public virtual string ContextReference { get; set; } = null!;
7177

7278
/// <summary>
7379
/// Gets/sets a reference to the workflow's output data, if any
7480
/// </summary>
7581
[Required, MinLength(1)]
76-
[DataMember(Order = 9, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(9), YamlMember(Alias = "outputReference", Order = 9)]
82+
[DataMember(Order = 10, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(10), YamlMember(Alias = "outputReference", Order = 10)]
7783
public virtual string? OutputReference { get; set; }
7884

7985
}

src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ namespace Synapse.Operator.Services;
2222
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
2323
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
2424
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
25-
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
26-
/// <param name="workflowController">The service used to access all monitored <see cref="Workflow"/>s</param>
25+
/// <param name="operator">The service used to access the current <see cref="Resources.Operator"/></param>
26+
/// <param name="workflows">The service used to access all monitored <see cref="Workflow"/>s</param>
27+
/// <param name="workflowRuntime">The service used to run workflows</param>
2728
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
28-
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository<Document, string> documents)
29+
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController @operator, IWorkflowController workflows, IWorkflowRuntime workflowRuntime, IRepository<Document, string> documents)
2930
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
3031
{
3132

@@ -37,12 +38,17 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
3738
/// <summary>
3839
/// Gets the service used to monitor the current <see cref="Operator"/>
3940
/// </summary>
40-
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;
41+
protected IResourceMonitor<Resources.Operator> Operator => @operator.Operator;
4142

4243
/// <summary>
4344
/// Gets a dictionary containing all monitored <see cref="Workflow"/>s
4445
/// </summary>
45-
protected IReadOnlyDictionary<string, Workflow> Workflows => workflowController.Workflows;
46+
protected IReadOnlyDictionary<string, Workflow> Workflows => workflows.Workflows;
47+
48+
/// <summary>
49+
/// Gets the service used to run workflows
50+
/// </summary>
51+
protected IWorkflowRuntime WorkflowRuntime { get; } = workflowRuntime;
4652

4753
/// <summary>
4854
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
@@ -231,11 +237,18 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
231237
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
232238
var selectors = new LabelSelector[]
233239
{
234-
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
240+
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
235241
};
236242
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
237243
{
238-
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
244+
try
245+
{
246+
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
247+
}
248+
catch(Exception ex)
249+
{
250+
Logger.LogWarning(ex, "Failed to delete correlation '{correlation}' for workflow instance '{workflowInstance}'", correlation.GetQualifiedName(), workflowInstance.GetQualifiedName());
251+
}
239252
}
240253
if (workflowInstance.Status != null)
241254
{
@@ -251,12 +264,33 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
251264
if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference);
252265
}
253266
}
254-
foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
267+
foreach (var documentReference in documentReferences.Distinct())
268+
{
269+
try
270+
{
271+
await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
272+
}
273+
catch (Exception ex)
274+
{
275+
Logger.LogWarning(ex, "Failed to delete document '{document}' for workflow instance '{workflowInstance}'", documentReference, workflowInstance.GetQualifiedName());
276+
}
277+
}
278+
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ProcessId))
279+
{
280+
try
281+
{
282+
await WorkflowRuntime.DeleteProcessAsync(workflowInstance.Status.ProcessId, cancellationToken).ConfigureAwait(false);
283+
}
284+
catch(Exception ex)
285+
{
286+
Logger.LogWarning(ex, "Failed to delete process with id '{processId}' for workflow instance '{workflowInstance}'", workflowInstance.Status.ProcessId, workflowInstance.GetQualifiedName());
287+
}
288+
}
255289
}
256290
}
257291
catch(Exception ex)
258292
{
259-
this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
293+
Logger.LogError("An error occurred while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
260294
}
261295
}
262296

src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,12 @@ protected virtual async Task StartProcessAsync(CancellationToken cancellationTok
133133
this.LogSubscription?.Dispose();
134134
var workflow = await this.GetWorkflowAsync(cancellationToken).ConfigureAwait(false);
135135
var serviceAccount = await this.GetServiceAccountAsync(cancellationToken).ConfigureAwait(false);
136+
if (!string.IsNullOrWhiteSpace(WorkflowInstance.Resource.Status?.ProcessId)) await Runtime.DeleteProcessAsync(WorkflowInstance.Resource.Status.ProcessId, cancellationToken).ConfigureAwait(false);
136137
this.Process = await this.Runtime.CreateProcessAsync(workflow, this.WorkflowInstance.Resource, serviceAccount, cancellationToken).ConfigureAwait(false);
138+
if (!string.IsNullOrWhiteSpace(this.Process.Id)) await UpdateWorkflowInstanceStatusAsync(status =>
139+
{
140+
status.ProcessId = this.Process.Id;
141+
}, cancellationToken).ConfigureAwait(false);
137142
await this.Process.StartAsync(cancellationToken).ConfigureAwait(false);
138143
this.LogSubscription = this.Process.StandardOutput?.Subscribe(this.LogBatchQueue.Enqueue);
139144
this.LogBatchTimer ??= new(async _ => await this.OnPersistLogBatchAsync(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
@@ -205,6 +210,34 @@ protected virtual async Task OnPersistLogBatchAsync()
205210
this._persistingLogs = false;
206211
}
207212

213+
/// <summary>
/// Updates the status of the handled <see cref="Resources.WorkflowInstance"/>
/// </summary>
/// <remarks>
/// The change is computed as a JSON Patch between the current resource and a clone to which <paramref name="statusUpdate"/> has been applied.
/// When the patch fails with a <see cref="ConcurrencyException"/>, the whole operation is retried (up to three attempts, with an increasing delay);
/// the exception raised by the last attempt is propagated to the caller.
/// </remarks>
/// <param name="statusUpdate">An <see cref="Action{T}"/> used to update the <see cref="Resources.WorkflowInstance"/>'s status</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task UpdateWorkflowInstanceStatusAsync(Action<WorkflowInstanceStatus> statusUpdate, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(statusUpdate);
    const int maxRetries = 3;
    for (var attempt = 0; attempt < maxRetries; attempt++)
    {
        try
        {
            // The resource is read again on every attempt, so a retry diffs against whatever the handler currently holds
            var original = this.WorkflowInstance.Resource;
            var updated = original.Clone()!;
            updated.Status ??= new();
            statusUpdate(updated.Status);
            var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
            // NOTE(review): the resource version is presumably what lets the repository detect concurrent modifications - confirm
            await this.Resources.PatchAsync<WorkflowInstance>(new Patch(PatchType.JsonPatch, patch), updated.GetName(), updated.GetNamespace(), original.Metadata.ResourceVersion, false, cancellationToken).ConfigureAwait(false);
            // The patch went through: stop here instead of re-applying it on the remaining attempts
            return;
        }
        catch (ConcurrencyException) when (attempt + 1 < maxRetries)
        {
            // Back off a little longer after each failed attempt before retrying
            await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken).ConfigureAwait(false);
        }
    }
}
240+
208241
/// <summary>
209242
/// Disposes of the <see cref="WorkflowInstanceHandler"/>
210243
/// </summary>

src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowRuntime.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,12 @@ public interface IWorkflowRuntime
3030
/// <returns>A new <see cref="IWorkflowProcess"/></returns>
3131
Task<IWorkflowProcess> CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, ServiceAccount serviceAccount, CancellationToken cancellationToken = default);
3232

33+
/// <summary>
34+
/// Deletes the <see cref="IWorkflowProcess"/> with the specified id
35+
/// </summary>
36+
/// <param name="processId">The unique identifier of the process to delete</param>
37+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
38+
/// <returns>A new awaitable <see cref="Task"/></returns>
39+
Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default);
40+
3341
}

src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowRuntimeBase.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ protected WorkflowRuntimeBase(ILoggerFactory loggerFactory)
3939
/// <inheritdoc/>
4040
public abstract Task<IWorkflowProcess> CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, ServiceAccount serviceAccount, CancellationToken cancellationToken = default);
4141

42+
/// <inheritdoc/>
43+
public abstract Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default);
44+
4245
/// <summary>
4346
/// Disposes of the <see cref="WorkflowProcessBase"/>
4447
/// </summary>

src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
using Docker.DotNet.Models;
1616
using Microsoft.Extensions.DependencyInjection;
1717
using Synapse.Runtime.Services;
18-
using static Synapse.SynapseDefaults.Resources;
1918
using System.Net;
2019

2120
namespace Synapse.Runtime.Docker.Services;
@@ -143,6 +142,29 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
143142
}
144143
}
145144

145+
/// <inheritdoc/>
public override async Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(processId);
    try
    {
        this.Logger.LogDebug("Deleting the Docker process with id '{processId}'...", processId);
        // Forced removal of the container, along with its volumes and links
        await this.Docker!.Containers
            .RemoveContainerAsync(processId, new() { Force = true, RemoveVolumes = true, RemoveLinks = true }, cancellationToken)
            .ConfigureAwait(false);
        // Stop tracking the process once its container is gone
        this.Processes.TryRemove(processId, out _);
        this.Logger.LogDebug("The Docker process with id '{processId}' has been successfully deleted", processId);
    }
    catch (Exception exception)
    {
        // Log, then let the caller decide how to handle the failure
        this.Logger.LogError("An error occurred while deleting the Docker process with id '{processId}': {ex}", processId, exception);
        throw;
    }
}
167+
146168
/// <inheritdoc/>
147169
protected override ValueTask DisposeAsync(bool disposing)
148170
{

0 commit comments

Comments
 (0)