Skip to content

Commit ab9e4e7

Browse files
authored
Implement consumer part (#22)
* Consumer Implementation --- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 930385e commit ab9e4e7

27 files changed

+914
-143
lines changed

README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ You can find an example in: `docs/Examples/GettingStarted`
1515
## TODO
1616

1717
- [x] Declare queues
18-
- [ ] Declare exchanges
19-
- [ ] Declare bindings
18+
- [x] Declare exchanges
19+
- [x] Declare bindings
2020
- [x] Simple Publish messages
21-
- [x] Implement backpressure ( atm it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
22-
- [ ] Simple Consume messages
21+
- [x] Implement backpressure (it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
22+
- [x] Simple Consume messages
23+
- [ ] Complete the consumer part with `pause` and `unpause`
24+
- [ ] Complete the binding/unbinding with the special characters
25+
- [ ] Complete the queues/exchanges name with the special characters
2326
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
2427
- [x] Recovery connection on connection lost
2528
- [x] Recovery management on connection lost

RabbitMQ.AMQP.Client/IClosable.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,17 @@ public override string ToString()
2222

2323
public interface IClosable // TODO: Create an abstract class with the event and the State property
2424
{
25-
public State State { get; }
26-
2725
Task CloseAsync();
2826

29-
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
27+
}
28+
29+
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
30+
31+
public interface IResourceStatus
32+
{
33+
public State State { get; }
34+
3035

3136
event LifeCycleCallBack ChangeState;
37+
3238
}

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1+
using System.Collections.ObjectModel;
2+
13
namespace RabbitMQ.AMQP.Client;
24

35
public class ConnectionException(string? message) : Exception(message);
46

5-
public interface IConnection
7+
public interface IConnection : IResourceStatus, IClosable
68
{
79
IManagement Management();
8-
Task ConnectAsync();
10+
11+
IPublisherBuilder PublisherBuilder();
12+
13+
IConsumerBuilder ConsumerBuilder();
14+
15+
16+
public ReadOnlyCollection<IPublisher> GetPublishers();
917
}

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using Amqp.Listener;
2+
3+
namespace RabbitMQ.AMQP.Client;
4+
5+
6+
public class ConsumerException(string message) : Exception(message);
7+
public delegate void MessageHandler(IContext context, IMessage message);
8+
9+
public interface IConsumer : IResourceStatus, IClosable
10+
{
11+
void Pause();
12+
13+
long UnsettledMessageCount();
14+
15+
void Unpause();
16+
}
17+
18+
public interface IMessageHandler
19+
{
20+
void Handle(Context context, IMessage message);
21+
}
22+
23+
public interface IContext
24+
{
25+
void Accept();
26+
27+
void Discard();
28+
29+
void Requeue();
30+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
namespace RabbitMQ.AMQP.Client;
2+
3+
4+
public enum StreamOffsetSpecification
5+
{
6+
First,
7+
Last,
8+
Next
9+
}
10+
11+
public interface IConsumerBuilder
12+
{
13+
IConsumerBuilder Queue(string queue);
14+
15+
IConsumerBuilder MessageHandler(MessageHandler handler);
16+
17+
IConsumerBuilder InitialCredits(int initialCredits);
18+
19+
IStreamOptions Stream();
20+
21+
IConsumer Build();
22+
23+
public interface IStreamOptions
24+
{
25+
IStreamOptions Offset(long offset);
26+
27+
// IStreamOptions offset(Instant timestamp);
28+
29+
IStreamOptions Offset(StreamOffsetSpecification specification);
30+
31+
IStreamOptions Offset(string interval);
32+
33+
IStreamOptions FilterValues(string[] values);
34+
35+
IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);
36+
37+
IConsumerBuilder Builder();
38+
}
39+
40+
41+
}

RabbitMQ.AMQP.Client/IManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ public class ModelException(string message) : Exception(message);
44

55
public class PreconditionFailedException(string message) : Exception(message);
66

7-
public interface IManagement : IClosable
7+
public interface IManagement : IResourceStatus, IClosable
88
{
99
IQueueSpecification Queue();
1010
IQueueSpecification Queue(string name);

RabbitMQ.AMQP.Client/IMessage.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ namespace RabbitMQ.AMQP.Client;
22

33
public interface IMessage
44
{
5+
// TODO: Complete the IMessage interface with all the properties
56
public object Body();
7+
68
// properties
79
string MessageId();
810
IMessage MessageId(string id);
@@ -16,4 +18,8 @@ public interface IMessage
1618
string Subject();
1719
IMessage Subject(string subject);
1820

21+
22+
public IMessage Annotation(string key, object value);
23+
24+
public object Annotation(string key);
1925
}

RabbitMQ.AMQP.Client/IPublisher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat
1919

2020
public delegate void OutcomeDescriptorCallback(IMessage message, OutcomeDescriptor outcomeDescriptor);
2121

22-
public interface IPublisher : IClosable
22+
public interface IPublisher : IResourceStatus, IClosable
2323
{
2424
Task Publish(IMessage message,
2525
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack

RabbitMQ.AMQP.Client/Impl/AbstractClosable.cs renamed to RabbitMQ.AMQP.Client/Impl/AbstractResourceStatus.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ namespace RabbitMQ.AMQP.Client.Impl;
22

33
public class AmqpClosedException(string message) : Exception(message);
44

5-
public abstract class AbstractClosable : IClosable
5+
public abstract class AbstractResourceStatus : IResourceStatus
66
{
77
public State State { get; internal set; } = State.Closed;
8-
public abstract Task CloseAsync();
98
protected void ThrowIfClosed()
109
{
1110
if (State == State.Closed)
@@ -27,5 +26,5 @@ protected void OnNewStatus(State newState, Error? error)
2726
ChangeState?.Invoke(this, oldStatus, newState, error);
2827
}
2928

30-
public event IClosable.LifeCycleCallBack? ChangeState;
29+
public event LifeCycleCallBack? ChangeState;
3130
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ await Management.Queue(spec).Declare()
3333
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
3434
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
3535
/// </summary>
36-
public class AmqpConnection : AbstractClosable, IConnection
36+
public class AmqpConnection : AbstractResourceStatus, IConnection
3737
{
3838
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
3939
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
@@ -45,7 +45,9 @@ public class AmqpConnection : AbstractClosable, IConnection
4545

4646
private readonly AmqpManagement _management = new();
4747
private readonly RecordingTopologyListener _recordingTopologyListener = new();
48-
private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
48+
49+
private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource =
50+
new(TaskCreationOptions.RunContinuationsAsynchronously);
4951

5052
private readonly ConnectionSettings _connectionSettings;
5153
internal readonly AmqpSessionManagement NativePubSubSessions;
@@ -60,9 +62,12 @@ public class AmqpConnection : AbstractClosable, IConnection
6062
/// They key is the publisher Id ( a Guid)
6163
/// See <see cref="AmqpPublisher"/>
6264
/// </summary>
63-
internal ConcurrentDictionary<string, AmqpPublisher> Publishers { get; } = new();
65+
internal ConcurrentDictionary<string, IPublisher> Publishers { get; } = new();
66+
67+
internal ConcurrentDictionary<string, IConsumer> Consumers { get; } = new();
68+
6469

65-
public ReadOnlyCollection<AmqpPublisher> GetPublishers()
70+
public ReadOnlyCollection<IPublisher> GetPublishers()
6671
{
6772
return Publishers.Values.ToList().AsReadOnly();
6873
}
@@ -75,7 +80,7 @@ public ReadOnlyCollection<AmqpPublisher> GetPublishers()
7580
/// </summary>
7681
/// <param name="connectionSettings"></param>
7782
/// <returns></returns>
78-
public static async Task<AmqpConnection> CreateAsync(ConnectionSettings connectionSettings)
83+
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
7984
{
8085
var connection = new AmqpConnection(connectionSettings);
8186
await connection.ConnectAsync()
@@ -105,15 +110,26 @@ private void ResumeAllPublishers()
105110
/// </summary>
106111
private async Task CloseAllPublishers()
107112
{
108-
var cloned = new List<AmqpPublisher>(Publishers.Values);
113+
var cloned = new List<IPublisher>(Publishers.Values);
109114

110-
foreach (AmqpPublisher publisher in cloned)
115+
foreach (IPublisher publisher in cloned)
111116
{
112117
await publisher.CloseAsync()
113118
.ConfigureAwait(false);
114119
}
115120
}
116121

122+
private async Task CloseAllConsumers()
123+
{
124+
var cloned = new List<IConsumer>(Consumers.Values);
125+
126+
foreach (IConsumer consumer in cloned)
127+
{
128+
await consumer.CloseAsync()
129+
.ConfigureAwait(false);
130+
}
131+
}
132+
117133
private AmqpConnection(ConnectionSettings connectionSettings)
118134
{
119135
_connectionSettings = connectionSettings;
@@ -125,7 +141,12 @@ public IManagement Management()
125141
return _management;
126142
}
127143

128-
public Task ConnectAsync()
144+
public IConsumerBuilder ConsumerBuilder()
145+
{
146+
return new AmqpConsumerBuilder(this);
147+
}
148+
149+
private Task ConnectAsync()
129150
{
130151
EnsureConnection();
131152
OnNewStatus(State.Open, null);
@@ -288,7 +309,7 @@ await _recordingTopologyListener.Accept(visitor)
288309
{
289310
_semaphoreClose.Release();
290311
}
291-
312+
292313
_connectionCloseTaskCompletionSource.SetResult(true);
293314
};
294315
}
@@ -306,14 +327,14 @@ public IPublisherBuilder PublisherBuilder()
306327
}
307328

308329

309-
public override async Task CloseAsync()
330+
public async Task CloseAsync()
310331
{
311332
await _semaphoreClose.WaitAsync()
312333
.ConfigureAwait(false);
313334
try
314335
{
315-
await CloseAllPublishers()
316-
.ConfigureAwait(false);
336+
await CloseAllPublishers().ConfigureAwait(false);
337+
await CloseAllConsumers().ConfigureAwait(false);
317338

318339
_recordingTopologyListener.Clear();
319340

@@ -337,10 +358,10 @@ await _management.CloseAsync()
337358
{
338359
_semaphoreClose.Release();
339360
}
340-
361+
341362
await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
342363
.ConfigureAwait(false);
343-
364+
344365
OnNewStatus(State.Closed, null);
345366
}
346367

0 commit comments

Comments
 (0)