Skip to content

Commit 74dbbcc

Browse files
committed
* Try to address TimeoutException by using WhenTcsCompetes
1 parent b199718 commit 74dbbcc

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

Tests/Consumer/ConsumerOutcomeTests.cs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,20 +125,28 @@ public async Task RequeuedMessageWithAnnotationShouldContainAnnotationsOnRedeliv
125125
IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler(
126126
(context, message) =>
127127
{
128-
messages.Add(message);
129-
if (requeueCount == 0)
128+
try
130129
{
131-
requeueCount++;
132-
context.Requeue(new Dictionary<string, object>
130+
messages.Add(message);
131+
if (Interlocked.Increment(ref requeueCount) == 1)
132+
{
133+
context.Requeue(new Dictionary<string, object>
134+
{
135+
{ annotationKey, annotationValue },
136+
{ annotationKey1, annotationValue1 }
137+
});
138+
}
139+
else
133140
{
134-
{ annotationKey, annotationValue }, { annotationKey1, annotationValue1 }
135-
});
141+
context.Accept();
142+
tcsRequeue.SetResult(true);
143+
}
136144
}
137-
else
145+
catch (Exception ex)
138146
{
139-
context.Accept();
140-
tcsRequeue.SetResult(true);
147+
tcsRequeue.SetException(ex);
141148
}
149+
142150
return Task.CompletedTask;
143151
}
144152
).Queue(_queueName).BuildAndStartAsync();
@@ -148,7 +156,7 @@ public async Task RequeuedMessageWithAnnotationShouldContainAnnotationsOnRedeliv
148156

149157
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
150158

151-
await tcsRequeue.Task.WaitAsync(TimeSpan.FromSeconds(5));
159+
await WhenTcsCompletes(tcsRequeue);
152160

153161
Assert.Equal(2, messages.Count);
154162
Assert.Null(messages[0].Annotation(annotationKey));
@@ -158,11 +166,13 @@ public async Task RequeuedMessageWithAnnotationShouldContainAnnotationsOnRedeliv
158166
Assert.Equal(messages[1].Annotation(annotationKey), annotationValue);
159167
Assert.Equal(messages[1].Annotation(annotationKey1), annotationValue1);
160168
Assert.NotNull(messages[1].Annotation("x-delivery-count"));
161-
HttpApiClient client = new();
169+
170+
using HttpApiClient client = new();
162171
Queue q = await client.GetQueueAsync(_queueName);
163172
Assert.Equal(0, q.Messages);
164173

165174
await consumer.CloseAsync();
175+
consumer.Dispose();
166176
}
167177

168178
[Fact]
@@ -251,8 +261,8 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont
251261
TaskCompletionSource<bool> tcs =
252262
new(TaskCreationOptions.RunContinuationsAsynchronously);
253263
IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
254-
IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler(
255-
(context, _) =>
264+
IConsumer consumer = await _connection.ConsumerBuilder()
265+
.MessageHandler((context, _) =>
256266
{
257267
context.Discard(new Dictionary<string, object> { { annotationKey, annotationValue } });
258268
tcs.SetResult(true);
@@ -263,8 +273,12 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont
263273
IMessage message = new AmqpMessage(RandomString());
264274
PublishResult pr = await publisher.PublishAsync(message);
265275
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
266-
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
276+
277+
await WhenTcsCompletes(tcs);
278+
267279
await consumer.CloseAsync();
280+
consumer.Dispose();
281+
268282
TaskCompletionSource<IMessage> tcsDl =
269283
new(TaskCreationOptions.RunContinuationsAsynchronously);
270284
IConsumer dlConsumer = await _connection.ConsumerBuilder()
@@ -276,18 +290,19 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont
276290
})
277291
.Queue(dlqQueueName).BuildAndStartAsync();
278292

279-
IMessage mResult = await tcsDl.Task.WaitAsync(TimeSpan.FromSeconds(5));
293+
IMessage mResult = await WhenTcsCompletes(tcsDl);
280294
Assert.NotNull(mResult);
281295
Assert.Equal(mResult.Annotation(annotationKey), annotationValue);
282296

283-
var client = new HttpApiClient();
297+
using HttpApiClient client = new();
284298
Queue q = await client.GetQueueAsync(_queueName);
285299
Assert.Equal(0, q.Messages);
286300

287301
Queue q1 = await client.GetQueueAsync(dlqQueueName);
288302
Assert.Equal(0, q1.Messages);
289303

290304
await dlConsumer.CloseAsync();
305+
dlConsumer.Dispose();
291306
}
292307

293308
private async Task DeclareDeadLetterTopology(string queueName, string dlxQueueName)

0 commit comments

Comments
 (0)