66
77namespace RabbitMQ . AMQP . Client . Impl ;
88
9- public class AmqpPublisher : AbstractLifeCycle , IPublisher
9+ public class AmqpPublisher : AbstractReconnectLifeCycle , IPublisher
1010{
1111 private SenderLink ? _senderLink = null ;
1212
@@ -38,15 +38,15 @@ protected sealed override Task OpenAsync()
3838 attachCompleted . WaitOne ( TimeSpan . FromSeconds ( 5 ) ) ;
3939 if ( _senderLink . LinkState != LinkState . Attached )
4040 {
41- throw new PublisherException ( " Failed to create sender link. Link state is not attached, error: " +
41+ throw new PublisherException ( $ " { ToString ( ) } Failed to create sender link. Link state is not attached, error: " +
4242 _senderLink . Error ? . ToString ( ) ?? "Unknown error" ) ;
4343 }
4444
4545 return base . OpenAsync ( ) ;
4646 }
4747 catch ( Exception e )
4848 {
49- throw new PublisherException ( $ "Failed to create sender link, { e } ") ;
49+ throw new PublisherException ( $ "{ ToString ( ) } Failed to create sender link, { e } ") ;
5050 }
5151 }
5252
@@ -127,7 +127,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
127127 }
128128 else
129129 {
130- Trace . WriteLine ( TraceLevel . Error , " Message not sent. Killing the process.") ;
130+ Trace . WriteLine ( TraceLevel . Error , $ " { ToString ( ) } Message not sent. Killing the process.") ;
131131 Process . GetCurrentProcess ( ) . Kill ( ) ;
132132 }
133133
@@ -138,7 +138,7 @@ public Task Publish(IMessage message, OutcomeDescriptorCallback outcomeCallback)
138138 }
139139 catch ( Exception e )
140140 {
141- throw new PublisherException ( $ "Failed to publish message, { e } ") ;
141+ throw new PublisherException ( $ "{ ToString ( ) } Failed to publish message, { e } ") ;
142142 }
143143
144144 return Task . CompletedTask ;
@@ -151,6 +151,7 @@ public override async Task CloseAsync()
151151 {
152152 return ;
153153 }
154+
154155 OnNewStatus ( State . Closing , null ) ;
155156 try
156157 {
@@ -173,22 +174,8 @@ await _senderLink.CloseAsync()
173174 }
174175
175176
176- internal void ChangeStatus ( State newState , Error ? error )
177- {
178- OnNewStatus ( newState , error ) ;
179- }
180-
181- internal async Task Reconnect ( )
177+ public override string ToString ( )
182178 {
183- int randomWait = Random . Shared . Next ( 200 , 800 ) ;
184- Trace . WriteLine ( TraceLevel . Information , $ "Publisher: { ToString ( ) } is reconnecting in { randomWait } ms") ;
185- await Task . Delay ( randomWait ) . ConfigureAwait ( false ) ;
186-
187- if ( _senderLink != null )
188- {
189- await _senderLink . DetachAsync ( ) . ConfigureAwait ( false ) ! ;
190- }
191-
192- await OpenAsync ( ) . ConfigureAwait ( false ) ;
179+ return $ "Publisher{{Address='{ _address } ', id={ Id } ConnectionName='{ _connection } ', State='{ State } '}}";
193180 }
194181}
0 commit comments