@@ -39,7 +39,6 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3939 private const string ConnectionNotRecoveredMessage = "Connection not recovered" ;
4040 private readonly SemaphoreSlim _semaphoreClose = new ( 1 , 1 ) ;
4141
42-
4342 // The native AMQP.Net Lite connection
4443 private Connection ? _nativeConnection ;
4544
@@ -71,7 +70,6 @@ private void ChangeConsumersStatus(State state, Error? error)
7170 }
7271 }
7372
74-
7573 private async Task ReconnectEntities ( )
7674 {
7775 await ReconnectPublishers ( ) . ConfigureAwait ( false ) ;
@@ -102,7 +100,6 @@ private async Task ReconnectConsumers()
102100 // TODO: Implement the semaphore to avoid multiple connections
103101 // private readonly SemaphoreSlim _semaphore = new(1, 1);
104102
105-
106103 /// <summary>
107104 /// Publishers contains all the publishers created by the connection.
108105 /// Each connection can have multiple publishers.
@@ -113,7 +110,6 @@ private async Task ReconnectConsumers()
113110
114111 internal ConcurrentDictionary < string , IConsumer > Consumers { get ; } = new ( ) ;
115112
116-
117113 public ReadOnlyCollection < IPublisher > GetPublishers ( )
118114 {
119115 return Publishers . Values . ToList ( ) . AsReadOnly ( ) ;
@@ -179,14 +175,18 @@ public IConsumerBuilder ConsumerBuilder()
179175 return new AmqpConsumerBuilder ( this ) ;
180176 }
181177
182- protected override Task OpenAsync ( )
178+ protected override async Task OpenAsync ( )
183179 {
184- EnsureConnection ( ) ;
185- return base . OpenAsync ( ) ;
180+ await EnsureConnection ( )
181+ . ConfigureAwait ( false ) ;
182+ await base . OpenAsync ( )
183+ . ConfigureAwait ( false ) ;
186184 }
187185
188- private void EnsureConnection ( )
186+ private async Task EnsureConnection ( )
189187 {
188+ // TODO: do this!
189+ // await _semaphore.WaitAsync();
190190 try
191191 {
192192 if ( _nativeConnection is { IsClosed : false } )
@@ -196,22 +196,53 @@ private void EnsureConnection()
196196
197197 var open = new Open
198198 {
199- HostName = $ "vhost:{ _connectionSettings . VirtualHost ( ) } ",
199+ HostName = $ "vhost:{ _connectionSettings . VirtualHost } ",
200200 Properties = new Fields ( )
201201 {
202- [ new Symbol ( "connection_name" ) ] = _connectionSettings . ConnectionName ( ) ,
202+ [ new Symbol ( "connection_name" ) ] = _connectionSettings . ConnectionName ,
203203 }
204204 } ;
205205
206- var manualReset = new ManualResetEvent ( false ) ;
207- _nativeConnection = new Connection ( _connectionSettings . Address , null , open , ( connection , open1 ) =>
206+ void onOpened ( Amqp . IConnection connection , Open open1 )
208207 {
209- manualReset . Set ( ) ;
210208 Trace . WriteLine ( TraceLevel . Verbose , $ "Connection opened. Info: { ToString ( ) } ") ;
211209 OnNewStatus ( State . Open , null ) ;
212- } ) ;
210+ }
211+
212+ var cf = new ConnectionFactory ( ) ;
213+
214+ if ( _connectionSettings . UseSsl && _connectionSettings . TlsSettings is not null )
215+ {
216+ cf . SSL . Protocols = _connectionSettings . TlsSettings . Protocols ;
217+ cf . SSL . CheckCertificateRevocation = _connectionSettings . TlsSettings . CheckCertificateRevocation ;
218+
219+ if ( _connectionSettings . TlsSettings . ClientCertificates . Count > 0 )
220+ {
221+ cf . SSL . ClientCertificates = _connectionSettings . TlsSettings . ClientCertificates ;
222+ }
223+
224+ if ( _connectionSettings . TlsSettings . LocalCertificateSelectionCallback is not null )
225+ {
226+ cf . SSL . LocalCertificateSelectionCallback = _connectionSettings . TlsSettings . LocalCertificateSelectionCallback ;
227+ }
228+
229+ if ( _connectionSettings . TlsSettings . RemoteCertificateValidationCallback is not null )
230+ {
231+ cf . SSL . RemoteCertificateValidationCallback = _connectionSettings . TlsSettings . RemoteCertificateValidationCallback ;
232+ }
233+ }
234+
235+ try
236+ {
237+ _nativeConnection = await cf . CreateAsync ( _connectionSettings . Address , open : open , onOpened : onOpened )
238+ . ConfigureAwait ( false ) ;
239+ }
240+ catch ( Exception ex )
241+ {
242+ throw new ConnectionException (
243+ $ "Connection failed. Info: { ToString ( ) } ", ex ) ;
244+ }
213245
214- manualReset . WaitOne ( TimeSpan . FromSeconds ( 5 ) ) ;
215246 if ( _nativeConnection . IsClosed )
216247 {
217248 throw new ConnectionException (
@@ -294,7 +325,8 @@ await Task.Run(async () =>
294325 await Task . Delay ( TimeSpan . FromMilliseconds ( next ) )
295326 . ConfigureAwait ( false ) ;
296327
297- EnsureConnection ( ) ;
328+ await EnsureConnection ( )
329+ . ConfigureAwait ( false ) ;
298330 connected = true ;
299331 }
300332 catch ( Exception e )
0 commit comments