@@ -21,28 +21,18 @@ private enum PauseStatus
2121 PAUSED ,
2222 }
2323
24- private readonly AmqpConnection _connection ;
25- private readonly string _address ;
26- private readonly MessageHandler _messageHandler ;
27- private readonly int _initialCredits ;
28- private readonly Map _filters ;
2924 private readonly Guid _id = Guid . NewGuid ( ) ;
3025
3126 private ReceiverLink ? _receiverLink ;
3227
3328 private PauseStatus _pauseStatus = PauseStatus . UNPAUSED ;
3429 private readonly UnsettledMessageCounter _unsettledMessageCounter = new ( ) ;
30+ private readonly ConsumerConfiguration _configuration ;
3531
36- public AmqpConsumer ( AmqpConnection connection , string address ,
37- MessageHandler messageHandler , int initialCredits , Map filters )
32+ public AmqpConsumer ( ConsumerConfiguration configuration )
3833 {
39- _connection = connection ;
40- _address = address ;
41- _messageHandler = messageHandler ;
42- _initialCredits = initialCredits ;
43- _filters = filters ;
44-
45- if ( false == _connection . Consumers . TryAdd ( _id , this ) )
34+ _configuration = configuration ;
35+ if ( false == _configuration . Connection . Consumers . TryAdd ( _id , this ) )
4636 {
4737 // TODO error?
4838 }
@@ -52,9 +42,20 @@ public override async Task OpenAsync()
5242 {
5343 try
5444 {
55- TaskCompletionSource < ReceiverLink > attachCompletedTcs = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
45+ TaskCompletionSource < ReceiverLink > attachCompletedTcs =
46+ new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
47+
48+ // this is an event to get the filters to the listener context
49+ // it _must_ be here because in case of reconnect the original filters could be not valid anymore
50+ // so the function must be called every time the consumer is opened normally or by reconnection
51+ // if ListenerContext is null the function will do nothing
52+ // ListenerContext will override only the filters the selected filters.
53+ _configuration . ListenerContext ? . Invoke (
54+ new IConsumerBuilder . ListenerContext ( new ListenerStreamOptions ( _configuration . Filters ) ) ) ;
5655
57- Attach attach = Utils . CreateAttach ( _address , DeliveryMode . AtLeastOnce , _id , _filters ) ;
56+
57+ Attach attach = Utils . CreateAttach ( _configuration . Address , DeliveryMode . AtLeastOnce , _id ,
58+ _configuration . Filters ) ;
5859
5960 void onAttached ( ILink argLink , Attach argAttach )
6061 {
@@ -74,7 +75,7 @@ void onAttached(ILink argLink, Attach argAttach)
7475 ReceiverLink ? tmpReceiverLink = null ;
7576 Task receiverLinkTask = Task . Run ( async ( ) =>
7677 {
77- Session session = await _connection . _nativePubSubSessions . GetOrCreateSessionAsync ( )
78+ Session session = await _configuration . Connection . _nativePubSubSessions . GetOrCreateSessionAsync ( )
7879 . ConfigureAwait ( false ) ;
7980 tmpReceiverLink = new ReceiverLink ( session , _id . ToString ( ) , attach , onAttached ) ;
8081 } ) ;
@@ -89,7 +90,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
8990 . ConfigureAwait ( false ) ;
9091
9192 System . Diagnostics . Debug . Assert ( tmpReceiverLink != null ) ;
92- System . Diagnostics . Debug . Assert ( Object . ReferenceEquals ( _receiverLink , tmpReceiverLink ) ) ;
93+ System . Diagnostics . Debug . Assert ( object . ReferenceEquals ( _receiverLink , tmpReceiverLink ) ) ;
9394
9495 if ( _receiverLink is null )
9596 {
@@ -103,7 +104,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
103104 }
104105 else
105106 {
106- _receiverLink . SetCredit ( _initialCredits ) ;
107+ _receiverLink . SetCredit ( _configuration . InitialCredits ) ;
107108
108109 // TODO save / cancel task
109110 _ = Task . Run ( ProcessMessages ) ;
@@ -150,7 +151,10 @@ private async Task ProcessMessages()
150151
151152 // TODO catch exceptions thrown by handlers,
152153 // then call exception handler?
153- await _messageHandler ( context , amqpMessage ) . ConfigureAwait ( false ) ;
154+ if ( _configuration . Handler != null )
155+ {
156+ await _configuration . Handler ( context , amqpMessage ) . ConfigureAwait ( false ) ;
157+ }
154158 }
155159 }
156160 catch ( Exception e )
@@ -173,20 +177,24 @@ public void Pause()
173177 if ( _receiverLink is null )
174178 {
175179 // TODO create "internal bug" exception type?
176- throw new InvalidOperationException ( "_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
180+ throw new InvalidOperationException (
181+ "_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
177182 }
178183
179- if ( ( int ) PauseStatus . UNPAUSED == Interlocked . CompareExchange ( ref Unsafe . As < PauseStatus , int > ( ref _pauseStatus ) ,
180- ( int ) PauseStatus . PAUSING , ( int ) PauseStatus . UNPAUSED ) )
184+ if ( ( int ) PauseStatus . UNPAUSED == Interlocked . CompareExchange (
185+ ref Unsafe . As < PauseStatus , int > ( ref _pauseStatus ) ,
186+ ( int ) PauseStatus . PAUSING , ( int ) PauseStatus . UNPAUSED ) )
181187 {
182188 _receiverLink . SetCredit ( credit : 0 ) ;
183189
184- if ( ( int ) PauseStatus . PAUSING != Interlocked . CompareExchange ( ref Unsafe . As < PauseStatus , int > ( ref _pauseStatus ) ,
185- ( int ) PauseStatus . PAUSED , ( int ) PauseStatus . PAUSING ) )
190+ if ( ( int ) PauseStatus . PAUSING != Interlocked . CompareExchange (
191+ ref Unsafe . As < PauseStatus , int > ( ref _pauseStatus ) ,
192+ ( int ) PauseStatus . PAUSED , ( int ) PauseStatus . PAUSING ) )
186193 {
187194 _pauseStatus = PauseStatus . UNPAUSED ;
188195 // TODO create "internal bug" exception type?
189- throw new InvalidOperationException ( "error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
196+ throw new InvalidOperationException (
197+ "error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
190198 }
191199 }
192200 else
@@ -197,24 +205,23 @@ public void Pause()
197205
198206 public long UnsettledMessageCount
199207 {
200- get
201- {
202- return _unsettledMessageCounter . Get ( ) ;
203- }
208+ get { return _unsettledMessageCounter . Get ( ) ; }
204209 }
205210
206211 public void Unpause ( )
207212 {
208213 if ( _receiverLink is null )
209214 {
210215 // TODO create "internal bug" exception type?
211- throw new InvalidOperationException ( "_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
216+ throw new InvalidOperationException (
217+ "_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
212218 }
213219
214- if ( ( int ) PauseStatus . PAUSED == Interlocked . CompareExchange ( ref Unsafe . As < PauseStatus , int > ( ref _pauseStatus ) ,
215- ( int ) PauseStatus . UNPAUSED , ( int ) PauseStatus . PAUSED ) )
220+ if ( ( int ) PauseStatus . PAUSED == Interlocked . CompareExchange (
221+ ref Unsafe . As < PauseStatus , int > ( ref _pauseStatus ) ,
222+ ( int ) PauseStatus . UNPAUSED , ( int ) PauseStatus . PAUSED ) )
216223 {
217- _receiverLink . SetCredit ( credit : _initialCredits ) ;
224+ _receiverLink . SetCredit ( credit : _configuration . InitialCredits ) ;
218225 }
219226 else
220227 {
@@ -240,19 +247,20 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
240247 }
241248 catch ( Exception ex )
242249 {
243- Trace . WriteLine ( TraceLevel . Warning , "Failed to close receiver link. The consumer will be closed anyway" , ex ) ;
250+ Trace . WriteLine ( TraceLevel . Warning , "Failed to close receiver link. The consumer will be closed anyway" ,
251+ ex ) ;
244252 }
245253
246254 _receiverLink = null ;
247255 OnNewStatus ( State . Closed , null ) ;
248- _connection . Consumers . TryRemove ( _id , out _ ) ;
256+ _configuration . Connection . Consumers . TryRemove ( _id , out _ ) ;
249257 }
250258
251259 public override string ToString ( )
252260 {
253- return $ "Consumer{{Address='{ _address } ', " +
261+ return $ "Consumer{{Address='{ _configuration . Address } ', " +
254262 $ "id={ _id } , " +
255- $ "Connection='{ _connection } ', " +
263+ $ "Connection='{ _configuration . Connection } ', " +
256264 $ "State='{ State } '}}";
257265 }
258266 }
0 commit comments