Skip to content

Commit 1e02319

Browse files
Merge branch 'rabbitmq-dotnet-client-195'
2 parents d9333f0 + e6738d0 commit 1e02319

File tree

10 files changed

+296
-158
lines changed

10 files changed

+296
-158
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,6 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
157157
/// </summary>
158158
public bool AutomaticRecoveryEnabled { get; set; }
159159

160-
/// <summary>
161-
/// Used to select next hostname to try when performing
162-
/// connection recovery (re-connecting). Is not used for
163-
/// non-recovering connections.
164-
/// </summary>
165-
public IHostnameSelector HostnameSelector { get; set; } = new RandomHostnameSelector();
166-
167160
/// <summary>The host to connect to.</summary>
168161
public string HostName { get; set; } = "localhost";
169162

@@ -194,6 +187,17 @@ public TimeSpan ContinuationTimeout
194187
set { m_continuationTimeout = value; }
195188
}
196189

190+
/// <summary>
191+
/// Factory function for creating the <see cref="IEndpointResolver">
192+
/// used to generate a list of endpoints for the ConnectionFactory
193+
/// to try in order.
194+
/// The default value creates an instance of the <see cref="DefaultEndpointResolver">
195+
/// using the list of endpoints passed in. The DefaultEndpointResolver shuffles the
196+
/// provided list each time it is requested.
197+
/// </summary>
198+
public Func<IEnumerable<AmqpTcpEndpoint>, IEndpointResolver> EndpointResolverFactory { get; set; } =
199+
endpoints => new DefaultEndpointResolver(endpoints);
200+
197201
/// <summary>
198202
/// The port to connect on. <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
199203
/// indicates the default for the protocol should be used.
@@ -258,6 +262,7 @@ public AmqpTcpEndpoint Endpoint
258262
}
259263
}
260264

265+
261266
/// <summary>
262267
/// Set connection parameters using the amqp or amqps scheme.
263268
/// </summary>
@@ -333,18 +338,22 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
333338
}
334339

335340
/// <summary>
336-
/// Create a connection to the specified endpoint.
341+
/// Create a connection to one of the endpoints provided by the IEndpointResolver
342+
/// returned by the EndpointResolverFactory. By default the configured
343+
/// hostname and port are used.
337344
/// </summary>
338345
/// <exception cref="BrokerUnreachableException">
339346
/// When the configured hostname was not reachable.
340347
/// </exception>
341348
public virtual IConnection CreateConnection()
342349
{
343-
return CreateConnection(new List<string> { HostName }, null);
350+
return CreateConnection(this.EndpointResolverFactory(LocalEndpoints()), null);
344351
}
345352

346353
/// <summary>
347-
/// Create a connection to the specified endpoint.
354+
/// Create a connection to one of the endpoints provided by the IEndpointResolver
355+
/// returned by the EndpointResolverFactory. By default the configured
356+
/// hostname and port are used.
348357
/// </summary>
349358
/// <param name="clientProvidedName">
350359
/// Application-specific connection name, will be displayed in the management UI
@@ -357,13 +366,14 @@ public virtual IConnection CreateConnection()
357366
/// </exception>
358367
public IConnection CreateConnection(String clientProvidedName)
359368
{
360-
return CreateConnection(new List<string> { HostName }, clientProvidedName);
369+
return CreateConnection(EndpointResolverFactory(LocalEndpoints()), clientProvidedName);
361370
}
362371

363372
/// <summary>
364-
/// Create a connection using a list of hostnames. The first reachable
365-
/// hostname will be used initially. Subsequent hostname picks are determined
366-
/// by the <see cref="IHostnameSelector" /> configured.
373+
/// Create a connection using a list of hostnames using the configured port.
374+
/// By default each hostname is tried in a random order until a successful connection is
375+
/// found or the list is exhausted using the DefaultEndpointResolver.
376+
/// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
367377
/// </summary>
368378
/// <param name="hostnames">
369379
/// List of hostnames to use for the initial
@@ -379,9 +389,10 @@ public IConnection CreateConnection(IList<string> hostnames)
379389
}
380390

381391
/// <summary>
382-
/// Create a connection using a list of hostnames. The first reachable
383-
/// hostname will be used initially. Subsequent hostname picks are determined
384-
/// by the <see cref="IHostnameSelector" /> configured.
392+
/// Create a connection using a list of hostnames using the configured port.
393+
/// By default each endpoint is tried in a random order until a successful connection is
394+
/// found or the list is exhausted.
395+
/// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
385396
/// </summary>
386397
/// <param name="hostnames">
387398
/// List of hostnames to use for the initial
@@ -399,13 +410,14 @@ public IConnection CreateConnection(IList<string> hostnames)
399410
/// </exception>
400411
public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
401412
{
402-
return CreateConnection(hostnames.Select(Endpoint.CloneWithHostname).ToList(), clientProvidedName);
413+
var endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, this.Port, this.Ssl));
414+
return CreateConnection(new DefaultEndpointResolver(endpoints), clientProvidedName);
403415
}
404416

405417
/// <summary>
406-
/// Create a connection using a list of hostnames. The first reachable
407-
/// hostname will be used initially. Subsequent hostname picks are determined
408-
/// by the <see cref="IHostnameSelector" /> configured.
418+
/// Create a connection using a list of endpoints. By default each endpoint will be tried
419+
/// in a random order until a successful connection is found or the list is exhausted.
420+
/// The selection behaviour can be overriden by configuring the EndpointResolverFactory.
409421
/// </summary>
410422
/// <param name="endpoints">
411423
/// List of endpoints to use for the initial
@@ -417,17 +429,14 @@ public IConnection CreateConnection(IList<string> hostnames, String clientProvid
417429
/// </exception>
418430
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
419431
{
420-
return CreateConnection(endpoints, null);
432+
return CreateConnection(new DefaultEndpointResolver(endpoints), null);
421433
}
422434

423435
/// <summary>
424-
/// Create a connection using a list of hostnames. The first reachable
425-
/// hostname will be used initially. Subsequent hostname picks are determined
426-
/// by the <see cref="IHostnameSelector" /> configured.
436+
/// Create a connection using an IEndpointResolver.
427437
/// </summary>
428-
/// <param name="endpoints">
429-
/// List of endpoints to use for the initial
430-
/// connection and recovery.
438+
/// <param name="endpointResolver">
439+
/// The endpointResolver that returns the endpoints to use for the connection attempt.
431440
/// </param>
432441
/// <param name="clientProvidedName">
433442
/// Application-specific connection name, will be displayed in the management UI
@@ -439,32 +448,28 @@ public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
439448
/// <exception cref="BrokerUnreachableException">
440449
/// When no hostname was reachable.
441450
/// </exception>
442-
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, String clientProvidedName)
451+
public IConnection CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
443452
{
444-
var eps = endpoints.ToList();
445453
IConnection conn;
446454
try
447455
{
448456
if (AutomaticRecoveryEnabled)
449457
{
450458
var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
451-
autorecoveringConnection.Init(eps);
459+
autorecoveringConnection.Init(endpointResolver);
452460
conn = autorecoveringConnection;
453461
}
454462
else
455463
{
456464
IProtocol protocol = Protocols.DefaultProtocol;
457-
//We can't make this more elegant without changing the contract of the IHostnameSelector
458-
//if there are endpoints with the same hostname but different ports the first match is selected
459-
var selectedHost = HostnameSelector.NextFrom(eps.Select(ep => ep.HostName).ToList());
460-
var selectedEndpoint = eps.First(ep => ep.HostName == selectedHost);
461-
conn = protocol.CreateConnection(this, false, CreateFrameHandler(selectedEndpoint), clientProvidedName);
465+
conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName);
462466
}
463467
}
464468
catch (Exception e)
465469
{
466470
throw new BrokerUnreachableException(e);
467471
}
472+
468473
return conn;
469474
}
470475

@@ -566,5 +571,10 @@ private static string UriDecode(string uri)
566571
{
567572
return System.Uri.UnescapeDataString(uri.Replace("+", "%2B"));
568573
}
574+
575+
private List<AmqpTcpEndpoint> LocalEndpoints ()
576+
{
577+
return new List<AmqpTcpEndpoint> { this.Endpoint };
578+
}
569579
}
570580
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2016 Pivotal Software, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Linq;
43+
using System.Collections.Generic;
44+
45+
namespace RabbitMQ.Client
46+
{
47+
public class DefaultEndpointResolver : IEndpointResolver
48+
{
49+
private List<AmqpTcpEndpoint> endpoints;
50+
private Random rnd = new Random();
51+
52+
public DefaultEndpointResolver (IEnumerable<AmqpTcpEndpoint> tcpEndpoints)
53+
{
54+
this.endpoints = tcpEndpoints.ToList();
55+
}
56+
57+
public IEnumerable<AmqpTcpEndpoint> All()
58+
{
59+
return endpoints.OrderBy(item => rnd.Next());
60+
}
61+
}
62+
}

projects/client/RabbitMQ.Client/src/client/api/IHostnameSelector.cs renamed to projects/client/RabbitMQ.Client/src/client/api/IEndpointResolver.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,15 @@
3838
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41-
using System;
4241
using System.Collections.Generic;
4342

4443
namespace RabbitMQ.Client
4544
{
46-
public interface IHostnameSelector
45+
public interface IEndpointResolver
4746
{
4847
/// <summary>
49-
/// Picks a hostname from a list of options that should be used
50-
/// by <see cref="IConnectionFactory"/>.
48+
/// Return all AmqpTcpEndpoints in the order they should be tried.
5149
/// </summary>
52-
/// <param name="options"></param>
53-
/// <returns></returns>
54-
string NextFrom(IList<string> options);
50+
IEnumerable<AmqpTcpEndpoint> All();
5551
}
56-
}
52+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2016 Pivotal Software, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Linq;
43+
using System.Collections.Generic;
44+
45+
namespace RabbitMQ.Client
46+
{
47+
public static class EndpointResolverExtensions
48+
{
49+
public static T SelectOne<T>(this IEndpointResolver resolver, Func<AmqpTcpEndpoint, T> selector)
50+
{
51+
var t = default(T);
52+
Exception exception = null;
53+
foreach(var ep in resolver.All())
54+
{
55+
try
56+
{
57+
t = selector(ep);
58+
if(t.Equals(default(T)) == false)
59+
{
60+
return t;
61+
}
62+
}
63+
catch (Exception e)
64+
{
65+
exception = e;
66+
}
67+
}
68+
if(t.Equals(default(T)))
69+
{
70+
throw exception;
71+
}
72+
73+
return t;
74+
}
75+
}
76+
}

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,6 @@ public interface IProtocol
108108
/// </summary>
109109
IConnection CreateConnection(ConnectionFactory factory, IFrameHandler frameHandler, bool automaticRecoveryEnabled, String clientProvidedName);
110110

111-
/// <summary>
112-
/// Construct a frame handler for a given endpoint.
113-
/// </summary>
114-
/// <param name="socketFactory">Socket factory method.</param>
115-
/// <param name="connectionTimeout">Timeout in milliseconds.</param>
116-
/// <param name="endpoint">Represents a TCP-addressable AMQP peer: a host name and port number.</param>
117-
IFrameHandler CreateFrameHandler(
118-
AmqpTcpEndpoint endpoint,
119-
#if !NETFX_CORE
120-
Func<AddressFamily, ITcpClient> socketFactory,
121-
#else
122-
Func<StreamSocket> socketFactory,
123-
#endif
124-
int connectionTimeout,
125-
int readTimeout,
126-
int writeTimeout);
127111
/// <summary>
128112
/// Construct a protocol model atop a given session.
129113
/// </summary>

0 commit comments

Comments
 (0)