Skip to content

Commit 5cb449e

Browse files
committed
Change IHostnameSelector to IEndpointSelector
Hide some state in Connection
1 parent 7dec998 commit 5cb449e

File tree

8 files changed

+176
-134
lines changed

8 files changed

+176
-134
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,6 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
157157
/// </summary>
158158
public bool AutomaticRecoveryEnabled { get; set; }
159159

160-
/// <summary>
161-
/// Used to select next hostname to try when performing
162-
/// connection recovery (re-connecting). Is not used for
163-
/// non-recovering connections.
164-
/// </summary>
165-
public IHostnameSelector HostnameSelector { get; set; } = new RandomHostnameSelector();
166-
167160
/// <summary>The host to connect to.</summary>
168161
public string HostName { get; set; } = "localhost";
169162

@@ -399,7 +392,7 @@ public IConnection CreateConnection(IList<string> hostnames)
399392
/// </exception>
400393
public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName)
401394
{
402-
return CreateConnection(hostnames.Select(Endpoint.CloneWithHostname).ToList(), clientProvidedName);
395+
return CreateConnection(new DefaultEndpointSelector(hostnames.Select(Endpoint.CloneWithHostname).ToList()), clientProvidedName);
403396
}
404397

405398
/// <summary>
@@ -417,7 +410,7 @@ public IConnection CreateConnection(IList<string> hostnames, String clientProvid
417410
/// </exception>
418411
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
419412
{
420-
return CreateConnection(endpoints, null);
413+
return CreateConnection(new DefaultEndpointSelector(endpoints), null);
421414
}
422415

423416
/// <summary>
@@ -439,32 +432,28 @@ public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
439432
/// <exception cref="BrokerUnreachableException">
440433
/// When no hostname was reachable.
441434
/// </exception>
442-
public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, String clientProvidedName)
435+
public IConnection CreateConnection(IEndpointSelector endpoints, String clientProvidedName)
443436
{
444-
var eps = endpoints.ToList();
445437
IConnection conn;
446438
try
447439
{
448440
if (AutomaticRecoveryEnabled)
449441
{
450442
var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);
451-
autorecoveringConnection.Init(eps);
443+
autorecoveringConnection.Init(endpoints);
452444
conn = autorecoveringConnection;
453445
}
454446
else
455447
{
456448
IProtocol protocol = Protocols.DefaultProtocol;
457-
//We can't make this more elegant without changing the contract of the IHostnameSelector
458-
//if there are endpoints with the same hostname but different ports the first match is selected
459-
var selectedHost = HostnameSelector.NextFrom(eps.Select(ep => ep.HostName).ToList());
460-
var selectedEndpoint = eps.First(ep => ep.HostName == selectedHost);
461-
conn = protocol.CreateConnection(this, false, CreateFrameHandler(selectedEndpoint), clientProvidedName);
449+
conn = protocol.CreateConnection(this, false, endpoints.SelectOne(this.CreateFrameHandler), clientProvidedName);
462450
}
463451
}
464452
catch (Exception e)
465453
{
466454
throw new BrokerUnreachableException(e);
467455
}
456+
468457
return conn;
469458
}
470459

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2016 Pivotal Software, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Linq;
43+
using System.Collections.Generic;
44+
45+
namespace RabbitMQ.Client
46+
{
47+
public class DefaultEndpointSelector : IEndpointSelector
48+
{
49+
private List<AmqpTcpEndpoint> endpoints = new List<AmqpTcpEndpoint> { new AmqpTcpEndpoint()};
50+
private AmqpTcpEndpoint current;
51+
52+
public DefaultEndpointSelector () {}
53+
public DefaultEndpointSelector (IEnumerable<AmqpTcpEndpoint> tcpEndpoints)
54+
{
55+
this.endpoints = tcpEndpoints.ToList();
56+
}
57+
public AmqpTcpEndpoint Current
58+
{
59+
get { return current; }
60+
}
61+
62+
public T SelectOne<T>(Func<AmqpTcpEndpoint, T> selector)
63+
{
64+
var t = default(T);
65+
Exception exception = null;
66+
AmqpTcpEndpoint selected = null;
67+
foreach(var ep in this.endpoints)
68+
{
69+
try
70+
{
71+
t = selector(ep);
72+
selected = ep;
73+
if (t != null)
74+
{
75+
return t;
76+
}
77+
}
78+
catch (Exception e)
79+
{
80+
exception = e;
81+
}
82+
}
83+
if(t.Equals(default(T)))
84+
{
85+
throw exception;
86+
}
87+
88+
this.current = selected;
89+
90+
return t;
91+
}
92+
}
93+
}

projects/client/RabbitMQ.Client/src/client/api/IHostnameSelector.cs renamed to projects/client/RabbitMQ.Client/src/client/api/IEndpointSelector.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,17 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Linq;
4243
using System.Collections.Generic;
44+
using RabbitMQ.Client.Impl;
45+
using RabbitMQ.Client.Framing.Impl;
4346

4447
namespace RabbitMQ.Client
4548
{
46-
public interface IHostnameSelector
49+
public interface IEndpointSelector
4750
{
48-
/// <summary>
49-
/// Picks a hostname from a list of options that should be used
50-
/// by <see cref="IConnectionFactory"/>.
51-
/// </summary>
52-
/// <param name="options"></param>
53-
/// <returns></returns>
54-
string NextFrom(IList<string> options);
51+
/// Selector can either throw or return null to indicate not found
52+
T SelectOne<T>(Func<AmqpTcpEndpoint, T> selector);
53+
AmqpTcpEndpoint Current { get; }
5554
}
56-
}
55+
}

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,22 @@ public interface IProtocol
108108
/// </summary>
109109
IConnection CreateConnection(ConnectionFactory factory, IFrameHandler frameHandler, bool automaticRecoveryEnabled, String clientProvidedName);
110110

111-
/// <summary>
112-
/// Construct a frame handler for a given endpoint.
113-
/// </summary>
114-
/// <param name="socketFactory">Socket factory method.</param>
115-
/// <param name="connectionTimeout">Timeout in milliseconds.</param>
116-
/// <param name="endpoint">Represents a TCP-addressable AMQP peer: a host name and port number.</param>
117-
IFrameHandler CreateFrameHandler(
118-
AmqpTcpEndpoint endpoint,
119-
#if !NETFX_CORE
120-
Func<AddressFamily, ITcpClient> socketFactory,
121-
#else
122-
Func<StreamSocket> socketFactory,
123-
#endif
124-
int connectionTimeout,
125-
int readTimeout,
126-
int writeTimeout);
111+
// /// <summary>
112+
// /// Construct a frame handler for a given endpoint.
113+
// /// </summary>
114+
// /// <param name="socketFactory">Socket factory method.</param>
115+
// /// <param name="connectionTimeout">Timeout in milliseconds.</param>
116+
// /// <param name="endpoint">Represents a TCP-addressable AMQP peer: a host name and port number.</param>
117+
// IFrameHandler CreateFrameHandler(
118+
// AmqpTcpEndpoint endpoint,
119+
// #if !NETFX_CORE
120+
// Func<AddressFamily, ITcpClient> socketFactory,
121+
// #else
122+
// Func<StreamSocket> socketFactory,
123+
// #endif
124+
// int connectionTimeout,
125+
// int readTimeout,
126+
// int writeTimeout);
127127
/// <summary>
128128
/// Construct a protocol model atop a given session.
129129
/// </summary>

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,10 @@ public class AutorecoveringConnection : IConnection, IRecoverable
5656
protected Connection m_delegate;
5757
protected ConnectionFactory m_factory;
5858

59-
//retained for compatibility
60-
protected IList<string> hostnames;
6159
// list of endpoints provided on initial connection.
6260
// on re-connection, the next host in the line is chosen using
6361
// IHostnameSelector
64-
private IList<AmqpTcpEndpoint> endpoints;
62+
private IEndpointSelector endpoints;
6563

6664
public readonly object m_recordedEntitiesLock = new object();
6765
protected readonly TaskFactory recoveryTaskFactory = new TaskFactory();
@@ -538,7 +536,7 @@ public void RecordQueue(string name, RecordedQueue q)
538536

539537
public override string ToString()
540538
{
541-
return string.Format("AutorecoveringConnection({0},{1},{2})", m_delegate.m_id, Endpoint, GetHashCode());
539+
return string.Format("AutorecoveringConnection({0},{1},{2})", m_delegate.Id, Endpoint, GetHashCode());
542540
}
543541

544542
public void UnregisterModel(AutorecoveringModel model)
@@ -551,51 +549,25 @@ public void UnregisterModel(AutorecoveringModel model)
551549

552550
public void Init()
553551
{
554-
this.Init(m_factory.HostName);
552+
this.Init(new DefaultEndpointSelector());
555553
}
556554

557555
public void Init(IList<string> hostnames)
558556
{
559-
this.Init(hostnames.Select(m_factory.Endpoint.CloneWithHostname).ToList());
557+
this.Init(new DefaultEndpointSelector(hostnames.Select(m_factory.Endpoint.CloneWithHostname).ToList()));
560558
}
561559

562-
public void Init(IList<AmqpTcpEndpoint> endpoints)
560+
public void Init(IEndpointSelector endpoints)
563561
{
564562
this.endpoints = endpoints;
565-
this.hostnames = endpoints.Select(ep => ep.HostName).ToList();
566-
AmqpTcpEndpoint reachableEndpoint = null;
567-
IFrameHandler fh = null;
568-
Exception e = null;
569-
foreach (var ep in endpoints)
570-
{
571-
try
572-
{
573-
fh = m_factory.CreateFrameHandler(ep);
574-
reachableEndpoint = ep;
575-
}
576-
catch (Exception caught)
577-
{
578-
e = caught;
579-
}
580-
}
581-
if (reachableEndpoint == null)
582-
{
583-
throw e;
584-
}
585-
586-
this.Init(reachableEndpoint);
587-
}
588-
589-
protected void Init(string hostname)
590-
{
591-
this.Init(m_factory.Endpoint.CloneWithHostname(hostname));
563+
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
564+
this.Init(fh);
592565
}
593566

594-
private void Init(AmqpTcpEndpoint endpoint)
567+
private void Init(IFrameHandler fh)
595568
{
596569
m_delegate = new Connection(m_factory, false,
597-
m_factory.CreateFrameHandler(endpoint),
598-
this.ClientProvidedName);
570+
fh, this.ClientProvidedName);
599571

600572
AutorecoveringConnection self = this;
601573
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
@@ -810,9 +782,7 @@ protected void RecoverConnectionDelegate()
810782
{
811783
try
812784
{
813-
var nextHostname = m_factory.HostnameSelector.NextFrom(this.hostnames);
814-
var endpoint = this.endpoints.First((e) => e.HostName == nextHostname);
815-
var fh = m_factory.CreateFrameHandler(endpoint);
785+
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
816786
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
817787
recovering = false;
818788
}

0 commit comments

Comments
 (0)