Skip to content

Commit 87ac5f4

Browse files
Avoid reconnection fix (#140)
* Handle CreateProducer/Consumer error * When A client is disconnected due to a Permissions error it has to be closed to avoid reconnection loop * Raise an exception during the initialization * Avoid the reconnection during the init part Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent e855dcd commit 87ac5f4

File tree

3 files changed

+68
-34
lines changed

3 files changed

+68
-34
lines changed

RabbitMQ.Stream.Client/Reliable/ReliableConsumer.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,15 @@ protected override async Task GetNewReliable(bool boot)
9191
_reliableConsumerConfig.ReconnectStrategy.WhenConnected(ToString());
9292
}
9393

94-
catch (CreateProducerException ce)
94+
catch (Exception e)
9595
{
96-
LogEventSource.Log.LogError("ReliableConsumer closed. ", ce);
96+
LogEventSource.Log.LogError("Error during consumer initialization: ", e);
97+
throw;
9798
}
98-
catch (Exception e)
99+
finally
99100
{
100-
LogEventSource.Log.LogError("Error during initialization: ", e);
101101
SemaphoreSlim.Release();
102-
await TryToReconnect(_reliableConsumerConfig.ReconnectStrategy);
103102
}
104-
105-
SemaphoreSlim.Release();
106103
}
107104

108105
// just close the consumer. See base/metadataupdate
@@ -111,7 +108,10 @@ protected override async Task CloseReliable()
111108
await SemaphoreSlim.WaitAsync(10);
112109
try
113110
{
114-
await _consumer.Close();
111+
if (_consumer != null)
112+
{
113+
await _consumer.Close();
114+
}
115115
}
116116
finally
117117
{

RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,27 +116,26 @@ protected override async Task GetNewReliable(bool boot)
116116
await _producer.GetLastPublishingId());
117117
}
118118
}
119-
120-
catch (CreateProducerException ce)
119+
catch (Exception e)
121120
{
122-
LogEventSource.Log.LogError("ReliableProducer closed", ce);
121+
LogEventSource.Log.LogError("Error during producer initialization: ", e);
122+
throw;
123123
}
124-
catch (Exception e)
124+
finally
125125
{
126-
LogEventSource.Log.LogError("Error during initialization: ", e);
127126
SemaphoreSlim.Release();
128-
await TryToReconnect(_reliableProducerConfig.ReconnectStrategy);
129127
}
130-
131-
SemaphoreSlim.Release();
132128
}
133129

134130
protected override async Task CloseReliable()
135131
{
136132
await SemaphoreSlim.WaitAsync(10);
137133
try
138134
{
139-
await _producer.Close();
135+
if (_producer != null)
136+
{
137+
await _producer.Close();
138+
}
140139
}
141140
finally
142141
{
@@ -151,7 +150,10 @@ public override async Task Close()
151150
{
152151
_needReconnect = false;
153152
_confirmationPipe.Stop();
154-
await _producer.Close();
153+
if (_producer != null)
154+
{
155+
await _producer.Close();
156+
}
155157
}
156158
finally
157159
{

Tests/PermissionTests.cs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2007-2020 VMware, Inc.
44

5+
using System;
56
using RabbitMQ.Stream.Client;
7+
using RabbitMQ.Stream.Client.Reliable;
68
using Xunit;
79

810
namespace Tests
@@ -18,38 +20,68 @@ public async void AccessToStreamWithoutGrantsShouldRaiseErrorTest()
1820
// load definition creates users and streams to test the access
1921
// the user "test" can't access on "no_access_stream"
2022
const string stream = "no_access_stream";
21-
var config = new StreamSystemConfig()
22-
{
23-
Password = "test",
24-
UserName = "test",
25-
VirtualHost = "/"
26-
};
23+
var config = new StreamSystemConfig() { Password = "test", UserName = "test", VirtualHost = "/" };
2724
var system = await StreamSystem.Create(config);
2825

2926
await Assert.ThrowsAsync<CreateProducerException>(
3027
async () =>
3128
{
3229
await system.CreateProducer(
33-
new ProducerConfig
34-
{
35-
Reference = "producer",
36-
Stream = stream,
37-
});
30+
new ProducerConfig { Reference = "producer", Stream = stream, });
31+
}
32+
);
33+
34+
await Assert.ThrowsAsync<CreateProducerException>(
35+
async () =>
36+
{
37+
await ReliableProducer.CreateReliableProducer(
38+
new ReliableProducerConfig() { Stream = stream, StreamSystem = system });
3839
}
3940
);
4041

42+
ReliableProducer reliableProducer = null;
43+
try
44+
{
45+
reliableProducer = await ReliableProducer.CreateReliableProducer(
46+
new ReliableProducerConfig() { Stream = stream, StreamSystem = system });
47+
}
48+
catch (Exception)
49+
{
50+
// we already tested the Exception (CreateProducerException)
51+
}
52+
// here the reliableProducer must be closed due of the CreateProducerException
53+
Assert.False(reliableProducer != null && reliableProducer.IsOpen());
54+
4155
await Assert.ThrowsAsync<CreateConsumerException>(
4256
async () =>
4357
{
4458
await system.CreateConsumer(
45-
new ConsumerConfig()
46-
{
47-
Reference = "consumer",
48-
Stream = stream,
49-
});
59+
new ConsumerConfig() { Reference = "consumer", Stream = stream, });
5060
}
5161
);
5262

63+
await Assert.ThrowsAsync<CreateConsumerException>(
64+
async () =>
65+
{
66+
await ReliableConsumer.CreateReliableConsumer(
67+
new ReliableConsumerConfig() { Stream = stream, StreamSystem = system });
68+
}
69+
);
70+
71+
ReliableConsumer reliableConsumer = null;
72+
try
73+
{
74+
reliableConsumer = await ReliableConsumer.CreateReliableConsumer(
75+
new ReliableConsumerConfig() { Stream = stream, StreamSystem = system });
76+
}
77+
catch (Exception)
78+
{
79+
// we already tested the Exception (CreateConsumerException)
80+
}
81+
82+
// here the reliableConsumer must be closed due of the CreateConsumerException
83+
Assert.False(reliableConsumer != null && reliableConsumer.IsOpen());
84+
5385
await system.Close();
5486
}
5587
}

0 commit comments

Comments
 (0)