|
36 | 36 | using System.Threading.Tasks;
|
37 | 37 | using Integration;
|
38 | 38 | using RabbitMQ.Client;
|
| 39 | +using RabbitMQ.Client.Events; |
39 | 40 | using RabbitMQ.Client.Exceptions;
|
40 | 41 | using Toxiproxy.Net.Toxics;
|
41 | 42 | using Xunit;
|
@@ -409,6 +410,47 @@ public async Task TestPublisherConfirmationThrottling()
|
409 | 410 | Assert.Equal(TotalMessageCount, publishCount);
|
410 | 411 | }
|
411 | 412 |
|
| 413 | + [SkippableFact] |
| 414 | + [Trait("Category", "Toxiproxy")] |
| 415 | + public async Task TestBasicConsumeCancellation_GH1750() |
| 416 | + { |
| 417 | + Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); |
| 418 | + Assert.NotNull(_toxiproxyManager); |
| 419 | + Assert.Null(_conn); |
| 420 | + Assert.Null(_channel); |
| 421 | + |
| 422 | + ConnectionFactory cf = CreateConnectionFactory(); |
| 423 | + cf.AutomaticRecoveryEnabled = false; |
| 424 | + cf.TopologyRecoveryEnabled = false; |
| 425 | + |
| 426 | + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); |
| 427 | + _conn = await cf.CreateConnectionAsync(); |
| 428 | + _channel = await _conn.CreateChannelAsync(); |
| 429 | + |
| 430 | + QueueDeclareOk q = await _channel.QueueDeclareAsync(); |
| 431 | + |
| 432 | + var consumer = new AsyncEventingBasicConsumer(_channel); |
| 433 | + consumer.ReceivedAsync += (o, a) => |
| 434 | + { |
| 435 | + return Task.CompletedTask; |
| 436 | + }; |
| 437 | + |
| 438 | + string toxicName = $"rmq-localhost-delay-{Now}-{GenerateShortUuid()}"; |
| 439 | + var latencyToxic = new LatencyToxic |
| 440 | + { |
| 441 | + Name = toxicName |
| 442 | + }; |
| 443 | + latencyToxic.Attributes.Latency = 500; |
| 444 | + latencyToxic.Toxicity = 1.0; |
| 445 | + latencyToxic.Stream = ToxicDirection.DownStream; |
| 446 | + |
| 447 | + Task<LatencyToxic> addToxicTask = _toxiproxyManager.AddToxicAsync(latencyToxic); |
| 448 | + await addToxicTask.WaitAsync(WaitSpan); |
| 449 | + |
| 450 | + using var cts = new CancellationTokenSource(5); |
| 451 | + await _channel.BasicConsumeAsync(q.QueueName, true, consumer, cts.Token); |
| 452 | + } |
| 453 | + |
412 | 454 | private bool AreToxiproxyTestsEnabled
|
413 | 455 | {
|
414 | 456 | get
|
|
0 commit comments