Skip to content

Commit a6cfa91

Browse files
committed
* More improvements to TestBasicRoundtripConcurrentManyMessages
1 parent 6d6210c commit a6cfa91

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
159159
{
160160
if (args.Initiator == ShutdownInitiator.Peer)
161161
{
162-
publish1SyncSource.TrySetResult(false);
163-
publish2SyncSource.TrySetResult(false);
162+
publish1SyncSource.TrySetException(ea.Exception);
163+
publish2SyncSource.TrySetException(ea.Exception);
164164
}
165165
});
166166
};
@@ -171,8 +171,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
171171
{
172172
if (args.Initiator == ShutdownInitiator.Peer)
173173
{
174-
publish1SyncSource.TrySetResult(false);
175-
publish2SyncSource.TrySetResult(false);
174+
publish1SyncSource.TrySetException(ea.Exception);
175+
publish2SyncSource.TrySetException(ea.Exception);
176176
}
177177
});
178178
};
@@ -209,25 +209,30 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
209209
int publish1_count = 0;
210210
int publish2_count = 0;
211211

212-
consumer.Received += async (o, a) =>
212+
consumer.Received += (o, a) =>
213213
{
214214
string decoded = _encoding.GetString(a.Body.ToArray());
215215
if (decoded == publish1)
216216
{
217217
if (Interlocked.Increment(ref publish1_count) >= publish_total)
218218
{
219219
publish1SyncSource.TrySetResult(true);
220-
await publish2SyncSource.Task;
221220
}
222221
}
223222
else if (decoded == publish2)
224223
{
225224
if (Interlocked.Increment(ref publish2_count) >= publish_total)
226225
{
227226
publish2SyncSource.TrySetResult(true);
228-
await publish1SyncSource.Task;
229227
}
230228
}
229+
else
230+
{
231+
var ex = new InvalidOperationException("incorrect message - should never happen!");
232+
publish1SyncSource.TrySetException(ex);
233+
publish2SyncSource.TrySetException(ex);
234+
}
235+
return Task.CompletedTask;
231236
};
232237

233238
await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer);

0 commit comments

Comments
 (0)