Skip to content

Commit 12ef2ce

Browse files
authored
Fix consumers reconnection (#1060)
* Fix consumers reconnection * Increase timeout * Decrease operation timeout * Rename hostname back * Switch to disconnect via api * Do not cancel consumer * :-/
1 parent 0a39601 commit 12ef2ce

21 files changed

+194
-95
lines changed

Source/EasyNetQ.IntegrationTests/DockerProxy.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public async Task StopContainerAsync(string name, CancellationToken token = defa
9494
await Task.WhenAll(stopTasks);
9595
}
9696

97+
public Task StopContainerByIdAsync(string id, CancellationToken token = default)
98+
{
99+
return client.Containers.StopContainerAsync(id, new ContainerStopParameters(), token);
100+
}
101+
97102
public async Task RemoveContainerAsync(string name, CancellationToken token = default)
98103
{
99104
var ids = await FindContainerIdsAsync(name).ConfigureAwait(false);
@@ -123,7 +128,7 @@ private static List<PortBinding> HostPorts(IEnumerable<string> hostPorts)
123128
return hostPorts.Select(x => new PortBinding {HostPort = x}).ToList();
124129
}
125130

126-
private async Task<IEnumerable<string>> FindContainerIdsAsync(string name)
131+
public async Task<IEnumerable<string>> FindContainerIdsAsync(string name)
127132
{
128133
var containers = await client.Containers
129134
.ListContainersAsync(new ContainersListParameters {All = true, Filters = ListFilters(name)})

Source/EasyNetQ.IntegrationTests/PubSub/When_publish_and_subscribe_polymorphic.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class When_publish_and_subscribe_polymorphic : IDisposable
1313
{
1414
public When_publish_and_subscribe_polymorphic(RabbitMQFixture fixture)
1515
{
16-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1");
16+
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;timeout=5");
1717
}
1818

1919
public void Dispose()
@@ -28,7 +28,7 @@ public void Dispose()
2828
[Fact]
2929
public async Task Test()
3030
{
31-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
31+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3232

3333
var subscriptionId = Guid.NewGuid().ToString();
3434

@@ -52,12 +52,12 @@ public async Task Test()
5252
}
5353
}))
5454
{
55-
await bus.PublishBatchAsync(bunnies.Concat(rabbits), timeoutCts.Token)
55+
await bus.PublishBatchAsync(bunnies.Concat(rabbits), cts.Token)
5656
.ConfigureAwait(false);
5757

5858
await Task.WhenAll(
59-
bunniesSink.WaitAllReceivedAsync(timeoutCts.Token),
60-
rabbitsSink.WaitAllReceivedAsync(timeoutCts.Token)
59+
bunniesSink.WaitAllReceivedAsync(cts.Token),
60+
rabbitsSink.WaitAllReceivedAsync(cts.Token)
6161
).ConfigureAwait(false);
6262

6363
bunniesSink.ReceivedMessages.Should().Equal(bunnies);

Source/EasyNetQ.IntegrationTests/PubSub/When_publish_and_subscribe_with_default_options.cs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ namespace EasyNetQ.IntegrationTests.PubSub
1010
[Collection("RabbitMQ")]
1111
public class When_publish_and_subscribe_with_default_options : IDisposable
1212
{
13-
public When_publish_and_subscribe_with_default_options(RabbitMQFixture fixture)
13+
private readonly RabbitMQFixture rmqFixture;
14+
15+
public When_publish_and_subscribe_with_default_options(RabbitMQFixture rmqFixture)
1416
{
15-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1");
17+
this.rmqFixture = rmqFixture;
18+
bus = RabbitHutch.CreateBus($"host={rmqFixture.Host};prefetchCount=1;timeout=5");
1619
}
1720

1821
public void Dispose()
@@ -27,25 +30,25 @@ public void Dispose()
2730
[Fact]
2831
public async Task Should_publish_and_consume()
2932
{
30-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
33+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3134

3235
var subscriptionId = Guid.NewGuid().ToString();
3336
var messagesSink = new MessagesSink(MessagesCount);
3437
var messages = MessagesFactories.Create(MessagesCount);
3538

3639
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
3740
{
38-
await bus.PublishBatchAsync(messages, timeoutCts.Token).ConfigureAwait(false);
41+
await bus.PublishBatchAsync(messages, cts.Token).ConfigureAwait(false);
3942

40-
await messagesSink.WaitAllReceivedAsync(timeoutCts.Token).ConfigureAwait(false);
43+
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);
4144
messagesSink.ReceivedMessages.Should().Equal(messages);
4245
}
4346
}
4447

4548
[Fact]
4649
public async Task Should_publish_and_consume_with_multiple_subscription_ids()
4750
{
48-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
51+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
4952

5053
var firstConsumerMessagesSink = new MessagesSink(MessagesCount);
5154
var secondConsumerMessagesSink = new MessagesSink(MessagesCount);
@@ -54,11 +57,11 @@ public async Task Should_publish_and_consume_with_multiple_subscription_ids()
5457
using (bus.Subscribe<Message>(Guid.NewGuid().ToString(), firstConsumerMessagesSink.Receive))
5558
using (bus.Subscribe<Message>(Guid.NewGuid().ToString(), secondConsumerMessagesSink.Receive))
5659
{
57-
await bus.PublishBatchAsync(messages, timeoutCts.Token).ConfigureAwait(false);
60+
await bus.PublishBatchAsync(messages, cts.Token).ConfigureAwait(false);
5861

5962
await Task.WhenAll(
60-
firstConsumerMessagesSink.WaitAllReceivedAsync(timeoutCts.Token),
61-
secondConsumerMessagesSink.WaitAllReceivedAsync(timeoutCts.Token)
63+
firstConsumerMessagesSink.WaitAllReceivedAsync(cts.Token),
64+
secondConsumerMessagesSink.WaitAllReceivedAsync(cts.Token)
6265
).ConfigureAwait(false);
6366

6467
firstConsumerMessagesSink.ReceivedMessages.Should().BeEquivalentTo(messages);
@@ -69,7 +72,7 @@ await Task.WhenAll(
6972
[Fact]
7073
public async Task Should_publish_and_consume_with_same_subscription_ids()
7174
{
72-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
75+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
7376

7477
var subscriptionId = Guid.NewGuid().ToString();
7578
var messagesSink = new MessagesSink(MessagesCount);
@@ -79,11 +82,28 @@ public async Task Should_publish_and_consume_with_same_subscription_ids()
7982
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
8083
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
8184
{
82-
await bus.PublishBatchAsync(messages, timeoutCts.Token).ConfigureAwait(false);
85+
await bus.PublishBatchAsync(messages, cts.Token).ConfigureAwait(false);
8386

84-
await messagesSink.WaitAllReceivedAsync(timeoutCts.Token).ConfigureAwait(false);
87+
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);
8588
messagesSink.ReceivedMessages.Should().BeEquivalentTo(messages);
8689
}
8790
}
91+
92+
[Fact]
93+
public async Task Should_survive_restart()
94+
{
95+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
96+
97+
var subscriptionId = Guid.NewGuid().ToString();
98+
var messagesSink = new MessagesSink(2);
99+
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
100+
{
101+
var message = new Message(0);
102+
await bus.PublishAsync(message).ConfigureAwait(false);
103+
await rmqFixture.ManagementClient.KillAllConnectionsAsync(cts.Token);
104+
await bus.PublishAsync(message).ConfigureAwait(false);
105+
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);
106+
}
107+
}
88108
}
89109
}

Source/EasyNetQ.IntegrationTests/PubSub/When_publish_and_subscribe_with_exclusive.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class When_publish_and_subscribe_with_exclusive : IDisposable
1212
{
1313
public When_publish_and_subscribe_with_exclusive(RabbitMQFixture fixture)
1414
{
15-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1");
15+
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;timeout=5");
1616
}
1717

1818
public void Dispose()
@@ -27,7 +27,7 @@ public void Dispose()
2727
[Fact]
2828
public async Task Test()
2929
{
30-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
30+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3131

3232
var firstConsumerMessagesSink = new MessagesSink(MessagesCount);
3333
var secondConsumerMessagesSink = new MessagesSink(0);
@@ -43,7 +43,7 @@ public async Task Test()
4343
)
4444
{
4545
// To ensure that ^ subscriber started successfully
46-
await Task.Delay(TimeSpan.FromSeconds(1), timeoutCts.Token).ConfigureAwait(false);
46+
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token).ConfigureAwait(false);
4747

4848
using (
4949
bus.Subscribe<Message>(
@@ -53,11 +53,11 @@ public async Task Test()
5353
)
5454
)
5555
{
56-
await bus.PublishBatchAsync(messages, timeoutCts.Token).ConfigureAwait(false);
56+
await bus.PublishBatchAsync(messages, cts.Token).ConfigureAwait(false);
5757

5858
await Task.WhenAll(
59-
firstConsumerMessagesSink.WaitAllReceivedAsync(timeoutCts.Token),
60-
secondConsumerMessagesSink.WaitAllReceivedAsync(timeoutCts.Token)
59+
firstConsumerMessagesSink.WaitAllReceivedAsync(cts.Token),
60+
secondConsumerMessagesSink.WaitAllReceivedAsync(cts.Token)
6161
).ConfigureAwait(false);
6262

6363
firstConsumerMessagesSink.ReceivedMessages.Should().Equal(messages);

Source/EasyNetQ.IntegrationTests/PubSub/When_publish_and_subscribe_with_priority.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class When_publish_and_subscribe_with_priority : IDisposable
1313
{
1414
public When_publish_and_subscribe_with_priority(RabbitMQFixture fixture)
1515
{
16-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1");
16+
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;timeout=5");
1717
}
1818

1919
public void Dispose()
@@ -30,7 +30,7 @@ public void Dispose()
3030
[Fact]
3131
public async Task Test()
3232
{
33-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
33+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3434

3535
var messagesSink = new MessagesSink(MessagesCount * 2);
3636
var highPriorityMessages = MessagesFactories.Create(MessagesCount);
@@ -42,15 +42,15 @@ public async Task Test()
4242
}
4343

4444
await bus.PublishBatchAsync(
45-
lowPriorityMessages, x => x.WithPriority(LowPriority), timeoutCts.Token
45+
lowPriorityMessages, x => x.WithPriority(LowPriority), cts.Token
4646
).ConfigureAwait(false);
4747
await bus.PublishBatchAsync(
48-
highPriorityMessages, x => x.WithPriority(HighPriority), timeoutCts.Token
48+
highPriorityMessages, x => x.WithPriority(HighPriority), cts.Token
4949
).ConfigureAwait(false);
5050

5151
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive, x => x.WithMaxPriority(2)))
5252
{
53-
await messagesSink.WaitAllReceivedAsync(timeoutCts.Token).ConfigureAwait(false);
53+
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);
5454

5555
messagesSink.ReceivedMessages.Should().Equal(highPriorityMessages.Concat(lowPriorityMessages));
5656
}

Source/EasyNetQ.IntegrationTests/PubSub/When_publish_and_subscribe_with_publish_confirms.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class When_publish_and_subscribe_with_publish_confirms : IDisposable
1212
{
1313
public When_publish_and_subscribe_with_publish_confirms(RabbitMQFixture fixture)
1414
{
15-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;publisherConfirms=True");
15+
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;publisherConfirms=True;timeout=5");
1616
}
1717

1818
public void Dispose()
@@ -27,7 +27,7 @@ public void Dispose()
2727
[Fact]
2828
public async Task Test()
2929
{
30-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
30+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3131

3232
var subscriptionId = Guid.NewGuid().ToString();
3333

@@ -36,9 +36,9 @@ public async Task Test()
3636

3737
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
3838
{
39-
await bus.PublishBatchAsync(messages, timeoutCts.Token).ConfigureAwait(false);
39+
await bus.PublishBatchAsync(messages, cts.Token).ConfigureAwait(false);
4040

41-
await messagesSink.WaitAllReceivedAsync(timeoutCts.Token).ConfigureAwait(false);
41+
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);
4242
messagesSink.ReceivedMessages.Should().Equal(messages);
4343
}
4444
}

Source/EasyNetQ.IntegrationTests/PubSub/When_publish_and_subscribe_with_topic.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class When_publish_and_subscribe_with_topic : IDisposable
1212
{
1313
public When_publish_and_subscribe_with_topic(RabbitMQFixture fixture)
1414
{
15-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1");
15+
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;timeout=5");
1616
}
1717

1818
public void Dispose()
@@ -27,7 +27,7 @@ public void Dispose()
2727
[Fact]
2828
public async Task Test()
2929
{
30-
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
30+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3131

3232
var firstTopicMessagesSink = new MessagesSink(MessagesCount);
3333
var secondTopicMessagesSink = new MessagesSink(MessagesCount);
@@ -51,15 +51,15 @@ public async Task Test()
5151
)
5252
{
5353
await bus.PublishBatchAsync(
54-
firstTopicMessages, x => x.WithTopic("first"), timeoutCts.Token
54+
firstTopicMessages, x => x.WithTopic("first"), cts.Token
5555
).ConfigureAwait(false);
5656
await bus.PublishBatchAsync(
57-
secondTopicMessages, x => x.WithTopic("second"), timeoutCts.Token
57+
secondTopicMessages, x => x.WithTopic("second"), cts.Token
5858
).ConfigureAwait(false);
5959

6060
await Task.WhenAll(
61-
firstTopicMessagesSink.WaitAllReceivedAsync(timeoutCts.Token),
62-
secondTopicMessagesSink.WaitAllReceivedAsync(timeoutCts.Token)
61+
firstTopicMessagesSink.WaitAllReceivedAsync(cts.Token),
62+
secondTopicMessagesSink.WaitAllReceivedAsync(cts.Token)
6363
).ConfigureAwait(false);
6464

6565
firstTopicMessagesSink.ReceivedMessages.Should().Equal(firstTopicMessages);

Source/EasyNetQ.IntegrationTests/RabbitMQFixture.cs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Runtime.InteropServices;
45
using System.Threading;
56
using System.Threading.Tasks;
@@ -30,23 +31,20 @@ public RabbitMQFixture()
3031

3132
public async Task InitializeAsync()
3233
{
33-
using (var timeoutCts = new CancellationTokenSource(InitializationTimeout))
34-
{
35-
dockerEngineOsPlatform =
36-
await dockerProxy.GetDockerEngineOsAsync(timeoutCts.Token).ConfigureAwait(false);
37-
dockerNetworkName = dockerEngineOsPlatform == OSPlatform.Windows ? null : "bridgeWhaleNet";
38-
await DisposeAsync(timeoutCts.Token).ConfigureAwait(false);
39-
await CreateNetworkAsync(timeoutCts.Token).ConfigureAwait(false);
40-
var rabbitMQDockerImage = await PullImageAsync(timeoutCts.Token).ConfigureAwait(false);
41-
var containerId = await RunContainerAsync(rabbitMQDockerImage, timeoutCts.Token).ConfigureAwait(false);
42-
if (dockerEngineOsPlatform == OSPlatform.Windows)
43-
Host = await dockerProxy.GetContainerIpAsync(containerId, timeoutCts.Token).ConfigureAwait(false);
44-
ManagementClient = new ManagementClient(
45-
Host, Configuration.RabbitMqUser, Configuration.RabbitMqPassword,
46-
Configuration.RabbitMqManagementPort
47-
);
48-
await WaitForRabbitMqReadyAsync(timeoutCts.Token);
49-
}
34+
using var cts = new CancellationTokenSource(InitializationTimeout);
35+
dockerEngineOsPlatform = await dockerProxy.GetDockerEngineOsAsync(cts.Token).ConfigureAwait(false);
36+
dockerNetworkName = dockerEngineOsPlatform == OSPlatform.Windows ? null : "bridgeWhaleNet";
37+
await DisposeAsync(cts.Token).ConfigureAwait(false);
38+
await CreateNetworkAsync(cts.Token).ConfigureAwait(false);
39+
var rabbitMQDockerImage = await PullImageAsync(cts.Token).ConfigureAwait(false);
40+
var containerId = await RunNewContainerAsync(rabbitMQDockerImage, cts.Token).ConfigureAwait(false);
41+
if (dockerEngineOsPlatform == OSPlatform.Windows)
42+
Host = await dockerProxy.GetContainerIpAsync(containerId, cts.Token).ConfigureAwait(false);
43+
ManagementClient = new ManagementClient(
44+
Host, Configuration.RabbitMqUser, Configuration.RabbitMqPassword,
45+
Configuration.RabbitMqManagementPort
46+
);
47+
await WaitForRabbitMqReadyAsync(cts.Token);
5048
}
5149

5250
public async Task DisposeAsync()
@@ -84,7 +82,7 @@ await dockerProxy.PullImageAsync(rabbitMQDockerImageName, rabbitMQDockerImageTag
8482
return $"{rabbitMQDockerImageName}:{rabbitMQDockerImageTag}";
8583
}
8684

87-
private async Task<string> RunContainerAsync(string rabbitMQDockerImage, CancellationToken cancellationToken)
85+
private async Task<string> RunNewContainerAsync(string rabbitMQDockerImage, CancellationToken cancellationToken)
8886
{
8987
var portMappings = new Dictionary<string, ISet<string>>
9088
{

Source/EasyNetQ.IntegrationTests/Rpc/When_request_and_respond_polymorphic.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public class When_request_and_respond_polymorphic : IDisposable
1010
{
1111
public When_request_and_respond_polymorphic(RabbitMQFixture fixture)
1212
{
13-
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1");
13+
bus = RabbitHutch.CreateBus($"host={fixture.Host};prefetchCount=1;timeout=5");
1414
}
1515

1616
public void Dispose()

0 commit comments

Comments
 (0)