Skip to content

Commit 2aec0cc

Browse files
committed
fix(Operator): Fixed the WorkflowController to label correlations created by a workflow schedule
fix(Operator): Fixed the WorkflowController/WorkflowInstanceController to delete correlations owned by a deleted workflow/workflow instance fix(Runner): Fixed the WorkflowExecutionContext to label correlations created by a workflow instance
1 parent eed78c3 commit 2aec0cc

File tree

5 files changed

+31
-2
lines changed

5 files changed

+31
-2
lines changed

src/core/Synapse.Core/SynapseDefaults.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,11 @@ public static class Labels
360360
/// Gets the label used by Synapse to indicate the version of the workflow used by a workflow instance
361361
/// </summary>
362362
public const string WorkflowVersion = Prefix + "workflow/version";
363+
/// <summary>
364+
/// Gets the label used by Synapse to indicate the qualified name of the workflow instance that owns a concept, such as a correlation
365+
/// </summary>
366+
public const string WorkflowInstance = Prefix + "workflow-instance";
367+
363368
}
364369

365370
}

src/operator/Synapse.Operator/Services/WorkflowController.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,18 @@ protected override async Task OnResourceDeletedAsync(Workflow workflow, Cancella
163163
{
164164
await base.OnResourceDeletedAsync(workflow, cancellationToken).ConfigureAwait(false);
165165
if (this.Schedulers.TryRemove(workflow.GetQualifiedName(), out var scheduler)) await scheduler.DisposeAsync().ConfigureAwait(false);
166-
await foreach(var instance in this.Repository.GetAllAsync<WorkflowInstance>(cancellationToken: cancellationToken))
166+
var selectors = new LabelSelector[]
167+
{
168+
new(SynapseDefaults.Resources.Labels.Workflow, LabelSelectionOperator.Equals, workflow.GetQualifiedName())
169+
};
170+
await foreach(var instance in this.Repository.GetAllAsync<WorkflowInstance>(null, selectors, cancellationToken: cancellationToken))
167171
{
168172
await this.Repository.RemoveAsync<WorkflowInstance>(instance.GetName(), instance.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
169173
}
174+
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
175+
{
176+
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
177+
}
170178
}
171179

172180
/// <summary>

src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,14 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
150150
{
151151
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
152152
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
153+
var selectors = new LabelSelector[]
154+
{
155+
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
156+
};
157+
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
158+
{
159+
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
160+
}
153161
}
154162

155163
/// <summary>

src/operator/Synapse.Operator/Services/WorkflowScheduler.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ public virtual async Task ScheduleAsync(CancellationToken cancellationToken = de
7878
{
7979
Namespace = this.Workflow.Resource.GetNamespace(),
8080
Name = correlationName,
81+
Labels = new Dictionary<string, string>()
82+
{
83+
{ SynapseDefaults.Resources.Labels.Workflow, this.Workflow.Resource.GetQualifiedName() }
84+
}
8185
},
8286
Spec = new()
8387
{

src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,11 @@ public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionConte
235235
Metadata = new()
236236
{
237237
Namespace = @namespace,
238-
Name = name
238+
Name = name,
239+
Labels = new Dictionary<string, string>()
240+
{
241+
{ SynapseDefaults.Resources.Labels.WorkflowInstance, this.Instance.GetQualifiedName() }
242+
}
239243
},
240244
Spec = new()
241245
{

0 commit comments

Comments
 (0)