Skip to content

Commit 6c0d84b

Browse files
reliable consumer (#115)
* reliable consumer work in progress Signed-off-by: Gabriele Santomaggio <[email protected]> * reliable consumer Add tests Refactor adding the base class for Rproducer and Rconsumer Signed-off-by: Gabriele Santomaggio <[email protected]> * reliable consumer Add tests Refactor adding the base class for Rproducer and Rconsumer Signed-off-by: Gabriele Santomaggio <[email protected]> * Add R Producer Documentation [skip ci] Signed-off-by: Gabriele Santomaggio <[email protected]> * Add new test Fix a small bug during the reconnection Signed-off-by: Gabriele Santomaggio <[email protected]> * formatting Signed-off-by: Gabriele Santomaggio <[email protected]> * Increse timeout Signed-off-by: Gabriele Santomaggio <[email protected]> * formatting Signed-off-by: Gabriele Santomaggio <[email protected]> * update unshipped Signed-off-by: Gabriele Santomaggio <[email protected]> * avoid timeout test Signed-off-by: Gabriele Santomaggio <[email protected]> * avoid timeout test Signed-off-by: Gabriele Santomaggio <[email protected]> * Add logs Signed-off-by: Gabriele Santomaggio <[email protected]> * Wait on conditions rather than a set number of seconds * fixup * Fixup test * Try to fix test on GHA * Add logs Signed-off-by: Gabriele Santomaggio <[email protected]> * Fix handler == null Signed-off-by: Gabriele Santomaggio <[email protected]> Fix handler == null * Escape HTTP connection name Signed-off-by: Gabriele Santomaggio <[email protected]> * new Uri() should do the right thing * fixup * fixup * Check HTTP return code consistently * Add code to see what is going on with those connection names and encoding * Use Uri.EscapeDataString * Change the IReconnectStrategy method. Use a function istead of out var Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent b3a646b commit 6c0d84b

File tree

14 files changed

+734
-153
lines changed

14 files changed

+734
-153
lines changed

Docker/Dockerfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
FROM pivotalrabbitmq/rabbitmq-stream
22

3+
4+
ENV TZ=Europe/Minsk
5+
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
6+
7+
38
RUN wget https://packages.microsoft.com/config/ubuntu/21.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb
49
RUN dpkg -i packages-microsoft-prod.deb
510
RUN rm packages-microsoft-prod.deb
611

712
RUN apt-get update && \
813
apt-get install -y apt-transport-https && \
914
apt-get update && \
15+
apt-get install -y git && \
1016
apt-get install -y dotnet-sdk-6.0
1117

1218
RUN apt-get install make -y
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System.Buffers;
2+
using System.Text;
3+
using RabbitMQ.Stream.Client;
4+
using RabbitMQ.Stream.Client.Reliable;
5+
6+
namespace example;
7+
8+
public class ReliableConsumerExample
9+
{
10+
public async Task StartDefaultConfigurations()
11+
{
12+
Console.WriteLine("Reliable .NET Cosumer");
13+
var config = new StreamSystemConfig();
14+
const string stream = "my-reliable-stream-consumer";
15+
var system = await StreamSystem.Create(config);
16+
await system.CreateStream(new StreamSpec(stream));
17+
var rConsumer = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
18+
{
19+
Stream = stream,
20+
StreamSystem = system,
21+
OffsetSpec = new OffsetTypeFirst(),
22+
ClientProvidedName = "My-Reliable-Consumer",
23+
MessageHandler = async (_, _, message) =>
24+
{
25+
Console.WriteLine(
26+
$"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())}");
27+
await Task.CompletedTask;
28+
}
29+
});
30+
}
31+
32+
// when finished:
33+
//rConsumer.Close();
34+
}

Examples/Reliable/ReliableProducer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace example;
88

9-
public class ReliableProducer
9+
public class ReliableProducerExample
1010
{
1111
public async Task StartDefaultConfigurations()
1212
{

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ rabbitmq-server:
2121
run-test-in-docker:
2222
docker stop dotnet-test || true
2323
docker build -t stream-dotnet-test -f Docker/Dockerfile . && \
24-
docker run -d --rm --name dotnet-test -v $(shell pwd):/source --cpuset-cpus="0-1" stream-dotnet-test && \
24+
docker run -d --rm --name dotnet-test -v $(shell pwd):/source --cpuset-cpus="1" stream-dotnet-test && \
2525
docker exec -it dotnet-test /bin/sh -c "cd /source && make" || true

README.md

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
- [Handle Metadata Update](#handle-metadata-update)
3232
- [Reliable](#reliable)
3333
- [Reliable Producer](#reliable-producer)
34+
- [Reliable Consumer](#reliable-consumer)
3435
- [Build from source](#build-from-source)
3536
- [Project Status](#project-status)
3637
- [Release Process](#release-process)
@@ -412,8 +413,10 @@ You can use `MetadataHandler` to handle it:
412413
```
413414
### Reliable
414415
- Reliable Producer
415-
- Reliable Consumer (not ready yet)
416+
- Reliable Consumer </p>
417+
See the directory [Examples/Reliable](./Examples/Reliable)
416418

419+
417420
### Reliable Producer
418421
Reliable Producer is a smart layer built up of the standard `Producer`. </b>
419422
The idea is to leave the user decides what to use, the standard or reliable producer. </b>
@@ -423,7 +426,7 @@ The main features are:
423426
- Auto-Reconnect in case of disconnection
424427
- Trace sent and received messages
425428
- Invalidate messages
426-
- Handle the metadata Update
429+
- [Handle the metadata Update](#reliable-handle-metadata-update)
427430

428431
#### Provide publishingID automatically
429432
Reliable Producer retrieves the last publishingID given the producer name. </b>
@@ -433,6 +436,7 @@ Zero(0) is the default value in case there is no a publishingID.
433436
Reliable Producer restores the TCP connection in case the Producer is disconnected for some reason.
434437
During the reconnection it continues to store the messages in a local-list.
435438
The user will receive back the confirmed or un-confirmed messages.
439+
See [Reconnection Strategy](#reconnection-strategy)
436440

437441
#### Trace sent and received messages
438442
Reliable Producer keeps in memory each sent message and remove from the memory when the message is confirmed or goes in timout.
@@ -457,22 +461,35 @@ ConfirmationHandler = confirmation =>
457461
If the client doesn't receive a confirmation within 2 seconds Reliable Producer removes the message from the internal messages cache.
458462
The user will receive `ConfirmationStatus.TimeoutError` in the `ConfirmationHandler`.
459463

460-
#### Handle the metadata Update
461-
If the streams changes the topology (ex:Stream deleted or add/remove follower), the client receives an `MetadataUpdate` event.
462-
Reliable Producer detects the event and tries to reconnect the producer if the stream still exist else closes the producer.
463464
#### Send API
464465
Reliable Producer implements two `send(..)`
465466
- `Send(Message message)` // standard
466467
- `Send(List<Message> messages, CompressionType compressionType)` //sub-batching with compression
467468
468-
### Reconnection Strategy
469-
By default Reliable Producer uses an `BackOffReconnectStrategy` to reconnect the client.
469+
470+
### Reliable Consumer
471+
Reliable Consumer is a smart layer built up of the standard `Consumer`. </b>
472+
The idea is to leave the user decides what to use, the standard or reliable Consumer. </b>
473+
474+
The main features are:
475+
- Auto-Reconnect in case of disconnection
476+
- Auto restart consuming from the last offset
477+
- [Handle the metadata Update](#reliable-handle-metadata-update)
478+
479+
#### Auto-Reconnect
480+
Reliable Consumer restores the TCP connection in case the Producer is disconnected for some reason.
481+
Reliable Consumer will restart consuming from the last offset stored.
482+
See [Reconnection Strategy](#reconnection-strategy)
483+
484+
### Reconnection Strategy
485+
By default Reliable Producer/Consumer uses an `BackOffReconnectStrategy` to reconnect the client.
470486
You can customize the behaviour implementing the `IReconnectStrategy` interface:
471487
```csharp
472-
void WhenDisconnected(out bool reconnect);
473-
void WhenConnected();
488+
bool WhenDisconnected(string connectionInfo);
489+
void WhenConnected(string connectionInfo);
474490
```
475-
with `reconnect` you can decide when reconnect the producer.
491+
If `WhenDisconnected` return is `true` Producer/Consumer will be reconnected else closed.
492+
`connectionInfo` add information about the connection.
476493

477494
You can use it:
478495
```csharp
@@ -482,9 +499,10 @@ var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig
482499
ReconnectStrategy = MyReconnectStrategy
483500
```
484501

485-
#### Examples
486-
See the directory [Examples/Reliable](./Examples/Reliable)
487502

503+
### Reliable handle metadata update
504+
If the streams changes the topology (ex:Stream deleted or add/remove follower), the client receives an `MetadataUpdate` event.
505+
Reliable Producer detects the event and tries to reconnect the producer if the stream still exist else closes the producer/consumer.
488506

489507
## Build from source
490508

RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ public static int WriteAny(Span<byte> seq, object value)
2929
bool bo => WriteBool(seq, bo),
3030
byte[] bArr => bArr.Length == 0 ? WriteNull(seq) : WriteBytes(seq, bArr),
3131
DateTime d => d == DateTime.MinValue ? WriteNull(seq) : WriteTimestamp(seq, d),
32-
_ => throw new AMQP.AmqpParseException($"WriteAny Invalid type {value}")
32+
_ => throw new AmqpParseException($"WriteAny Invalid type {value}")
3333
};
34-
35-
;
3634
}
3735

38-
public static int WriteString(Span<byte> seq, string value)
36+
private static int WriteString(Span<byte> seq, string value)
3937
{
4038
var len = value.Length;
4139
var offset = 0;
@@ -55,7 +53,7 @@ public static int WriteString(Span<byte> seq, string value)
5553
return offset;
5654
}
5755

58-
public static int WriteUInt64(Span<byte> seq, ulong value)
56+
private static int WriteUInt64(Span<byte> seq, ulong value)
5957
{
6058
if (value == 0)
6159
{
@@ -201,7 +199,7 @@ private static int WriteBool(Span<byte> seq, bool value)
201199
return WireFormatting.WriteByte(seq, value ? FormatCode.BoolTrue : FormatCode.BoolFalse);
202200
}
203201

204-
public static int WriteTimestamp(Span<byte> seq, DateTime value)
202+
private static int WriteTimestamp(Span<byte> seq, DateTime value)
205203
{
206204
var offset = WireFormatting.WriteByte(seq, FormatCode.Timestamp);
207205
var unixTime = ((DateTimeOffset)value).ToUnixTimeMilliseconds();
@@ -336,7 +334,7 @@ public static int GetAnySize(object value)
336334
byte[] bArr => GetBytesSize(bArr),
337335
byte => 1,
338336
DateTime d => GetTimestampSize(d),
339-
_ => throw new AMQP.AmqpParseException($"WriteAny Invalid type {value}")
337+
_ => throw new AmqpParseException($"WriteAny Invalid type {value}")
340338
};
341339
}
342340
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.Close() -> System.Threading.Tasks.Task
2+
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CloseReliable() -> System.Threading.Tasks.Task
3+
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.GetNewReliable(bool boot) -> System.Threading.Tasks.Task
14
const RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.AmqpValue = 119 -> byte
25
const RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.ApplicationData = 117 -> byte
36
const RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.ApplicationProperties = 116 -> byte
@@ -84,6 +87,10 @@ const RabbitMQ.Stream.Client.UnsubscribeRequest.Key = 12 -> ushort
8487
const RabbitMQ.Stream.Client.UnsubscribeResponse.Key = 12 -> ushort
8588
override RabbitMQ.Stream.Client.LeaderLocator.ToString() -> string
8689
override RabbitMQ.Stream.Client.LogEventListener.Dispose() -> void
90+
override RabbitMQ.Stream.Client.Reliable.ReliableConsumer.Close() -> System.Threading.Tasks.Task
91+
override RabbitMQ.Stream.Client.Reliable.ReliableConsumer.ToString() -> string
92+
override RabbitMQ.Stream.Client.Reliable.ReliableProducer.Close() -> System.Threading.Tasks.Task
93+
override RabbitMQ.Stream.Client.Reliable.ReliableProducer.ToString() -> string
8794
RabbitMQ.Stream.Client.AbstractEntity
8895
RabbitMQ.Stream.Client.AbstractEntity.AbstractEntity() -> void
8996
RabbitMQ.Stream.Client.AbstractEntity.client -> RabbitMQ.Stream.Client.Client
@@ -613,18 +620,39 @@ RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy
613620
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy.GetPublishingId() -> ulong
614621
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy.InitPublishingId() -> System.Threading.Tasks.Task
615622
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
616-
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected() -> void
617-
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(out bool reconnect) -> void
623+
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string connectionInfo) -> void
624+
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string connectionInfo) -> bool
618625
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation
619626
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.InsertDateTime.get -> System.DateTime
620627
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.InsertDateTime.init -> void
621628
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.Messages.get -> System.Collections.Generic.List<RabbitMQ.Stream.Client.Message>
622629
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.MessagesConfirmation() -> void
623630
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.PublishingId.get -> ulong
624631
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.Status.get -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
632+
RabbitMQ.Stream.Client.Reliable.ReliableBase
633+
RabbitMQ.Stream.Client.Reliable.ReliableBase.Init() -> System.Threading.Tasks.Task
634+
RabbitMQ.Stream.Client.Reliable.ReliableBase.IsOpen() -> bool
635+
RabbitMQ.Stream.Client.Reliable.ReliableBase.ReliableBase() -> void
636+
RabbitMQ.Stream.Client.Reliable.ReliableBase.TryToReconnect(RabbitMQ.Stream.Client.Reliable.IReconnectStrategy reconnectStrategy) -> System.Threading.Tasks.Task
637+
RabbitMQ.Stream.Client.Reliable.ReliableBase._inReconnection -> bool
638+
RabbitMQ.Stream.Client.Reliable.ReliableBase._needReconnect -> bool
639+
RabbitMQ.Stream.Client.Reliable.ReliableConsumer
640+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig
641+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ClientProvidedName.get -> string
642+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ClientProvidedName.set -> void
643+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.get -> System.Func<RabbitMQ.Stream.Client.Consumer, RabbitMQ.Stream.Client.MessageContext, RabbitMQ.Stream.Client.Message, System.Threading.Tasks.Task>
644+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.set -> void
645+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.OffsetSpec.get -> RabbitMQ.Stream.Client.IOffsetType
646+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.OffsetSpec.set -> void
647+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
648+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ReconnectStrategy.set -> void
649+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Reference.get -> string
650+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Reference.set -> void
651+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Stream.get -> string
652+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Stream.set -> void
653+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.StreamSystem.get -> RabbitMQ.Stream.Client.StreamSystem
654+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.StreamSystem.set -> void
625655
RabbitMQ.Stream.Client.Reliable.ReliableProducer
626-
RabbitMQ.Stream.Client.Reliable.ReliableProducer.Close() -> System.Threading.Tasks.Task
627-
RabbitMQ.Stream.Client.Reliable.ReliableProducer.IsOpen() -> bool
628656
RabbitMQ.Stream.Client.Reliable.ReliableProducer.Send(RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
629657
RabbitMQ.Stream.Client.Reliable.ReliableProducer.Send(System.Collections.Generic.List<RabbitMQ.Stream.Client.Message> messages, RabbitMQ.Stream.Client.CompressionType compressionType) -> System.Threading.Tasks.ValueTask
630658
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig
@@ -812,12 +840,10 @@ RabbitMQ.Stream.Client.UnsubscribeResponse.Write(System.Span<byte> span) -> int
812840
RabbitMQ.Stream.Client.Version
813841
RabbitMQ.Stream.Client.VirtualHostAccessFailureException
814842
RabbitMQ.Stream.Client.VirtualHostAccessFailureException.VirtualHostAccessFailureException(string s) -> void
843+
readonly RabbitMQ.Stream.Client.Reliable.ReliableBase.SemaphoreSlim -> System.Threading.SemaphoreSlim
815844
static RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting.GetAnySize(object value) -> int
816845
static RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting.GetSequenceSize(System.Buffers.ReadOnlySequence<byte> data) -> int
817846
static RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting.WriteAny(System.Span<byte> seq, object value) -> int
818-
static RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting.WriteString(System.Span<byte> seq, string value) -> int
819-
static RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting.WriteTimestamp(System.Span<byte> seq, System.DateTime value) -> int
820-
static RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting.WriteUInt64(System.Span<byte> seq, ulong value) -> int
821847
static RabbitMQ.Stream.Client.AMQP.Data.Parse(System.Buffers.ReadOnlySequence<byte> amqpData, ref int byteRead) -> RabbitMQ.Stream.Client.AMQP.Data
822848
static RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.Read(System.Buffers.ReadOnlySequence<byte> amqpData) -> byte
823849
static RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.Write(System.Span<byte> span, byte data) -> int
@@ -837,6 +863,7 @@ static RabbitMQ.Stream.Client.PooledTaskSource<T>.Rent() -> RabbitMQ.Stream.Clie
837863
static RabbitMQ.Stream.Client.PooledTaskSource<T>.Return(RabbitMQ.Stream.Client.ManualResetValueTaskSource<T> task) -> void
838864
static RabbitMQ.Stream.Client.Producer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.ProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Producer>
839865
static RabbitMQ.Stream.Client.Publish.Version.get -> byte
866+
static RabbitMQ.Stream.Client.Reliable.ReliableConsumer.CreateReliableConsumer(RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig reliableConsumerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.ReliableConsumer>
840867
static RabbitMQ.Stream.Client.Reliable.ReliableProducer.CreateReliableProducer(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig reliableProducerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.ReliableProducer>
841868
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
842869
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>

RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,51 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2007-2020 VMware, Inc.
44

5+
using System;
6+
using System.Threading;
7+
58
namespace RabbitMQ.Stream.Client.Reliable;
69

10+
/// <summary>
11+
/// IReconnectStrategy is the interface to reconnect the TCP client
12+
/// </summary>
713
public interface IReconnectStrategy
814
{
9-
void WhenDisconnected(out bool reconnect);
10-
void WhenConnected();
15+
/// <summary>
16+
/// WhenDisconnected is raised when the TPC client
17+
/// is disconnected for some reason.
18+
/// </summary>
19+
/// <param name="connectionInfo">Additional connection info. Just for logging</param>
20+
/// <returns>if True the client will be reconnected else closed</returns>
21+
bool WhenDisconnected(string connectionInfo);
22+
23+
/// <summary>
24+
/// It is raised when the TCP client is connected successfully
25+
/// </summary>
26+
/// <param name="connectionInfo">Additional connection info. Just for logging</param>
27+
void WhenConnected(string connectionInfo);
28+
}
29+
30+
/// <summary>
31+
/// BackOffReconnectStrategy is the default IReconnectStrategy
32+
/// implementation for Producer and Consumer
33+
/// It implements a BackOff pattern.
34+
/// </summary>
35+
internal class BackOffReconnectStrategy : IReconnectStrategy
36+
{
37+
private int Tentatives { get; set; } = 1;
38+
39+
public bool WhenDisconnected(string connectionInfo)
40+
{
41+
Tentatives <<= 1;
42+
LogEventSource.Log.LogInformation(
43+
$"{connectionInfo} disconnected, check if reconnection needed in {Tentatives * 100} ms.");
44+
Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100));
45+
return true;
46+
}
47+
48+
public void WhenConnected(string _)
49+
{
50+
Tentatives = 1;
51+
}
1152
}

0 commit comments

Comments
 (0)