1414
1515namespace RabbitMQ . AMQP . Client . Impl
1616{
17+ static class ConsumerDefaults
18+ {
19+ public const int AttachTimeoutSeconds = 5 ;
20+ public const int MessageReceiveTimeoutSeconds = 60 ;
21+ public const int CloseTimeoutSeconds = 5 ;
22+ public const int AttachDelayMilliseconds = 10 ;
23+ }
24+
1725 /// <summary>
1826 /// Implementation of <see cref="IConsumer"/>.
1927 /// </summary>
@@ -36,7 +44,8 @@ private enum PauseStatus
3644 private readonly ConsumerConfiguration _configuration ;
3745 private readonly IMetricsReporter ? _metricsReporter ;
3846
39- internal AmqpConsumer ( AmqpConnection amqpConnection , ConsumerConfiguration configuration , IMetricsReporter ? metricsReporter )
47+ internal AmqpConsumer ( AmqpConnection amqpConnection , ConsumerConfiguration configuration ,
48+ IMetricsReporter ? metricsReporter )
4049 {
4150 _amqpConnection = amqpConnection ;
4251 _configuration = configuration ;
@@ -87,12 +96,12 @@ void OnAttached(ILink argLink, Attach argAttach)
8796 var tmpReceiverLink = new ReceiverLink ( session , _id . ToString ( ) , attach , OnAttached ) ;
8897
8998 // TODO configurable timeout
90- var waitSpan = TimeSpan . FromSeconds ( 5 ) ;
99+ var waitSpan = TimeSpan . FromSeconds ( ConsumerDefaults . AttachTimeoutSeconds ) ;
91100
92101 // TODO
93102 // Even 10ms is enough to allow the links to establish,
94103 // which tells me it allows the .NET runtime to process
95- await Task . Delay ( 10 ) . ConfigureAwait ( false ) ;
104+ await Task . Delay ( ConsumerDefaults . AttachDelayMilliseconds ) . ConfigureAwait ( false ) ;
96105
97106 _receiverLink = await attachCompletedTcs . Task . WaitAsync ( waitSpan )
98107 . ConfigureAwait ( false ) ;
@@ -102,44 +111,40 @@ void OnAttached(ILink argLink, Attach argAttach)
102111 // TODO log this case?
103112 }
104113
105- if ( _receiverLink is null )
106- {
107- throw new ConsumerException ( $ "{ ToString ( ) } Failed to create receiver link (null was returned)") ;
108- }
109- else if ( _receiverLink . LinkState != LinkState . Attached )
110- {
111- throw new ConsumerException (
112- $ "{ ToString ( ) } Failed to create receiver link. Link state is not attached, error: " +
113- _receiverLink . Error ? . ToString ( ) ?? "Unknown error" ) ;
114- }
115- else
116- {
117- _receiverLink . SetCredit ( _configuration . InitialCredits ) ;
114+ ValidateReceiverLink ( ) ;
115+ _receiverLink . SetCredit ( _configuration . InitialCredits ) ;
118116
119- // TODO save / cancel task
120- _ = Task . Run ( ProcessMessages ) ;
117+ // TODO save / cancel task
118+ _ = Task . Run ( ProcessMessages ) ;
121119
122- // TODO cancellation token
123- await base . OpenAsync ( )
124- . ConfigureAwait ( false ) ;
125- }
120+ // TODO cancellation token
121+ await base . OpenAsync ( )
122+ . ConfigureAwait ( false ) ;
126123 }
127124 catch ( Exception e )
128125 {
129126 throw new ConsumerException ( $ "{ ToString ( ) } Failed to create receiver link, { e } ") ;
130127 }
131128 }
132129
130+ private void ValidateReceiverLink ( )
131+ {
132+ if ( _receiverLink is null )
133+ {
134+ throw new ConsumerException ( $ "{ ToString ( ) } Receiver link creation failed (null was returned)") ;
135+ }
136+
137+ if ( _receiverLink . LinkState != LinkState . Attached )
138+ {
139+ var errorMessage = _receiverLink . Error ? . ToString ( ) ?? "Unknown error" ;
140+ throw new ConsumerException ( $ "{ ToString ( ) } Receiver link not attached. Error: { errorMessage } ") ;
141+ }
142+ }
143+
133144 private async Task ProcessMessages ( )
134145 {
135146 try
136147 {
137- if ( _receiverLink is null )
138- {
139- // TODO is this a serious error?
140- return ;
141- }
142-
143148 Stopwatch ? stopwatch = null ;
144149 if ( _metricsReporter is not null )
145150 {
@@ -184,7 +189,6 @@ await _configuration.Handler(context, amqpMessage)
184189 stopwatch . Stop ( ) ;
185190 _metricsReporter . Consumed ( stopwatch . Elapsed ) ;
186191 }
187-
188192 }
189193 }
190194 catch ( Exception e )
0 commit comments