diff --git a/src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs b/src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs
new file mode 100644
index 00000000..0bbb2f5f
--- /dev/null
+++ b/src/core/Synapse.Core/Resources/OperatorCleanupOptions.cs
@@ -0,0 +1,35 @@
+// Copyright © 2024-Present The Synapse Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"),
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Synapse.Resources;
+
+/// <summary>
+/// Represents the options used to configure the cleanup behavior of a Synapse Operator
+/// </summary>
+[DataContract]
+public record OperatorCleanupOptions
+{
+
+    /// <summary>
+    /// Gets/sets the time to live for completed workflow instances. Defaults to 7 days. If null, the operator will not delete completed workflow instances.
+    /// </summary>
+    [DataMember(Order = 1, Name = "ttl"), JsonPropertyOrder(1), JsonPropertyName("ttl"), YamlMember(Order = 1, Alias = "ttl")]
+    public virtual TimeSpan? Ttl { get; set; } = TimeSpan.FromDays(7);
+
+    /// <summary>
+    /// Gets/sets the interval at which the operator sweeps for completed workflow instances to delete. Defaults to 5 minutes. Zero or negative values are replaced by that default when the operator schedules its sweeps.
+    /// </summary>
+    [DataMember(Order = 2, Name = "interval"), JsonPropertyOrder(2), JsonPropertyName("interval"), YamlMember(Order = 2, Alias = "interval")]
+    public virtual TimeSpan Interval { get; set; } = TimeSpan.FromMinutes(5);
+
+}
diff --git a/src/core/Synapse.Core/Resources/OperatorSpec.cs b/src/core/Synapse.Core/Resources/OperatorSpec.cs
index fc420dbf..3474c5b4 100644
--- a/src/core/Synapse.Core/Resources/OperatorSpec.cs
+++ b/src/core/Synapse.Core/Resources/OperatorSpec.cs
@@ -36,4 +36,10 @@ public record OperatorSpec
     [DataMember(Order = 2, Name = "selector"), JsonPropertyOrder(2), JsonPropertyName("selector"), YamlMember(Order = 2, Alias = "selector")]
     public virtual IDictionary? Selector { get; set; }
 
+    /// <summary>
+    /// Gets/sets options controlling retention of completed workflow instances and cleanup sweep behavior. If null, the operator does not delete completed workflow instances.
+    /// </summary>
+    [DataMember(Order = 3, Name = "cleanup"), JsonPropertyOrder(3), JsonPropertyName("cleanup"), YamlMember(Order = 3, Alias = "cleanup")]
+    public virtual OperatorCleanupOptions? Cleanup { get; set; } = new();
+
 }
diff --git a/src/core/Synapse.Core/Resources/RunnerConfiguration.cs b/src/core/Synapse.Core/Resources/RunnerConfiguration.cs
index f5056264..bc537acf 100644
--- a/src/core/Synapse.Core/Resources/RunnerConfiguration.cs
+++ b/src/core/Synapse.Core/Resources/RunnerConfiguration.cs
@@ -55,4 +55,5 @@ public record RunnerConfiguration
     ///
     [DataMember(Order = 5, Name = "publishLifecycleEvents"), JsonPropertyOrder(5), JsonPropertyName("publishLifecycleEvents"), YamlMember(Order = 5, Alias = "publishLifecycleEvents")]
     public virtual bool? PublishLifecycleEvents { get; set; } = true;
+
 }
\ No newline at end of file
diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
index b4c28903..8f8b8b83 100644
--- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
+++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
@@ -11,7 +11,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-using Neuroglia.Data.Infrastructure.ResourceOriented;
 using Neuroglia.Data.Infrastructure.Services;
 
 namespace Synapse.Operator.Services;
@@ -61,6 +60,8 @@ public override async Task StartAsync(CancellationToken cancellationToken)
         await base.StartAsync(cancellationToken).ConfigureAwait(false);
         this.Operator!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken);
         await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false);
+        // Always start the sweep loop: it re-reads the operator's cleanup options on every pass, so cleanup can be switched on or off while the operator runs
+        _ = Task.Run(this.CleanupAsync, this.CancellationTokenSource.Token);
     }
 
     ///
@@ -70,6 +71,90 @@ protected override Task ReconcileAsync(CancellationToken cancellationToken = def
         return base.ReconcileAsync(cancellationToken);
     }
 
+    /// <summary>
+    /// Periodically deletes expired workflow instances, until the controller's cancellation token is signaled
+    /// </summary>
+    /// <returns>A new awaitable <see cref="Task"/></returns>
+    protected virtual async Task CleanupAsync()
+    {
+        var fallbackInterval = TimeSpan.FromMinutes(5);
+        var cancellationToken = this.CancellationTokenSource.Token;
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            try
+            {
+                // The options are re-read on every pass so that edits to the operator resource apply without a restart.
+                // Without options or without a TTL nothing ever expires, so the instances are not even listed.
+                if (this.Operator?.Resource?.Spec?.Cleanup?.Ttl is TimeSpan ttl)
+                {
+                    var deleted = await this.DeleteExpiredInstancesAsync(ttl, cancellationToken).ConfigureAwait(false);
+                    if (deleted > 0) this.Logger.LogInformation("Deleted {count} expired workflow instance(s)", deleted);
+                }
+            }
+            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+            {
+                break;
+            }
+            catch (Exception ex)
+            {
+                // A failed sweep must not end the loop: the next pass retries after the regular interval
+                this.Logger.LogError(ex, "Instance cleanup sweep failed");
+            }
+            // Task.Delay throws on negative delays and a zero delay would spin, so non-positive intervals use the default
+            var interval = this.Operator?.Resource?.Spec?.Cleanup?.Interval ?? fallbackInterval;
+            if (interval <= TimeSpan.Zero) interval = fallbackInterval;
+            try
+            {
+                await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
+            }
+            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+            {
+                break;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Deletes the workflow instances, among those matching the controller's label selectors, that are no longer operative, are not tracked by a handler, and ended at least the specified time to live ago
+    /// </summary>
+    /// <param name="ttl">The amount of time an instance is retained after it ended</param>
+    /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
+    /// <returns>The number of workflow instances that have been deleted</returns>
+    private async Task<int> DeleteExpiredInstancesAsync(TimeSpan ttl, CancellationToken cancellationToken)
+    {
+        var cutoff = DateTimeOffset.UtcNow - ttl;
+        var deleted = 0;
+        await foreach (var instance in this.Repository.GetAllAsync(labelSelectors: this.Options.LabelSelectors, cancellationToken: cancellationToken))
+        {
+            // Leave alone the instances that a handler still tracks or that are still operative
+            if (this.Handlers.ContainsKey(instance.GetQualifiedName()) || instance.IsOperative) continue;
+            // Instances without an end time are aged from their creation timestamp instead
+            var finishedAt = instance.Status?.EndedAt ?? instance.Metadata.CreationTimestamp;
+            if (finishedAt <= cutoff)
+            {
+                try
+                {
+                    await this.TryReleaseAsync(instance, cancellationToken).ConfigureAwait(false);
+                }
+                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
+                {
+                    // Releasing is best effort: report the failure and attempt the deletion anyway
+                    this.Logger.LogWarning(ex, "Failed to release expired workflow instance {instance}", instance.GetQualifiedName());
+                }
+                try
+                {
+                    await this.Repository.RemoveAsync(instance.GetName(), instance.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
+                    deleted++;
+                }
+                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
+                {
+                    this.Logger.LogWarning(ex, "Failed to delete expired workflow instance {instance}", instance.GetQualifiedName());
+                }
+            }
+        }
+        return deleted;
+    }
+
     ///
     /// Creates a new for the specified workflow
     ///
diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
index d0de68eb..a556834f 100644
--- a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
+++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
@@ -12,7 +12,6 @@
 // limitations under the License.
 
 using Neuroglia.Data.Infrastructure.Services;
-using Neuroglia.Mediation;
 using System.Text;
 
 namespace Synapse.Operator.Services;