Skip to content

Commit 332edf8

Browse files
authored
Change locator reconnection (#134)
* Change locator reconnection - Fixes: #133 - Remove PublishingIdStrategy interface. It is not necessary, the idea was to create a generic interface to get the publishingId. - Add interlock _publishingId in ReliableProducer to be thread-safe - Add GetLastPublishingId on the Producer Class - Use GetLastPublishingId on the ReliableProducer class Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 80b9df8 commit 332edf8

File tree

8 files changed

+49
-83
lines changed

8 files changed

+49
-83
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@ Consider a Producer instance like a long-lived object, do not create one to send
236236
| ConnectionClosedHandler | Event when the client is disconnected | It is an event |
237237
| MaxInFlight | Max Number of messages before send | 1000 |
238238

239+
Producer with a reference name stores the sequence id on the server.
240+
It is possible to retrieve the id using `producer.GetLastPublishingId()`
241+
or more generic `system.QuerySequence("reference", "my_stream")`
242+
239243
### Publish Messages
240244

241245
#### Standard publish

RabbitMQ.Stream.Client/Client.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private Client(ClientParameters parameters)
161161
_heartBeatHandler = new HeartBeatHandler(
162162
SendHeartBeat,
163163
Close,
164-
(int) parameters.Heartbeat.TotalSeconds);
164+
(int)parameters.Heartbeat.TotalSeconds);
165165
IsClosed = false;
166166
}
167167

@@ -330,7 +330,7 @@ private uint NextCorrelationId()
330330

331331
private async Task HandleClosed(string reason)
332332
{
333-
IsClosed = true;
333+
InternalClose();
334334
await OnConnectionClosed(reason);
335335
}
336336

@@ -451,7 +451,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
451451
case CloseResponse.Key:
452452
CloseResponse.Read(frame, out var closeResponse);
453453
HandleCorrelatedResponse(closeResponse);
454-
IsClosed = true;
454+
InternalClose();
455455
break;
456456
case HeartBeatHandler.Key:
457457
_heartBeatHandler.UpdateHeartBeat();
@@ -484,30 +484,33 @@ private async ValueTask<bool> SendHeartBeat()
484484
return await Publish(new HeartBeatRequest());
485485
}
486486

487+
private void InternalClose()
488+
{
489+
_heartBeatHandler.Close();
490+
IsClosed = true;
491+
}
492+
487493
public async Task<CloseResponse> Close(string reason)
488494
{
489495
if (IsClosed)
490496
{
491497
return new CloseResponse(0, ResponseCode.Ok);
492498
}
493499

494-
_heartBeatHandler.Close();
495-
496500
// TODO LRB timeout
497501
var result =
498502
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
499503
TimeSpan.FromSeconds(30));
500-
501504
try
502505
{
506+
InternalClose();
503507
connection.Dispose();
504508
}
505509
catch (Exception e)
506510
{
507511
LogEventSource.Log.LogError($"An error occurred while calling {nameof(connection.Dispose)}.", e);
508512
}
509513

510-
IsClosed = true;
511514
return result;
512515
}
513516

RabbitMQ.Stream.Client/Producer.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,18 @@ private async Task SemaphoreWait()
151151
}
152152
}
153153

154+
/// <summary>
155+
/// GetLastPublishingId
156+
/// </summary>
157+
/// <returns>The last sequence id stored by the producer.</returns>
158+
public async Task<ulong> GetLastPublishingId()
159+
{
160+
var response = await client.QueryPublisherSequence(config.Reference, config.Stream);
161+
ClientExceptions.MaybeThrowException(response.ResponseCode,
162+
$"GetLastPublishingId stream: {config.Stream}, reference: {config.Reference}");
163+
return response.Sequence;
164+
}
165+
154166
public async ValueTask Send(ulong publishingId, Message message)
155167
{
156168
await SemaphoreWait();

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ RabbitMQ.Stream.Client.Producer
534534
RabbitMQ.Stream.Client.Producer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
535535
RabbitMQ.Stream.Client.Producer.ConfirmFrames.get -> int
536536
RabbitMQ.Stream.Client.Producer.Dispose() -> void
537+
RabbitMQ.Stream.Client.Producer.GetLastPublishingId() -> System.Threading.Tasks.Task<ulong>
537538
RabbitMQ.Stream.Client.Producer.IncomingFrames.get -> int
538539
RabbitMQ.Stream.Client.Producer.MessagesSent.get -> int
539540
RabbitMQ.Stream.Client.Producer.PendingCount.get -> int
@@ -624,9 +625,6 @@ RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.StreamNotAvailable = 6 -> Rab
624625
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.TimeoutError = 2 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
625626
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.UndefinedError = 200 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
626627
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.WaitForConfirmation = 0 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
627-
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy
628-
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy.GetPublishingId() -> ulong
629-
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy.InitPublishingId() -> System.Threading.Tasks.Task
630628
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
631629
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string connectionInfo) -> void
632630
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string connectionInfo) -> bool

RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,6 @@
99

1010
namespace RabbitMQ.Stream.Client.Reliable;
1111

12-
internal class AutoPublishingId : IPublishingIdStrategy
13-
{
14-
private ulong _lastPublishingId = 0;
15-
private readonly ReliableProducerConfig _reliableProducerConfig;
16-
17-
public ulong GetPublishingId()
18-
{
19-
return ++_lastPublishingId;
20-
}
21-
22-
public AutoPublishingId(ReliableProducerConfig reliableProducerConfig)
23-
{
24-
_reliableProducerConfig = reliableProducerConfig;
25-
}
26-
27-
public async Task InitPublishingId()
28-
{
29-
try
30-
{
31-
_lastPublishingId =
32-
await _reliableProducerConfig.StreamSystem.QuerySequence(_reliableProducerConfig.Reference,
33-
_reliableProducerConfig.Stream);
34-
}
35-
catch (Exception)
36-
{
37-
_lastPublishingId = 0;
38-
}
39-
}
40-
}
41-
4212
public record ReliableProducerConfig
4313
{
4414
public StreamSystem StreamSystem { get; set; }
@@ -63,14 +33,13 @@ public record ReliableProducerConfig
6333
public class ReliableProducer : ReliableBase
6434
{
6535
private Producer _producer;
66-
private readonly AutoPublishingId _autoPublishingId;
36+
private ulong _publishingId;
6737
private readonly ReliableProducerConfig _reliableProducerConfig;
6838
private readonly ConfirmationPipe _confirmationPipe;
6939

7040
private ReliableProducer(ReliableProducerConfig reliableProducerConfig)
7141
{
7242
_reliableProducerConfig = reliableProducerConfig;
73-
_autoPublishingId = new AutoPublishingId(_reliableProducerConfig);
7443
_confirmationPipe = new ConfirmationPipe(reliableProducerConfig.ConfirmationHandler);
7544
_confirmationPipe.Start();
7645
}
@@ -85,10 +54,6 @@ public static async Task<ReliableProducer> CreateReliableProducer(ReliableProduc
8554
protected override async Task GetNewReliable(bool boot)
8655
{
8756
await SemaphoreSlim.WaitAsync();
88-
if (boot)
89-
{
90-
await _autoPublishingId.InitPublishingId();
91-
}
9257

9358
try
9459
{
@@ -124,6 +89,12 @@ protected override async Task GetNewReliable(bool boot)
12489
}
12590
});
12691
_reliableProducerConfig.ReconnectStrategy.WhenConnected(ToString());
92+
if (boot)
93+
{
94+
// Init the publishing id
95+
Interlocked.Exchange(ref _publishingId,
96+
await _producer.GetLastPublishingId());
97+
}
12798
}
12899

129100
catch (CreateProducerException ce)
@@ -170,8 +141,8 @@ public override async Task Close()
170141

171142
public async ValueTask Send(Message message)
172143
{
173-
var pid = _autoPublishingId.GetPublishingId();
174-
_confirmationPipe.AddUnConfirmedMessage(pid, message);
144+
Interlocked.Increment(ref _publishingId);
145+
_confirmationPipe.AddUnConfirmedMessage(_publishingId, message);
175146
await SemaphoreSlim.WaitAsync();
176147
try
177148
{
@@ -182,7 +153,7 @@ public async ValueTask Send(Message message)
182153
// on the _waitForConfirmation list. The user will get Timeout Error
183154
if (!(_inReconnection))
184155
{
185-
await _producer.Send(pid, message);
156+
await _producer.Send(_publishingId, message);
186157
}
187158
}
188159

@@ -198,14 +169,14 @@ public async ValueTask Send(Message message)
198169

199170
public async ValueTask Send(List<Message> messages, CompressionType compressionType)
200171
{
201-
var pid = _autoPublishingId.GetPublishingId();
202-
_confirmationPipe.AddUnConfirmedMessage(pid, messages);
172+
Interlocked.Increment(ref _publishingId);
173+
_confirmationPipe.AddUnConfirmedMessage(_publishingId, messages);
203174
await SemaphoreSlim.WaitAsync();
204175
try
205176
{
206177
if (!_inReconnection)
207178
{
208-
await _producer.Send(pid, messages, compressionType);
179+
await _producer.Send(_publishingId, messages, compressionType);
209180
}
210181
}
211182

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private async Task MayBeReconnectLocator()
9393
{
9494
if (client.IsClosed)
9595
{
96-
client = await Client.Create(clientParameters with
96+
client = await Client.Create(client.Parameters with
9797
{
9898
ClientProvidedName = clientParameters.ClientProvidedName
9999
});
@@ -197,7 +197,6 @@ public async Task<ulong> QuerySequence(string reference, string stream)
197197
{
198198
await MayBeReconnectLocator();
199199
MaybeThrowQueryException(reference, stream);
200-
201200
var response = await client.QueryPublisherSequence(reference, stream);
202201
ClientExceptions.MaybeThrowException(response.ResponseCode,
203202
$"QuerySequence stream: {stream}, reference: {reference}");

Tests/ProducerSystemTests.cs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,10 @@ public async Task ProducerShouldRaiseAnExceptionIfStreamOrBatchSizeAreNotValid()
9292
var system = await StreamSystem.Create(config);
9393

9494
await Assert.ThrowsAsync<CreateProducerException>(() => system.CreateProducer(
95-
new ProducerConfig
96-
{
97-
Reference = "producer",
98-
Stream = "",
99-
}));
95+
new ProducerConfig { Reference = "producer", Stream = "", }));
10096

10197
await Assert.ThrowsAsync<CreateProducerException>(() => system.CreateProducer(
102-
new ProducerConfig
103-
{
104-
Reference = "producer",
105-
Stream = "TEST",
106-
MessagesBufferSize = -1,
107-
}));
98+
new ProducerConfig { Reference = "producer", Stream = "TEST", MessagesBufferSize = -1, }));
10899

109100
await system.Close();
110101
}
@@ -275,6 +266,12 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
275266
// sequence start from zero
276267
Assert.True(resAfter == (NumberOfMessages - 1));
277268

269+
var producer = await system.CreateProducer(new ProducerConfig()
270+
{
271+
Stream = stream,
272+
Reference = ProducerName
273+
});
274+
Assert.True(await producer.GetLastPublishingId() == (NumberOfMessages - 1));
278275
await system.DeleteStream(stream);
279276
await system.Close();
280277
}

0 commit comments

Comments
 (0)