Skip to content

Commit 07e53bb

Browse files
Introduce ConnectionFactory methods that support host lists
1 parent 00ac512 commit 07e53bb

File tree

10 files changed

+234
-43
lines changed

10 files changed

+234
-43
lines changed

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
<Compile Include="src\client\api\IConnection.cs" />
140140
<Compile Include="src\client\api\IConnectionFactory.cs" />
141141
<Compile Include="src\client\api\IContentHeader.cs" />
142+
<Compile Include="src\client\api\IHostnameSelector.cs" />
142143
<Compile Include="src\client\api\IMethod.cs" />
143144
<Compile Include="src\client\api\IModel.cs" />
144145
<Compile Include="src\client\api\IProtocol.cs" />
@@ -216,6 +217,7 @@
216217
<Compile Include="src\client\impl\ContentHeaderBase.cs" />
217218
<Compile Include="src\client\impl\ContentHeaderPropertyReader.cs" />
218219
<Compile Include="src\client\impl\ContentHeaderPropertyWriter.cs" />
220+
<Compile Include="src\client\impl\ExtensionMethods.cs" />
219221
<Compile Include="src\client\impl\Frame.cs" />
220222
<Compile Include="src\client\impl\HardProtocolException.cs" />
221223
<Compile Include="src\client\impl\IConsumerDispatcher.cs" />
@@ -232,6 +234,7 @@
232234
<Compile Include="src\client\impl\ProtocolBase.cs" />
233235
<Compile Include="src\client\impl\ProtocolException.cs" />
234236
<Compile Include="src\client\impl\QuiescingSession.cs" />
237+
<Compile Include="src\client\impl\RandomHostnameSelector.cs" />
235238
<Compile Include="src\client\impl\RecordedBinding.cs" />
236239
<Compile Include="src\client\impl\RecordedConsumer.cs" />
237240
<Compile Include="src\client\impl\RecordedEntity.cs" />

projects/client/RabbitMQ.Client/src/client/api/AmqpTcpEndpoint.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ namespace RabbitMQ.Client
5555
/// the Uri "Scheme" property is ignored: only the "Host" and
5656
/// "Port" properties are extracted.
5757
/// </para>
58-
public class AmqpTcpEndpoint
58+
public class AmqpTcpEndpoint : ICloneable
5959
{
6060
/// <summary>
6161
/// Default Amqp ssl port.
@@ -119,6 +119,25 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
119119
{
120120
}
121121

122+
/// <summary>
123+
/// Clones the endpoint.
124+
/// </summary>
125+
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
126+
public object Clone()
127+
{
128+
return new AmqpTcpEndpoint(HostName, _port, Ssl);
129+
}
130+
131+
/// <summary>
132+
/// Clones the endpoint using the provided hostname.
133+
/// </summary>
134+
/// <param name="hostname">Hostname to use</param>
135+
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
136+
public AmqpTcpEndpoint CloneWithHostname(string hostname)
137+
{
138+
return new AmqpTcpEndpoint(hostname, _port, Ssl);
139+
}
140+
122141
/// <summary>
123142
/// Retrieve or set the hostname of this <see cref="AmqpTcpEndpoint"/>.
124143
/// </summary>

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
157157
/// </summary>
158158
public bool AutomaticRecoveryEnabled;
159159

160+
/// <summary>
161+
/// Used to select next hostname to try when performing
162+
/// connection recovery (re-connecting). Is not used for
163+
/// non-recovering connections.
164+
/// </summary>
165+
public IHostnameSelector HostnameSelector = new RandomHostnameSelector();
166+
160167
/// <summary>The host to connect to.</summary>
161168
public String HostName = "localhost";
162169

@@ -305,33 +312,41 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
305312
/// </summary>
306313
public virtual IConnection CreateConnection()
307314
{
308-
IConnection connection;
315+
return CreateConnection(new List<string>() { HostName });
316+
}
317+
318+
public IConnection CreateConnection(IList<string> hostnames)
319+
{
320+
IConnection conn;
309321
try
310322
{
311323
if (AutomaticRecoveryEnabled)
312324
{
313325
var autorecoveringConnection = new AutorecoveringConnection(this);
314-
autorecoveringConnection.init();
315-
connection = autorecoveringConnection;
326+
autorecoveringConnection.Init(hostnames);
327+
conn = autorecoveringConnection;
316328
}
317329
else
318330
{
319331
IProtocol protocol = Protocols.DefaultProtocol;
320-
connection = protocol.CreateConnection(this, false, CreateFrameHandler());
332+
conn = protocol.CreateConnection(this, false, CreateFrameHandler());
321333
}
322334
}
323335
catch (Exception e)
324336
{
325337
throw new BrokerUnreachableException(e);
326338
}
327-
328-
return connection;
339+
return conn;
329340
}
330341

331342
public IFrameHandler CreateFrameHandler()
332343
{
333-
IProtocol protocol = Protocols.DefaultProtocol;
334-
return protocol.CreateFrameHandler(Endpoint, SocketFactory, RequestedConnectionTimeout);
344+
return Protocols.DefaultProtocol.CreateFrameHandler(Endpoint, SocketFactory, RequestedConnectionTimeout);
345+
}
346+
347+
public IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
348+
{
349+
return Protocols.DefaultProtocol.CreateFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout);
335350
}
336351

337352
private void SetUri(Uri uri)

projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ public interface IConnectionFactory
9797
/// </summary>
9898
IConnection CreateConnection();
9999

100+
/// <summary>
101+
/// Connects to the first reachable hostname from the list.
102+
/// </summary>
103+
/// <param name="hostnames">List of host names to use</param>
104+
/// <returns></returns>
105+
IConnection CreateConnection(IList<string> hostnames);
106+
100107
/// <summary>
101108
/// Advanced option.
102109
///
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2014 GoPivotal, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is GoPivotal, Inc.
38+
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Collections.Generic;
43+
44+
namespace RabbitMQ.Client
45+
{
46+
public interface IHostnameSelector
47+
{
48+
/// <summary>
49+
/// Picks a hostname from a list of options that should be used
50+
/// by <see cref="IConnectionFactory"/>.
51+
/// </summary>
52+
/// <param name="options"></param>
53+
/// <returns></returns>
54+
string NextFrom(IList<string> options);
55+
}
56+
}

projects/client/RabbitMQ.Client/src/client/api/NetworkConnection.cs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,11 @@ namespace RabbitMQ.Client
4747
/// </summary>
4848
public interface NetworkConnection
4949
{
50-
#if !NETFX_CORE
51-
/// <summary>
52-
/// Identifies local network address.
53-
/// </summary>
54-
EndPoint LocalEndPoint { get; }
55-
#endif
56-
5750
/// <summary>
5851
/// Local port.
5952
/// </summary>
6053
int LocalPort { get; }
6154

62-
#if !NETFX_CORE
63-
/// <summary>
64-
/// Identifies remote network address.
65-
/// </summary>
66-
EndPoint RemoteEndPoint { get; }
67-
#endif
68-
6955
/// <summary>
7056
/// Remote port.
7157
/// </summary>

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public class AutorecoveringConnection : IConnection, IRecoverable
5656
protected Connection m_delegate;
5757
protected ConnectionFactory m_factory;
5858

59+
// list of hostnames provided on initial connection.
60+
// on re-connection, the next host in the line is chosen using
61+
// IHostnameSelector
62+
protected IList<string> hostnames;
63+
5964
public readonly object m_recordedEntitiesLock = new object();
6065
protected readonly TaskFactory recoveryTaskFactory = new TaskFactory();
6166
protected readonly object recoveryLockTarget = new object();
@@ -276,13 +281,6 @@ public AmqpTcpEndpoint[] KnownHosts
276281
set { m_delegate.KnownHosts = value; }
277282
}
278283

279-
#if !NETFX_CORE
280-
public EndPoint LocalEndPoint
281-
{
282-
get { return m_delegate.LocalEndPoint; }
283-
}
284-
#endif
285-
286284
public int LocalPort
287285
{
288286
get { return m_delegate.LocalPort; }
@@ -303,13 +301,6 @@ public IDictionary<string, RecordedQueue> RecordedQueues
303301
get { return m_recordedQueues; }
304302
}
305303

306-
#if !NETFX_CORE
307-
public EndPoint RemoteEndPoint
308-
{
309-
get { return m_delegate.RemoteEndPoint; }
310-
}
311-
#endif
312-
313304
public int RemotePort
314305
{
315306
get { return m_delegate.RemotePort; }
@@ -547,7 +538,36 @@ public void UnregisterModel(AutorecoveringModel model)
547538
}
548539
}
549540

550-
public void init()
541+
public void Init()
542+
{
543+
this.Init(m_factory.HostName);
544+
}
545+
546+
public void Init(IList<string> hostnames)
547+
{
548+
this.hostnames = hostnames;
549+
string reachableHostname = null;
550+
IFrameHandler fh = null;
551+
Exception e = null;
552+
foreach (var h in hostnames)
553+
{
554+
try
555+
{
556+
fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(h));
557+
reachableHostname = h;
558+
} catch (Exception caught)
559+
{
560+
e = caught;
561+
}
562+
}
563+
if (reachableHostname == null)
564+
{
565+
throw e;
566+
}
567+
this.Init(reachableHostname);
568+
}
569+
570+
protected void Init(string hostname)
551571
{
552572
m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
553573

@@ -570,7 +590,7 @@ public void init()
570590
#else
571591
Console.WriteLine(
572592
#endif
573-
"BeginAutomaticRecovery() failed: {0}", e);
593+
"BeginAutomaticRecovery() failed: {0}", e);
574594
}
575595
}
576596
}
@@ -750,12 +770,13 @@ protected void RecoverConnectionDelegate()
750770
{
751771
try
752772
{
753-
m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
773+
var nextHostname = m_factory.HostnameSelector.NextFrom(this.hostnames);
774+
var fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(nextHostname));
775+
m_delegate = new Connection(m_factory, false, fh);
754776
recovering = false;
755777
}
756-
catch (Exception)
778+
catch (Exception e)
757779
{
758-
// TODO: exponential back-off
759780
#if NETFX_CORE
760781
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
761782
#else
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2014 GoPivotal, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is GoPivotal, Inc.
38+
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Collections.Generic;
43+
using System.Linq;
44+
45+
namespace RabbitMQ.Client.Impl
46+
{
47+
public static class ExtensionMethods
48+
{
49+
/// <summary>
50+
/// Returns a random item from the list.
51+
/// </summary>
52+
/// <typeparam name="T">Element item type</typeparam>
53+
/// <param name="list">Input list</param>
54+
/// <returns></returns>
55+
public static T RandomItem<T>(this IList<T> list)
56+
{
57+
System.Diagnostics.Debug.WriteLine("List: {0}", list);
58+
var n = list.Count;
59+
if (n == 0)
60+
{
61+
return default(T);
62+
}
63+
64+
var hashCode = Math.Abs(Guid.NewGuid().GetHashCode());
65+
return list.ElementAt<T>(hashCode % n);
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)