Skip to content

Commit 592e5d0

Browse files
committed
remove static wait and switch to thread subscription
1 parent a38724e commit 592e5d0

File tree

1 file changed

+55
-22
lines changed

1 file changed

+55
-22
lines changed

FlowableExternalWorkerClient.Tests/ExternalWorkerClientTests.cs

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,44 @@ public class ExternalWorkerClientTests
1515

1616
private string FixturesPath => Path.Combine(AppContext.BaseDirectory, "Fixtures");
1717

18-
private ExternalWorkerAcquireJobResponse ConsumeOneCall(
18+
/// <summary>
19+
/// Subscribes to a topic, waits for a single job to be processed (including the
20+
/// work result's Execute call), and returns the job. Uses an ExecutionAwareWorkResult
21+
/// wrapper to properly await the subscription's async work, avoiding static delays.
22+
/// </summary>
23+
private async Task<ExternalWorkerAcquireJobResponse> ConsumeOneCallAsync(
1924
ExternalWorkerClient client,
2025
Func<ExternalWorkerAcquireJobResponse, IWorkResultBuilder, IWorkResult?> callbackHandler,
2126
string topic = "myTopic")
2227
{
2328
ExternalWorkerAcquireJobResponse? resultJob = null;
24-
var completed = false;
29+
var executionCompleted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
2530

2631
var handler = new DelegateCallbackHandler((job, work) =>
2732
{
2833
var result = callbackHandler(job, work);
2934
resultJob = job;
30-
completed = true;
31-
return result;
35+
if (result == null)
36+
{
37+
executionCompleted.TrySetResult(true);
38+
return null;
39+
}
40+
return new ExecutionAwareWorkResult(result, executionCompleted);
3241
});
3342

3443
var subscription = client.Subscribe(topic, handler, waitPeriodSeconds: 1);
3544

36-
var attempts = 0;
37-
while (attempts < 50 && !completed)
45+
try
46+
{
47+
await executionCompleted.Task.WaitAsync(TimeSpan.FromSeconds(10));
48+
}
49+
catch (TimeoutException)
3850
{
39-
Thread.Sleep(200);
40-
attempts++;
51+
subscription.Unsubscribe();
52+
Assert.Fail("Callback was not invoked or execution did not complete within the timeout period");
4153
}
4254

4355
subscription.Unsubscribe();
44-
Assert.True(completed, "Callback was not invoked within the timeout period");
4556
return resultJob!;
4657
}
4758

@@ -59,7 +70,7 @@ public async Task TestConsume()
5970
var processInstanceId = await BpmnUtils.StartProcess(httpClient, BaseUrl, definitionId);
6071
try
6172
{
62-
ConsumeOneCall(client, (job, work) => work.Success());
73+
await ConsumeOneCallAsync(client, (job, work) => work.Success());
6374

6475
var activityIds = await BpmnUtils.ExecutedActivityIds(httpClient, BaseUrl, processInstanceId);
6576
Assert.Equal(
@@ -90,17 +101,14 @@ public async Task TestConsumeWithFailure()
90101
var processInstanceId = await BpmnUtils.StartProcess(httpClient, BaseUrl, definitionId);
91102
try
92103
{
93-
ConsumeOneCall(client, (job, work) =>
104+
await ConsumeOneCallAsync(client, (job, work) =>
94105
work.Failure()
95106
.ErrorMessage("Test Error Message")
96107
.ErrorDetails("Some error details")
97108
.Retries(3)
98109
.RetryTimeout("PT5M")
99110
);
100111

101-
// Give the server a moment to process
102-
if (!cassette.IsReplaying) await Task.Delay(500);
103-
104112
var activityIds = await BpmnUtils.ExecutedActivityIds(httpClient, BaseUrl, processInstanceId);
105113
Assert.Equal(
106114
new List<string> { "bpmnSequenceFlow_2", "bpmnTask_3", "startnoneevent1" },
@@ -128,15 +136,12 @@ public async Task TestConsumeWithCmmnTerminate()
128136
var caseInstanceId = await CmmnUtils.StartCase(httpClient, BaseUrl, definitionId);
129137
try
130138
{
131-
ConsumeOneCall(
139+
await ConsumeOneCallAsync(
132140
client,
133141
(job, work) => work.CmmnTerminate().Variable("testVar", "test terminate"),
134142
topic: "cmmnTopic"
135143
);
136144

137-
// Give the server a moment to process
138-
if (!cassette.IsReplaying) await Task.Delay(500);
139-
140145
var variable = await CmmnUtils.GetCaseVariable(httpClient, BaseUrl, caseInstanceId, "testVar");
141146
Assert.NotNull(variable);
142147
Assert.Equal("string", variable.Value.GetProperty("type").GetString());
@@ -163,15 +168,12 @@ public async Task TestSubscribeWithBpmnError()
163168
var processInstanceId = await BpmnUtils.StartProcess(httpClient, BaseUrl, definitionId);
164169
try
165170
{
166-
ConsumeOneCall(client, (job, work) =>
171+
await ConsumeOneCallAsync(client, (job, work) =>
167172
work.BpmnError()
168173
.Variable("testVar", "test failure")
169174
.ErrorCode("errorCode1")
170175
);
171176

172-
// Give the server a moment to process
173-
if (!cassette.IsReplaying) await Task.Delay(500);
174-
175177
var variable =
176178
await BpmnUtils.GetProcessVariable(httpClient, BaseUrl, processInstanceId, "testVar");
177179
Assert.NotNull(variable);
@@ -209,4 +211,35 @@ public DelegateCallbackHandler(
209211
return _handler(job, work);
210212
}
211213
}
214+
215+
/// <summary>
216+
/// Wraps an IWorkResult to signal a TaskCompletionSource when Execute completes.
217+
/// This lets ConsumeOneCallAsync wait for the subscription's async work to finish
218+
/// instead of using a static delay.
219+
/// </summary>
220+
private class ExecutionAwareWorkResult : IWorkResult
221+
{
222+
private readonly IWorkResult _inner;
223+
private readonly TaskCompletionSource<bool> _tcs;
224+
225+
public ExecutionAwareWorkResult(IWorkResult inner, TaskCompletionSource<bool> tcs)
226+
{
227+
_inner = inner;
228+
_tcs = tcs;
229+
}
230+
231+
public async Task Execute(IFlowableExternalWorkerRestClient client)
232+
{
233+
try
234+
{
235+
await _inner.Execute(client);
236+
_tcs.TrySetResult(true);
237+
}
238+
catch (Exception ex)
239+
{
240+
_tcs.TrySetException(ex);
241+
throw;
242+
}
243+
}
244+
}
212245
}

0 commit comments

Comments
 (0)