Skip to content

Commit 68e0104

Browse files
authored
Merge pull request #537 from serverlessworkflow/feat-workflow-instance-resource-deletion
Updated the `WorkflowInstanceController` to run a background loop to clean-up outlived workflow instances
2 parents 8d68c67 + 828ed72 commit 68e0104

File tree

5 files changed

+96
-2
lines changed

5 files changed

+96
-2
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Resources;
15+
16+
/// <summary>
17+
/// Represents the options used to configure the cleanup behavior of a Synapse Operator
18+
/// </summary>
19+
[DataContract]
20+
public record OperatorCleanupOptions
21+
{
22+
23+
/// <summary>
24+
/// Gets or sets the time to live for completed workflow instances. Defaults to 7 days. If null, the operator will not delete completed workflow instances.
25+
/// </summary>
26+
[DataMember(Order = 1, Name = "ttl"), JsonPropertyOrder(1), JsonPropertyName("ttl"), YamlMember(Order = 1, Alias = "ttl")]
27+
public TimeSpan? Ttl { get; set; } = TimeSpan.FromDays(7);
28+
29+
/// <summary>
30+
/// Gets or sets the interval at which the operator sweeps for completed workflow instances to delete. Defaults to 5 minutes.
31+
/// </summary>
32+
[DataMember(Order = 2, Name = "interval"), JsonPropertyOrder(2), JsonPropertyName("interval"), YamlMember(Order = 2, Alias = "interval")]
33+
public TimeSpan Interval { get; set; } = TimeSpan.FromMinutes(5);
34+
35+
}

src/core/Synapse.Core/Resources/OperatorSpec.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,10 @@ public record OperatorSpec
3636
[DataMember(Order = 2, Name = "selector"), JsonPropertyOrder(2), JsonPropertyName("selector"), YamlMember(Order = 2, Alias = "selector")]
3737
public virtual IDictionary<string, string>? Selector { get; set; }
3838

39+
/// <summary>
40+
/// Gets/sets options controlling retention of completed workflow instances and cleanup sweep behavior.
41+
/// </summary>
42+
[DataMember(Order = 3, Name = "cleanup"), JsonPropertyOrder(3), JsonPropertyName("cleanup"), YamlMember(Order = 3, Alias = "cleanup")]
43+
public virtual OperatorCleanupOptions? Cleanup { get; set; } = new();
44+
3945
}

src/core/Synapse.Core/Resources/RunnerConfiguration.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,5 @@ public record RunnerConfiguration
5555
/// </summary>
5656
[DataMember(Order = 5, Name = "publishLifecycleEvents"), JsonPropertyOrder(5), JsonPropertyName("publishLifecycleEvents"), YamlMember(Order = 5, Alias = "publishLifecycleEvents")]
5757
public virtual bool? PublishLifecycleEvents { get; set; } = true;
58+
5859
}

src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
using Neuroglia.Data.Infrastructure.ResourceOriented;
1514
using Neuroglia.Data.Infrastructure.Services;
1615

1716
namespace Synapse.Operator.Services;
@@ -61,6 +60,7 @@ public override async Task StartAsync(CancellationToken cancellationToken)
6160
await base.StartAsync(cancellationToken).ConfigureAwait(false);
6261
this.Operator!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken);
6362
await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false);
63+
if (this.Operator?.Resource?.Spec?.Cleanup != null)_ = Task.Run(this.CleanupAsync, CancellationTokenSource.Token);
6464
}
6565

6666
/// <inheritdoc/>
@@ -70,6 +70,59 @@ protected override Task ReconcileAsync(CancellationToken cancellationToken = def
7070
return base.ReconcileAsync(cancellationToken);
7171
}
7272

73+
/// <inheritdoc/>
74+
protected virtual async Task CleanupAsync()
75+
{
76+
while (!CancellationTokenSource.IsCancellationRequested)
77+
{
78+
try
79+
{
80+
if (this.Operator?.Resource?.Spec?.Cleanup == null) break;
81+
var cutoff = DateTimeOffset.UtcNow - this.Operator?.Resource?.Spec?.Cleanup.Ttl;
82+
var deleted = 0;
83+
var selectors = this.Options.LabelSelectors;
84+
85+
await foreach (var instance in this.Repository.GetAllAsync<WorkflowInstance>(labelSelectors: selectors, cancellationToken: CancellationTokenSource.Token))
86+
{
87+
if (Handlers.ContainsKey(instance.GetQualifiedName())) continue;
88+
if (instance.IsOperative) continue;
89+
var finishedAt = instance.Status?.EndedAt ?? instance.Metadata.CreationTimestamp;
90+
if (finishedAt <= cutoff && !this.Handlers.ContainsKey(instance.GetQualifiedName()))
91+
{
92+
try { await this.TryReleaseAsync(instance, CancellationTokenSource.Token).ConfigureAwait(false); } catch { }
93+
try
94+
{
95+
await this.Repository.RemoveAsync<WorkflowInstance>(instance.GetName(), instance.GetNamespace(), false, CancellationTokenSource.Token).ConfigureAwait(false);
96+
deleted++;
97+
}
98+
catch (Exception ex)
99+
{
100+
Logger.LogWarning(ex, "Failed to delete expired workflow instance {instance}", instance.GetQualifiedName());
101+
}
102+
}
103+
}
104+
}
105+
catch (OperationCanceledException) when (CancellationTokenSource.Token.IsCancellationRequested)
106+
{
107+
break;
108+
}
109+
catch (Exception ex)
110+
{
111+
this.Logger.LogError(ex, "Instance cleanup sweep failed");
112+
try { await Task.Delay(TimeSpan.FromSeconds(5), CancellationTokenSource.Token).ConfigureAwait(false); } catch { }
113+
}
114+
try
115+
{
116+
var delay = this.Operator?.Resource?.Spec?.Cleanup?.Interval ?? TimeSpan.FromMinutes(5);
117+
await Task.Delay(delay, CancellationTokenSource.Token).ConfigureAwait(false);
118+
}
119+
catch (OperationCanceledException) when (CancellationTokenSource.IsCancellationRequested)
120+
{
121+
break;
122+
}
123+
}
124+
}
125+
73126
/// <summary>
74127
/// Creates a new <see cref="WorkflowInstanceHandler"/> for the specified workflow
75128
/// </summary>

src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// limitations under the License.
1313

1414
using Neuroglia.Data.Infrastructure.Services;
15-
using Neuroglia.Mediation;
1615
using System.Text;
1716

1817
namespace Synapse.Operator.Services;

0 commit comments

Comments
 (0)