Skip to content

Commit e855dcd

Browse files
Improved client timeout handling (#139)
* Make message timeout configurable; specify error source is library * The timeout confirmation was hard-coded in 2 seconds. With this commit, the timeout is increased to 3 seconds, and it is possible to configure the timeout externally. Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent 332edf8 commit e855dcd

File tree

5 files changed

+126
-38
lines changed

5 files changed

+126
-38
lines changed

README.md

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
[![codecov](https://codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client/branch/main/graph/badge.svg?token=OIA04ZQD79)](https://codecov.io/gh/rabbitmq/rabbitmq-stream-dotnet-client)
88
</div>
99

10-
#@ Table of Contents
11-
10+
# Table of Contents
1211

1312
---
1413

@@ -116,7 +115,7 @@ await system.DeleteStream(stream);
116115
await system.Close();
117116
```
118117

119-
### Usage
118+
## Usage
120119

121120
---
122121

@@ -238,7 +237,7 @@ Consider a Producer instance like a long-lived object, do not create one to send
238237

239238
Producer with a reference name stores the sequence id on the server.
240239
It is possible to retrieve the id using `producer.GetLastPublishingId()`
241-
or more generic `system.QuerySequence("reference", "my_stream")`
240+
or more generic `system.QuerySequence("reference", "my_stream")`.
242241

243242
### Publish Messages
244243

@@ -294,15 +293,19 @@ var producer = await system.CreateProducer(
294293
Stream = "my_stream",
295294
});
296295
```
296+
297297
then:
298+
298299
```csharp
299300
var publishingId = 0;
300301
var message = new Message(Encoding.UTF8.GetBytes($"my deduplicate message {i}"));
301302
await producer.Send(publishingId, message);
302303
```
303304

304305
### Consume Messages
306+
305307
Define a consumer:
308+
306309
```csharp
307310
var consumer = await system.CreateConsumer(
308311
new ConsumerConfig
@@ -317,8 +320,11 @@ var consumer = await system.CreateConsumer(
317320
}
318321
});
319322
```
323+
320324
### Offset Types
325+
321326
There are five types of Offset and they can be set by the `ConsumerConfig.OffsetSpec` property that must be passed to the Consumer constructor, in the example we use `OffsetTypeFirst`:
327+
322328
```csharp
323329
var consumerOffsetTypeFirst = await system.CreateConsumer(
324330
new ConsumerConfig
@@ -333,29 +339,40 @@ var consumerOffsetTypeFirst = await system.CreateConsumer(
333339
}
334340
});
335341
```
342+
336343
The five types are:
337-
- First: it takes messages from the first message of the stream.
344+
- First: it takes messages from the first message of the stream.
345+
338346
```csharp
339347
var offsetTypeFirst = new OffsetTypeFirst();
340348
```
349+
341350
- Last: it takes messages from the last chunk of the stream, i.e. it doesn't start from the last message, but from the last group of messages.
351+
342352
```csharp
343353
var offsetTypeLast = new OffsetTypeLast();
344354
```
355+
345356
- Next: it takes messages published after the consumer connection.
357+
346358
```csharp
347359
var offsetTypeNext = new OffsetTypeNext()
348360
```
361+
349362
- Offset: it takes messages starting from the message with id equal to the passed value. If the value is less than the first message of the stream, it starts from the first (i.e. if you pass 0, but the stream starts from 10, it starts from 10). If the message with the id hasn't been published yet, it waits until this publishingId is reached.
363+
350364
```csharp
351365
ulong iWantToStartFromPubId = 10;
352366
var offsetTypeOffset = new OffsetTypeOffset(iWantToStartFromPubId);
353367
```
368+
354369
- Timestamp: it takes messages starting from the first message with a timestamp greater than the one passed.
370+
355371
```csharp
356372
var anHourAgo = (long)DateTime.UtcNow.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds;
357373
var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo);
358374
```
375+
359376
### Track Offset
360377

361378
The server can store the current delivered offset given a consumer with `StoreOffset` in this way:
@@ -374,8 +391,7 @@ var consumer = await system.CreateConsumer(
374391
}
375392
```
376393

377-
Note: </b>
378-
**Avoid** to store the offset for each single message, it can reduce the performances.
394+
Note: **Avoid** storing the offset for every single message, it can reduce performance.
379395

380396
It is possible to retrieve the offset with `QueryOffset`:
381397

@@ -389,9 +405,10 @@ var consumer = await system.CreateConsumer(
389405
OffsetSpec = new OffsetTypeOffset(trackedOffset),
390406
```
391407

392-
OBS. if don't have stored an offset for the consumer's reference on the stream you get an OffsetNotFoundException exception.
408+
Note: if you query an offset that has not been stored yet for the consumer's reference on the stream, you will get an `OffsetNotFoundException`.
393409

394410
### Handle Close
411+
395412
Producers/Consumers raise an event when the client is disconnected:
396413

397414
```csharp
@@ -405,8 +422,10 @@ Producers/Consumers raise and event when the client is disconnected:
405422
```
406423

407424
### Handle Metadata Update
425+
408426
Stream metadata update is raised when the stream topology changes or the stream is deleted.
409427
You can use `MetadataHandler` to handle it:
428+
410429
```csharp
411430
new ProducerConfig/ConsumerConfig
412431
{
@@ -420,49 +439,60 @@ You can use `MetadataHandler` to handle it:
420439
### Heartbeat
421440

422441
It is possible to configure the heartbeat using:
442+
423443
```csharp
424444
var config = new StreamSystemConfig()
425445
{
426446
Heartbeat = TimeSpan.FromSeconds(30),
427447
}
428448
```
449+
429450
- `60` (`TimeSpan.FromSeconds(60)`) seconds is the default value
430451
- `0` (`TimeSpan.FromSeconds(0)`) will advise server to disable heartbeat
431452

432453
Heartbeat value shouldn't be too low.
433454

434455
### Reliable
435456
- Reliable Producer
436-
- Reliable Consumer </p>
437-
See the directory [Examples/Reliable](./Examples/Reliable)
457+
- Reliable Consumer
458+
459+
See the directory [Examples/Reliable](./Examples/Reliable) for code examples.
438460

439461

440462
### Reliable Producer
441-
Reliable Producer is a smart layer built up of the standard `Producer`. </b>
442-
The idea is to leave the user decides what to use, the standard or reliable producer. </b>
463+
464+
Reliable Producer is a smart layer built on top of the standard `Producer`.
465+
466+
The idea is to give the user the ability to choose between the standard and the reliable producer.
443467

444468
The main features are:
469+
445470
- Provide publishingID automatically
446471
- Auto-Reconnect in case of disconnection
447472
- Trace sent and received messages
448473
- Invalidate messages
449474
- [Handle the metadata Update](#reliable-handle-metadata-update)
450475

451476
#### Provide publishingID automatically
452-
Reliable Producer retrieves the last publishingID given the producer name. </b>
453-
Zero(0) is the default value in case there is no a publishingID.
477+
478+
Reliable Producer retrieves the last publishingID given the producer name.
479+
480+
Zero (0) is the default value in case there is no publishingID for the given producer reference.
454481

455482
#### Auto-Reconnect
483+
456484
Reliable Producer restores the TCP connection in case the Producer is disconnected for some reason.
457485
During the reconnection it continues to store the messages in a local-list.
458486
The user will receive back the confirmed or un-confirmed messages.
459487
See [Reconnection Strategy](#reconnection-strategy)
460488

461489
#### Trace sent and received messages
462-
Reliable Producer keeps in memory each sent message and remove from the memory when the message is confirmed or goes in timout.
490+
491+
Reliable Producer keeps in memory each sent message and removes it from the memory when the message is confirmed or times out.
463492
`ConfirmationHandler` receives the messages back with the status.
464493
`confirmation.Status` can have different values, but in general `ConfirmationStatus.Confirmed` means the message
465-
is stored on the server other status means that there was a problem with the message/messages.
494+
is stored on the server. Other statuses mean that there was a problem with the message/messages under the given publishing id.
495+
466496
```csharp
467497
ConfirmationHandler = confirmation =>
468498
{
@@ -477,17 +507,34 @@ ConfirmationHandler = confirmation =>
477507
}
478508
}
479509
```
510+
511+
#### Currently defined confirmation statuses
512+
513+
| Status | Description | Source |
514+
|-----------------------|--------------------------------------------------------------------------------------------------------------|--------|
515+
| Confirmed | Message has been confirmed by the server and written to disk. | Server |
516+
| ClientTimeoutError | Client gave up waiting for the message (read more [here](#invalidate-messages)). | Client |
517+
| StreamNotAvailable    | Stream was deleted or otherwise became unavailable.                                                          | Server |
518+
| InternalError | | Server |
519+
| AccessRefused | Provided credentials are invalid or you lack permissions for specific vhost/etc. | Server |
520+
| PreconditionFailed | Catch-all for validation on server (eg. requested to create stream with different parameters but same name). | Server |
521+
| PublisherDoesNotExist | | Server |
522+
| UndefinedError | Catch-all for any new status that is not yet handled in the library. | Server |
523+
480524
#### Invalidate messages
481-
If the client doesn't receive a confirmation within 2 seconds Reliable Producer removes the message from the internal messages cache.
482-
The user will receive `ConfirmationStatus.TimeoutError` in the `ConfirmationHandler`.
525+
526+
If the client doesn't receive a confirmation within the configured timeout (3 seconds by default), Reliable Producer removes the message from the internal messages cache.
527+
The user will receive `ConfirmationStatus.ClientTimeoutError` in the `ConfirmationHandler`.
483528

484529
#### Send API
485530
Reliable Producer implements two `Send(..)` overloads:
531+
486532
- `Send(Message message)` // standard
487533
- `Send(List<Message> messages, CompressionType compressionType)` //sub-batching with compression
488534
489535

490536
### Reliable Consumer
537+
491538
Reliable Consumer is a smart layer built on top of the standard `Consumer`.
492539
The idea is to give the user the ability to choose between the standard and the reliable Consumer.
493540

@@ -497,17 +544,21 @@ The main features are:
497544
- [Handle the metadata Update](#reliable-handle-metadata-update)
498545

499546
#### Auto-Reconnect
547+
500548
Reliable Consumer restores the TCP connection in case the Consumer is disconnected for some reason.
501549
Reliable Consumer will restart consuming from the last offset stored.
502550
See [Reconnection Strategy](#reconnection-strategy)
503551

504552
### Reconnection Strategy
553+
505554
By default Reliable Producer/Consumer uses a `BackOffReconnectStrategy` to reconnect the client.
506555
You can customize the behaviour implementing the `IReconnectStrategy` interface:
556+
507557
```csharp
508558
bool WhenDisconnected(string connectionInfo);
509559
void WhenConnected(string connectionInfo);
510560
```
561+
511562
If `WhenDisconnected` returns `true`, the Producer/Consumer will be reconnected; otherwise it will be closed.
512563
`connectionInfo` provides information about the connection.
513564

@@ -519,8 +570,8 @@ var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig
519570
ReconnectStrategy = MyReconnectStrategy
520571
```
521572

522-
523573
### Reliable handle metadata update
574+
524575
If the stream's topology changes (e.g. the stream is deleted or a follower is added/removed), the client receives a `MetadataUpdate` event.
525576
Reliable Producer detects the event and tries to reconnect the producer if the stream still exists; otherwise it closes the producer/consumer.
526577

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -611,18 +611,18 @@ RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Span<byte> span) -> i
611611
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe
612612
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.AddUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Message message) -> void
613613
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.AddUnConfirmedMessage(ulong publishingId, System.Collections.Generic.List<RabbitMQ.Stream.Client.Message> messages) -> void
614-
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.ConfirmationPipe(System.Func<RabbitMQ.Stream.Client.Reliable.MessagesConfirmation, System.Threading.Tasks.Task> confirmHandler) -> void
614+
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.ConfirmationPipe(System.Func<RabbitMQ.Stream.Client.Reliable.MessagesConfirmation, System.Threading.Tasks.Task> confirmHandler, System.TimeSpan messageTimeout) -> void
615615
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.RemoveUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Reliable.ConfirmationStatus confirmationStatus) -> System.Threading.Tasks.Task
616616
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.Start() -> void
617617
RabbitMQ.Stream.Client.Reliable.ConfirmationPipe.Stop() -> void
618618
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
619619
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.AccessRefused = 16 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
620+
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.ClientTimeoutError = 2 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
620621
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.Confirmed = 1 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
621622
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.InternalError = 15 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
622623
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.PreconditionFailed = 17 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
623624
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.PublisherDoesNotExist = 18 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
624625
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.StreamNotAvailable = 6 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
625-
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.TimeoutError = 2 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
626626
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.UndefinedError = 200 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
627627
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.WaitForConfirmation = 0 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
628628
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
@@ -674,6 +674,8 @@ RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Stream.get -> string
674674
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Stream.set -> void
675675
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.StreamSystem.get -> RabbitMQ.Stream.Client.StreamSystem
676676
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.StreamSystem.set -> void
677+
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
678+
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.TimeoutMessageAfter.init -> void
677679
RabbitMQ.Stream.Client.ResponseCode
678680
RabbitMQ.Stream.Client.ResponseCode.AccessRefused = 16 -> RabbitMQ.Stream.Client.ResponseCode
679681
RabbitMQ.Stream.Client.ResponseCode.AuthenticationFailure = 8 -> RabbitMQ.Stream.Client.ResponseCode

RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,22 @@ namespace RabbitMQ.Stream.Client.Reliable;
1919
public enum ConfirmationStatus : ushort
2020
{
2121
WaitForConfirmation = 0,
22+
/// <summary>
23+
/// Message was confirmed to be received and stored by server.
24+
/// </summary>
2225
Confirmed = 1,
23-
TimeoutError = 2,
26+
/// <summary>
27+
/// Client gave up on waiting for this publishing id.
28+
/// </summary>
29+
ClientTimeoutError = 2,
30+
/// <summary>
31+
/// Stream is not available anymore (it was deleted).
32+
/// </summary>
2433
StreamNotAvailable = 6,
2534
InternalError = 15,
35+
/// <summary>
36+
/// Signals either bad credentials, or insufficient permissions.
37+
/// </summary>
2638
AccessRefused = 16,
2739
PreconditionFailed = 17,
2840
PublisherDoesNotExist = 18,
@@ -53,10 +65,12 @@ public class ConfirmationPipe
5365
private readonly ConcurrentDictionary<ulong, MessagesConfirmation> _waitForConfirmation = new();
5466
private readonly Timer _invalidateTimer = new();
5567
private Func<MessagesConfirmation, Task> ConfirmHandler { get; }
68+
private readonly TimeSpan _messageTimeout;
5669

57-
public ConfirmationPipe(Func<MessagesConfirmation, Task> confirmHandler)
70+
public ConfirmationPipe(Func<MessagesConfirmation, Task> confirmHandler, TimeSpan messageTimeout)
5871
{
5972
ConfirmHandler = confirmHandler;
73+
_messageTimeout = messageTimeout;
6074
}
6175

6276
public void Start()
@@ -82,7 +96,7 @@ public void Start()
8296
});
8397

8498
_invalidateTimer.Elapsed += OnTimedEvent;
85-
_invalidateTimer.Interval = 2000;
99+
_invalidateTimer.Interval = _messageTimeout.TotalMilliseconds;
86100
_invalidateTimer.Enabled = true;
87101
}
88102

@@ -94,24 +108,24 @@ public void Stop()
94108

95109
private async void OnTimedEvent(object sender, ElapsedEventArgs e)
96110
{
111+
var timedOutMessages = _waitForConfirmation.Where(pair =>
112+
(DateTime.Now - pair.Value.InsertDateTime).TotalSeconds > _messageTimeout.TotalSeconds);
113+
114+
foreach (var pair in timedOutMessages)
97115
{
98-
foreach (var pair in _waitForConfirmation.Where(pair =>
99-
(DateTime.Now - pair.Value.InsertDateTime).Seconds > 2))
100-
{
101-
await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError);
102-
}
116+
await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.ClientTimeoutError);
103117
}
104118
}
105119

106120
public void AddUnConfirmedMessage(ulong publishingId, Message message)
107121
{
108-
AddUnConfirmedMessage(publishingId, new List<Message>() { message });
122+
AddUnConfirmedMessage(publishingId, new List<Message> { message });
109123
}
110124

111125
public void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
112126
{
113127
_waitForConfirmation.TryAdd(publishingId,
114-
new MessagesConfirmation()
128+
new MessagesConfirmation
115129
{
116130
Messages = messages,
117131
PublishingId = publishingId,

0 commit comments

Comments
 (0)