Skip to content

Commit 7005353

Browse files
committed
Demonstrate that #1038 is fixed
Fixes #1038
1 parent 137abc9 commit 7005353

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Threading;
34+
using System.Threading.Tasks;
35+
using RabbitMQ.Client;
36+
using RabbitMQ.Client.Events;
37+
using Xunit;
38+
using Xunit.Abstractions;
39+
40+
namespace Test.Integration
41+
{
42+
public class TestAsyncEventingBasicConsumer : IntegrationFixture
43+
{
44+
private readonly CancellationTokenSource _cts = new CancellationTokenSource(ShortSpan);
45+
private readonly CancellationTokenRegistration _ctr;
46+
private readonly TaskCompletionSource<bool> _onCallbackExceptionTcs =
47+
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
48+
private readonly TaskCompletionSource<bool> _onReceivedTcs =
49+
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
50+
51+
public TestAsyncEventingBasicConsumer(ITestOutputHelper output)
52+
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2)
53+
{
54+
_ctr = _cts.Token.Register(OnTokenCanceled);
55+
}
56+
57+
public override Task DisposeAsync()
58+
{
59+
_ctr.Dispose();
60+
_cts.Dispose();
61+
return base.DisposeAsync();
62+
}
63+
64+
private void OnTokenCanceled()
65+
{
66+
_onCallbackExceptionTcs.TrySetCanceled();
67+
_onReceivedTcs.TrySetCanceled();
68+
}
69+
70+
private void ConsumerChannelOnCallbackException(object sender, CallbackExceptionEventArgs e)
71+
{
72+
_onCallbackExceptionTcs.TrySetResult(true);
73+
}
74+
75+
private Task AsyncConsumerOnReceived(object sender, BasicDeliverEventArgs @event)
76+
{
77+
_onReceivedTcs.TrySetResult(true);
78+
throw new Exception("from async subscriber");
79+
}
80+
81+
[Fact]
82+
public async Task TestAsyncEventingBasicConsumer_GH1038()
83+
{
84+
string exchangeName = GenerateExchangeName();
85+
string queueName = GenerateQueueName();
86+
string routingKey = string.Empty;
87+
88+
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
89+
await _channel.QueueDeclareAsync(queueName, false, false, true, null);
90+
await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null);
91+
92+
_channel.CallbackException += ConsumerChannelOnCallbackException;
93+
94+
//async subscriber
95+
var consumer = new AsyncEventingBasicConsumer(_channel);
96+
consumer.Received += AsyncConsumerOnReceived;
97+
await _channel.BasicConsumeAsync(queueName, false, consumer);
98+
99+
//publisher
100+
using IChannel publisherChannel = await _conn.CreateChannelAsync();
101+
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
102+
var props = new BasicProperties();
103+
await publisherChannel.BasicPublishAsync(exchangeName, "", props, messageBodyBytes);
104+
105+
await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task);
106+
Assert.True(await _onReceivedTcs.Task);
107+
Assert.True(await _onCallbackExceptionTcs.Task);
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)