Skip to content

Commit d9333f0

Browse files
committed
Revert "Change IHostnameSelector to IEndpointSelector"
This reverts commit 5cb449e.
1 parent 5cb449e commit d9333f0

File tree

8 files changed

+134
-176
lines changed

8 files changed

+134
-176
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
157157
/// </summary>
158158
public bool AutomaticRecoveryEnabled { get; set; }
159159

160+
/// <summary>
161+
/// Used to select next hostname to try when performing
162+
/// connection recovery (re-connecting). Is not used for
163+
/// non-recovering connections.
164+
/// </summary>
165+
public IHostnameSelector HostnameSelector { get; set; } = new RandomHostnameSelector();
166+
160167
/// <summary>The host to connect to.</summary>
161168
public string HostName { get; set; } = "localhost";
162169

@@ -392,7 +399,7 @@ public IConnection CreateConnection(IList<string> hostnames)
392399
/// </exception>
393400
public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
394401
{
395-
return CreateConnection(new DefaultEndpointSelector(hostnames.Select(Endpoint.CloneWithHostname).ToList()), clientProvidedName);
402+
return CreateConnection(hostnames.Select(Endpoint.CloneWithHostname).ToList(), clientProvidedName);
396403
}
397404

398405
/// <summary>
@@ -410,7 +417,7 @@ public IConnection CreateConnection(IList<string> hostnames, String clientProvid
410417
/// </exception>
411418
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
412419
{
413-
return CreateConnection(new DefaultEndpointSelector(endpoints), null);
420+
return CreateConnection(endpoints, null);
414421
}
415422

416423
/// <summary>
@@ -432,28 +439,32 @@ public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
432439
/// <exception cref="BrokerUnreachableException">
433440
/// When no hostname was reachable.
434441
/// </exception>
435-
public IConnection CreateConnection(IEndpointSelector endpoints, String clientProvidedName)
442+
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, String clientProvidedName)
436443
{
444+
var eps = endpoints.ToList();
437445
IConnection conn;
438446
try
439447
{
440448
if (AutomaticRecoveryEnabled)
441449
{
442450
var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
443-
autorecoveringConnection.Init(endpoints);
451+
autorecoveringConnection.Init(eps);
444452
conn = autorecoveringConnection;
445453
}
446454
else
447455
{
448456
IProtocol protocol = Protocols.DefaultProtocol;
449-
conn = protocol.CreateConnection(this, false, endpoints.SelectOne(this.CreateFrameHandler), clientProvidedName);
457+
//We can't make this more elegant without changing the contract of the IHostnameSelector
458+
//if there are endpoints with the same hostname but different ports the first match is selected
459+
var selectedHost = HostnameSelector.NextFrom(eps.Select(ep => ep.HostName).ToList());
460+
var selectedEndpoint = eps.First(ep => ep.HostName == selectedHost);
461+
conn = protocol.CreateConnection(this, false, CreateFrameHandler(selectedEndpoint), clientProvidedName);
450462
}
451463
}
452464
catch (Exception e)
453465
{
454466
throw new BrokerUnreachableException(e);
455467
}
456-
457468
return conn;
458469
}
459470

projects/client/RabbitMQ.Client/src/client/api/DefaultEndpointSelector.cs

Lines changed: 0 additions & 93 deletions
This file was deleted.

projects/client/RabbitMQ.Client/src/client/api/IEndpointSelector.cs renamed to projects/client/RabbitMQ.Client/src/client/api/IHostnameSelector.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,18 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42-
using System.Linq;
4342
using System.Collections.Generic;
44-
using RabbitMQ.Client.Impl;
45-
using RabbitMQ.Client.Framing.Impl;
4643

4744
namespace RabbitMQ.Client
4845
{
49-
public interface IEndpointSelector
46+
public interface IHostnameSelector
5047
{
51-
/// Selector can either throw or return null to indicate not found
52-
T SelectOne<T>(Func<AmqpTcpEndpoint, T> selector);
53-
AmqpTcpEndpoint Current { get; }
48+
/// <summary>
49+
/// Picks a hostname from a list of options that should be used
50+
/// by <see cref="IConnectionFactory"/>.
51+
/// </summary>
52+
/// <param name="options"></param>
53+
/// <returns></returns>
54+
string NextFrom(IList<string> options);
5455
}
55-
}
56+
}

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,22 @@ public interface IProtocol
108108
/// </summary>
109109
IConnection CreateConnection(ConnectionFactory factory, IFrameHandler frameHandler, bool automaticRecoveryEnabled, String clientProvidedName);
110110

111-
// /// <summary>
112-
// /// Construct a frame handler for a given endpoint.
113-
// /// </summary>
114-
// /// <param name="socketFactory">Socket factory method.</param>
115-
// /// <param name="connectionTimeout">Timeout in milliseconds.</param>
116-
// /// <param name="endpoint">Represents a TCP-addressable AMQP peer: a host name and port number.</param>
117-
// IFrameHandler CreateFrameHandler(
118-
// AmqpTcpEndpoint endpoint,
119-
// #if !NETFX_CORE
120-
// Func<AddressFamily, ITcpClient> socketFactory,
121-
// #else
122-
// Func<StreamSocket> socketFactory,
123-
// #endif
124-
// int connectionTimeout,
125-
// int readTimeout,
126-
// int writeTimeout);
111+
/// <summary>
112+
/// Construct a frame handler for a given endpoint.
113+
/// </summary>
114+
/// <param name="socketFactory">Socket factory method.</param>
115+
/// <param name="connectionTimeout">Timeout in milliseconds.</param>
116+
/// <param name="endpoint">Represents a TCP-addressable AMQP peer: a host name and port number.</param>
117+
IFrameHandler CreateFrameHandler(
118+
AmqpTcpEndpoint endpoint,
119+
#if !NETFX_CORE
120+
Func<AddressFamily, ITcpClient> socketFactory,
121+
#else
122+
Func<StreamSocket> socketFactory,
123+
#endif
124+
int connectionTimeout,
125+
int readTimeout,
126+
int writeTimeout);
127127
/// <summary>
128128
/// Construct a protocol model atop a given session.
129129
/// </summary>

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,12 @@ public class AutorecoveringConnection : IConnection, IRecoverable
5656
protected Connection m_delegate;
5757
protected ConnectionFactory m_factory;
5858

59+
//retained for compatibility
60+
protected IList<string> hostnames;
5961
// list of endpoints provided on initial connection.
6062
// on re-connection, the next host in the line is chosen using
6163
// IHostnameSelector
62-
private IEndpointSelector endpoints;
64+
private IList<AmqpTcpEndpoint> endpoints;
6365

6466
public readonly object m_recordedEntitiesLock = new object();
6567
protected readonly TaskFactory recoveryTaskFactory = new TaskFactory();
@@ -536,7 +538,7 @@ public void RecordQueue(string name, RecordedQueue q)
536538

537539
public override string ToString()
538540
{
539-
return string.Format("AutorecoveringConnection({0},{1},{2})", m_delegate.Id, Endpoint, GetHashCode());
541+
return string.Format("AutorecoveringConnection({0},{1},{2})", m_delegate.m_id, Endpoint, GetHashCode());
540542
}
541543

542544
public void UnregisterModel(AutorecoveringModel model)
@@ -549,25 +551,51 @@ public void UnregisterModel(AutorecoveringModel model)
549551

550552
public void Init()
551553
{
552-
this.Init(new DefaultEndpointSelector());
554+
this.Init(m_factory.HostName);
553555
}
554556

555557
public void Init(IList<string> hostnames)
556558
{
557-
this.Init(new DefaultEndpointSelector(hostnames.Select(m_factory.Endpoint.CloneWithHostname).ToList()));
559+
this.Init(hostnames.Select(m_factory.Endpoint.CloneWithHostname).ToList());
558560
}
559561

560-
public void Init(IEndpointSelector endpoints)
562+
public void Init(IList<AmqpTcpEndpoint> endpoints)
561563
{
562564
this.endpoints = endpoints;
563-
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
564-
this.Init(fh);
565+
this.hostnames = endpoints.Select(ep => ep.HostName).ToList();
566+
AmqpTcpEndpoint reachableEndpoint = null;
567+
IFrameHandler fh = null;
568+
Exception e = null;
569+
foreach (var ep in endpoints)
570+
{
571+
try
572+
{
573+
fh = m_factory.CreateFrameHandler(ep);
574+
reachableEndpoint = ep;
575+
}
576+
catch (Exception caught)
577+
{
578+
e = caught;
579+
}
580+
}
581+
if (reachableEndpoint == null)
582+
{
583+
throw e;
584+
}
585+
586+
this.Init(reachableEndpoint);
587+
}
588+
589+
protected void Init(string hostname)
590+
{
591+
this.Init(m_factory.Endpoint.CloneWithHostname(hostname));
565592
}
566593

567-
private void Init(IFrameHandler fh)
594+
private void Init(AmqpTcpEndpoint endpoint)
568595
{
569596
m_delegate = new Connection(m_factory, false,
570-
fh, this.ClientProvidedName);
597+
m_factory.CreateFrameHandler(endpoint),
598+
this.ClientProvidedName);
571599

572600
AutorecoveringConnection self = this;
573601
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
@@ -782,7 +810,9 @@ protected void RecoverConnectionDelegate()
782810
{
783811
try
784812
{
785-
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
813+
var nextHostname = m_factory.HostnameSelector.NextFrom(this.hostnames);
814+
var endpoint = this.endpoints.First((e) => e.HostName == nextHostname);
815+
var fh = m_factory.CreateFrameHandler(endpoint);
786816
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
787817
recovering = false;
788818
}

0 commit comments

Comments
 (0)