Skip to content

Commit 6d4a283

Browse files
Fix error handling.
1 parent 5c4b999 commit 6d4a283

10 files changed

+343
-34
lines changed

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<PackageProjectUrl>https://github.com/squidex/squidex</PackageProjectUrl>
1212
<PublishRepositoryUrl>true</PublishRepositoryUrl>
1313
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
14-
<Version>7.24.0</Version>
14+
<Version>7.25.0</Version>
1515
</PropertyGroup>
1616

1717
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
InstanceId: Guid_1,
3+
OwnerId: Guid_2,
4+
DefinitionId: Guid_3,
5+
Definition: {
6+
InitialStepId: StepId1,
7+
Steps: {
8+
StepId1: {
9+
IgnoreError: false,
10+
Step: {}
11+
}
12+
}
13+
},
14+
Context: {},
15+
Description: ,
16+
ScheduleKey: ,
17+
SchedulePartition: 75,
18+
Steps: {
19+
StepId1: {
20+
Status: Failed,
21+
IsPrepared: false,
22+
Attempts: [
23+
{
24+
Started: DateTimeOffset_1,
25+
Completed: DateTimeOffset_1,
26+
Error: Step Error
27+
}
28+
]
29+
}
30+
},
31+
Created: DateTimeOffset_1,
32+
Completed: DateTimeOffset_1,
33+
Expires: DateTimeOffset_2,
34+
Status: Failed
35+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
InstanceId: Guid_1,
3+
OwnerId: Guid_2,
4+
DefinitionId: Guid_3,
5+
Definition: {
6+
InitialStepId: StepId1,
7+
Steps: {
8+
StepId1: {
9+
IgnoreError: false,
10+
Step: {}
11+
}
12+
}
13+
},
14+
Context: {},
15+
Description: ,
16+
ScheduleKey: ,
17+
SchedulePartition: 75,
18+
Steps: {
19+
StepId1: {
20+
Status: Failed,
21+
IsPrepared: true,
22+
Attempts: [
23+
{
24+
Started: DateTimeOffset_1,
25+
Completed: DateTimeOffset_1,
26+
Error: Step Error
27+
},
28+
{
29+
Started: DateTimeOffset_1,
30+
Completed: DateTimeOffset_1,
31+
Error: Step Error
32+
},
33+
{
34+
Started: DateTimeOffset_1,
35+
Completed: DateTimeOffset_1,
36+
Error: Step Error
37+
},
38+
{
39+
Started: DateTimeOffset_1,
40+
Completed: DateTimeOffset_1,
41+
Error: Step Error
42+
},
43+
{
44+
Started: DateTimeOffset_1,
45+
Completed: DateTimeOffset_1,
46+
Error: Step Error
47+
}
48+
]
49+
}
50+
},
51+
Created: DateTimeOffset_1,
52+
Completed: DateTimeOffset_1,
53+
Expires: DateTimeOffset_2,
54+
Status: Failed
55+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
InstanceId: Guid_1,
3+
OwnerId: Guid_2,
4+
DefinitionId: Guid_3,
5+
Definition: {
6+
InitialStepId: StepId1,
7+
Steps: {
8+
StepId1: {
9+
NextStepId: StepId2,
10+
IgnoreError: false,
11+
Step: {}
12+
},
13+
StepId2: {
14+
IgnoreError: false,
15+
Step: {}
16+
}
17+
}
18+
},
19+
Context: {},
20+
Description: ,
21+
ScheduleKey: ,
22+
SchedulePartition: 75,
23+
Steps: {
24+
StepId1: {
25+
Status: Completed,
26+
IsPrepared: true,
27+
Attempts: [
28+
{
29+
Started: DateTimeOffset_1,
30+
Completed: DateTimeOffset_1
31+
}
32+
]
33+
}
34+
},
35+
Created: DateTimeOffset_1,
36+
Completed: DateTimeOffset_1,
37+
Expires: DateTimeOffset_2,
38+
Status: Completed
39+
}

flows/Squidex.Flows.Tests/DefaultFlowExecutorTests.cs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,38 @@ public async Task Should_execute_first_step_with_error()
153153
await Verify(state).AddNamedGuid(stepId1, "StepId1");
154154
}
155155

156+
[Fact]
157+
public async Task Should_abort_flow_when_preparation_fails()
158+
{
159+
var step1 = A.Fake<FlowStep>();
160+
161+
A.CallTo(() => step1.PrepareAsync(A<FlowExecutionContext>._, A<CancellationToken>._))
162+
.Throws(new InvalidOperationException("Step Error"));
163+
164+
var state = sut.CreateState(
165+
new CreateFlowInstanceRequest<TestFlowContext>
166+
{
167+
Context = new TestFlowContext(),
168+
DefinitionId = Guid.NewGuid().ToString(),
169+
Definition = new FlowDefinition
170+
{
171+
Steps = new Dictionary<Guid, FlowStepDefinition>
172+
{
173+
[stepId1] = new FlowStepDefinition { Step = step1 },
174+
},
175+
InitialStepId = stepId1,
176+
},
177+
OwnerId = Guid.NewGuid().ToString(),
178+
});
179+
180+
await sut.ExecuteAsync(state, default);
181+
182+
AssertPrepared(step1, 1);
183+
AssertExecuted(step1, 0);
184+
185+
await Verify(state).AddNamedGuid(stepId1, "StepId1");
186+
}
187+
156188
[Fact]
157189
public async Task Should_execute_first_step_again_after_error()
158190
{
@@ -226,7 +258,7 @@ public async Task Should_execute_first_step_again_after_attempts_exeeded()
226258
}
227259

228260
[Fact]
229-
public async Task Should_execute_too_end_when_simulating()
261+
public async Task Should_execute_to_end_when_simulating()
230262
{
231263
var step1 = A.Fake<FlowStep>();
232264

@@ -338,7 +370,7 @@ public async Task Should_execute_next_step_after_ignored_error()
338370
}
339371

340372
[Fact]
341-
public async Task Should_not_execute_next_step_if_steps_completes_flow()
373+
public async Task Should_not_execute_next_step_if_step_completes_flow()
342374
{
343375
var step1 = A.Fake<FlowStep>();
344376
var step2 = A.Fake<FlowStep>();
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// ==========================================================================
2+
// Squidex Headless CMS
3+
// ==========================================================================
4+
// Copyright (c) Squidex UG (haftungsbeschraenkt)
5+
// All rights reserved. Licensed under the MIT license.
6+
// ==========================================================================
7+
8+
using System.Threading.Channels;
9+
using Squidex.Flows.Internal.Execution.Utils;
10+
11+
namespace Squidex.Flows;
12+
13+
public class PartionedSchedulerTests
14+
{
15+
[Fact]
16+
public async Task Should_handle_events_in_order_for_same_key()
17+
{
18+
var counter = 0;
19+
20+
var sut = new PartitionedScheduler<int>(async (_, ct) =>
21+
{
22+
var x = counter;
23+
await Task.Delay(1, ct);
24+
counter = x + 1;
25+
}, 32, 2);
26+
27+
for (var i = 0; i < 100; i++)
28+
{
29+
await sut.ScheduleAsync(0, i);
30+
}
31+
32+
await sut.CompleteAsync();
33+
34+
Assert.Equal(100, counter);
35+
}
36+
37+
[Fact]
38+
public async Task Should_not_handle_events_in_order_for_different_keys()
39+
{
40+
var counter = 0;
41+
42+
var sut = new PartitionedScheduler<int>(async (_, ct) =>
43+
{
44+
var x = counter;
45+
await Task.Delay(1, ct);
46+
counter = x + 1;
47+
}, 32, 2);
48+
49+
for (var i = 0; i < 500; i++)
50+
{
51+
await sut.ScheduleAsync(i, i);
52+
}
53+
54+
await sut.CompleteAsync();
55+
56+
Assert.True(counter < 500);
57+
}
58+
59+
[Fact]
60+
public async Task Should_not_swallow_exception()
61+
{
62+
var sut = new PartitionedScheduler<int>((_, ct) =>
63+
{
64+
throw new InvalidOperationException();
65+
}, 32, 2);
66+
67+
var ex = await Assert.ThrowsAsync<ChannelClosedException>(async () => await ScheduleUntilFailed(sut, 0, 0));
68+
Assert.IsType<InvalidOperationException>(ex.InnerException);
69+
}
70+
71+
[Fact]
72+
public async Task Should_not_swallow_exception_from_other_partition()
73+
{
74+
var sut = new PartitionedScheduler<int>((_, ct) =>
75+
{
76+
throw new InvalidOperationException();
77+
}, 32, 2);
78+
79+
var ex1 = await Assert.ThrowsAsync<ChannelClosedException>(async () => await ScheduleUntilFailed(sut, 1, 1));
80+
var ex2 = await Assert.ThrowsAsync<ChannelClosedException>(async () => await sut.ScheduleAsync(0, 0));
81+
82+
Assert.IsType<InvalidOperationException>(ex1.InnerException);
83+
Assert.IsType<InvalidOperationException>(ex2.InnerException);
84+
}
85+
86+
private static async Task ScheduleUntilFailed<T>(PartitionedScheduler<T> sut, object key, T value)
87+
{
88+
for (var i = 0; i < 100; i++)
89+
{
90+
await sut.ScheduleAsync(key, value);
91+
}
92+
}
93+
}

flows/Squidex.Flows/FlowOptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public sealed class FlowOptions : IValidatableOptions
2626

2727
public int NumPartitions { get; set; } = 120;
2828

29+
public int BufferSizePerWorker { get; set; } = 2;
30+
2931
public int WorkerIndex { get; set; }
3032

3133
public Func<Exception, bool>? IsSafeException { get; set; } = _ => true;

flows/Squidex.Flows/Internal/Execution/DefaultFlowExecutor.cs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -181,20 +181,29 @@ public async Task ExecuteAsync(FlowExecutionState<TContext> state,
181181
{
182182
ArgumentNullException.ThrowIfNull(nameof(state));
183183

184-
while (true)
184+
try
185185
{
186-
if (state.Status is FlowExecutionStatus.Completed or FlowExecutionStatus.Failed)
186+
while (true)
187187
{
188-
break;
189-
}
188+
if (state.Status is FlowExecutionStatus.Completed or FlowExecutionStatus.Failed)
189+
{
190+
break;
191+
}
190192

191-
var now = Clock.GetCurrentInstant();
192-
if (state.NextRun > now)
193-
{
194-
break;
195-
}
193+
var now = Clock.GetCurrentInstant();
194+
if (state.NextRun > now)
195+
{
196+
break;
197+
}
196198

197-
await ExecuteCoreAsync(state, false, DefaultTimeout, ct);
199+
await ExecuteCoreAsync(state, false, DefaultTimeout, ct);
200+
}
201+
}
202+
catch
203+
{
204+
// This method should never fail, when used properly.
205+
state.Failed(Clock.GetCurrentInstant());
206+
throw;
198207
}
199208
}
200209

@@ -246,9 +255,11 @@ void Log(string message, string? dump)
246255
throw new InvalidOperationException("Flow step has already been completed.");
247256
}
248257

249-
await PrepareAsync(executionContext, stepDefinition.Step, stepState, cts.Token);
250258
try
251259
{
260+
// Ensure that we also catch errors in the preparation.
261+
await PrepareAsync(executionContext, stepDefinition.Step, stepState, cts.Token);
262+
252263
stepState.Status = FlowExecutionStatus.Running;
253264

254265
var result = await pipeline(executionContext, ct) ??
@@ -296,7 +307,12 @@ private void HandleError(
296307
// Ensure to take the time after the step execution and preparation.
297308
var now = Clock.GetCurrentInstant();
298309

299-
if (stepDefinition.IgnoreError)
310+
if (!stepState.IsPrepared)
311+
{
312+
state.Failed(now);
313+
stepState.Status = FlowExecutionStatus.Failed;
314+
}
315+
else if (stepDefinition.IgnoreError)
300316
{
301317
var nextId = state.GetNextStep(stepDefinition, default);
302318
if (nextId != null)

0 commit comments

Comments
 (0)