Skip to content

Commit 7d6e31e

Browse files
author
Daniil Fedotov
committed
Support client specified connection name
1 parent 11dba57 commit 7d6e31e

File tree

7 files changed

+102
-11
lines changed

7 files changed

+102
-11
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,19 @@ public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
348348
/// </exception>
349349
public virtual IConnection CreateConnection()
350350
{
351-
return CreateConnection(new List<string>() { HostName });
351+
return CreateConnection(new List<string>() { HostName }, null);
352+
}
353+
354+
/// <summary>
355+
/// Create a connection to the specified endpoint.
356+
/// </summary>
357+
/// <param name="connectionName">Connection name client property</param>
358+
/// <exception cref="BrokerUnreachableException">
359+
/// When the configured hostname was not reachable.
360+
/// </exception>
361+
public IConnection CreateConnection(String connectionName)
362+
{
363+
return CreateConnection(new List<string>() { HostName }, connectionName);
352364
}
353365

354366
/// <summary>
@@ -365,20 +377,39 @@ public virtual IConnection CreateConnection()
365377
/// When no hostname was reachable.
366378
/// </exception>
367379
public IConnection CreateConnection(IList<string> hostnames)
380+
{
381+
return CreateConnection(hostnames, null);
382+
}
383+
384+
/// <summary>
385+
/// Create a connection using a list of hostnames. The first reachable
386+
/// hostname will be used initially. Subsequent hostname picks are determined
387+
/// by the <see cref="IHostnameSelector" /> configured.
388+
/// </summary>
389+
/// <param name="hostnames">
390+
/// List of hostnames to use for the initial
391+
/// connection and recovery.
392+
/// </param>
393+
/// <param name="connectionName">Connection name client property</param>
394+
/// <returns>Open connection</returns>
395+
/// <exception cref="BrokerUnreachableException">
396+
/// When no hostname was reachable.
397+
/// </exception>
398+
public IConnection CreateConnection(IList<string> hostnames, String connectionName)
368399
{
369400
IConnection conn;
370401
try
371402
{
372403
if (AutomaticRecoveryEnabled)
373404
{
374-
var autorecoveringConnection = new AutorecoveringConnection(this);
405+
var autorecoveringConnection = new AutorecoveringConnection(this, connectionName);
375406
autorecoveringConnection.Init(hostnames);
376407
conn = autorecoveringConnection;
377408
}
378409
else
379410
{
380411
IProtocol protocol = Protocols.DefaultProtocol;
381-
conn = protocol.CreateConnection(this, false, CreateFrameHandler());
412+
conn = protocol.CreateConnection(this, false, CreateFrameHandler(), connectionName);
382413
}
383414
}
384415
catch (Exception e)

projects/client/RabbitMQ.Client/src/client/api/IConnection.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ public interface IConnection : NetworkConnection, IDisposable
152152
/// </summary>
153153
IList<ShutdownReportEntry> ShutdownReport { get; }
154154

155+
/// <summary>
156+
/// Returns connection name from client properties
157+
/// </summary>
158+
String ConnectionName { get; }
159+
155160
/// <summary>
156161
/// Signalled when an exception occurs in a callback invoked by the connection.
157162
/// </summary>

projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,31 @@ public interface IConnectionFactory
9797
/// </summary>
9898
IConnection CreateConnection();
9999

100+
/// <summary>
101+
/// Create a connection to the specified endpoint.
102+
/// </summary>
103+
/// <param name="connectionName">Connection name client property</param>
104+
/// <returns></returns>
105+
IConnection CreateConnection(String connectionName);
106+
100107
/// <summary>
101108
/// Connects to the first reachable hostname from the list.
102109
/// </summary>
103110
/// <param name="hostnames">List of host names to use</param>
104111
/// <returns></returns>
105112
IConnection CreateConnection(IList<string> hostnames);
106113

114+
/// <summary>
115+
/// Connects to the first reachable hostname from the list.
116+
/// </summary>
117+
/// <param name="hostnames">List of host names to use</param>
118+
/// <param name="connectionName">Connection name client property</param>
119+
/// <returns></returns>
120+
IConnection CreateConnection(IList<string> hostnames, String connectionName);
121+
107122
/// <summary>
108123
/// Advanced option.
109-
///
124+
///
110125
/// What task scheduler should consumer dispatcher use.
111126
/// </summary>
112127
TaskScheduler TaskScheduler { get; set; }

projects/client/RabbitMQ.Client/src/client/api/IProtocol.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,19 @@ public interface IProtocol
9595
/// </summary>
9696
IConnection CreateConnection(ConnectionFactory factory, IFrameHandler frameHandler, bool automaticRecoveryEnabled);
9797

98+
/// <summary>
99+
/// Construct a connection from a given set of parameters,
100+
/// a frame handler, and no automatic recovery.
101+
/// The "insist" parameter is passed on to the AMQP connection.open method.
102+
/// </summary>
103+
IConnection CreateConnection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, String connectionName);
104+
105+
/// <summary>
106+
/// Construct a connection from a given set of parameters,
107+
/// a frame handler, and automatic recovery settings.
108+
/// </summary>
109+
IConnection CreateConnection(ConnectionFactory factory, IFrameHandler frameHandler, bool automaticRecoveryEnabled, String connectionName);
110+
98111
/// <summary>
99112
/// Construct a frame handler for a given endpoint.
100113
/// </summary>

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,10 @@ public class AutorecoveringConnection : IConnection, IRecoverable
9494
private EventHandler<QueueNameChangedAfterRecoveryEventArgs> m_queueNameChange;
9595
private EventHandler<EventArgs> m_recovery;
9696

97-
public AutorecoveringConnection(ConnectionFactory factory)
97+
public AutorecoveringConnection(ConnectionFactory factory, String connectionName = null)
9898
{
9999
m_factory = factory;
100+
ConnectionName = connectionName;
100101
}
101102

102103
public event EventHandler<CallbackExceptionEventArgs> CallbackException
@@ -231,6 +232,8 @@ public event EventHandler<EventArgs> Recovery
231232
}
232233
}
233234

235+
public String ConnectionName { get; private set; }
236+
234237
public bool AutoClose
235238
{
236239
get { return m_delegate.AutoClose; }
@@ -575,7 +578,8 @@ public void Init(IList<string> hostnames)
575578
protected void Init(string hostname)
576579
{
577580
m_delegate = new Connection(m_factory, false,
578-
m_factory.CreateFrameHandlerForHostname(hostname));
581+
m_factory.CreateFrameHandlerForHostname(hostname),
582+
ConnectionName);
579583

580584
AutorecoveringConnection self = this;
581585
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
@@ -792,7 +796,7 @@ protected void RecoverConnectionDelegate()
792796
{
793797
var nextHostname = m_factory.HostnameSelector.NextFrom(this.hostnames);
794798
var fh = m_factory.CreateFrameHandler(m_factory.Endpoint.CloneWithHostname(nextHostname));
795-
m_delegate = new Connection(m_factory, false, fh);
799+
m_delegate = new Connection(m_factory, false, fh, ConnectionName);
796800
recovering = false;
797801
}
798802
catch (Exception e)

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ public class Connection : IConnection
113113

114114
public ConsumerWorkService ConsumerWorkService { get; private set; }
115115

116-
public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler)
116+
public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, String connectionName = null)
117117
{
118+
ConnectionName = connectionName;
118119
KnownHosts = null;
119120
FrameMax = 0;
120121
m_factory = factory;
@@ -225,6 +226,8 @@ public event EventHandler<EventArgs> ConnectionUnblocked
225226
}
226227
}
227228

229+
public String ConnectionName { get; private set; }
230+
228231
public bool AutoClose
229232
{
230233
get { return m_sessionManager.AutoClose; }
@@ -995,7 +998,7 @@ public void MaybeStartHeartbeatTimers()
995998
_heartbeatWriteTimer = new Timer(HeartbeatWriteTimerCallback);
996999
_heartbeatReadTimer = new Timer(HeartbeatReadTimerCallback);
9971000
#if NETFX_CORE
998-
_heartbeatWriteTimer.Change(200, m_heartbeatTimeSpan.Milliseconds);
1001+
_heartbeatWriteTimer.Change(200, m_heartbeatTimeSpan.Milliseconds);
9991002
_heartbeatReadTimer.Change(200, m_heartbeatTimeSpan.Milliseconds);
10001003
#else
10011004
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(200), m_heartbeatTimeSpan);
@@ -1274,6 +1277,7 @@ protected void StartAndTune()
12741277

12751278
m_clientProperties = new Dictionary<string, object>(m_factory.ClientProperties);
12761279
m_clientProperties["capabilities"] = Protocol.Capabilities;
1280+
m_clientProperties["connection_name"] = ConnectionName;
12771281

12781282
// FIXME: parse out locales properly!
12791283
ConnectionTuneDetails connectionTune = default(ConnectionTuneDetails);

projects/client/RabbitMQ.Client/src/client/impl/ProtocolBase.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,33 @@ public IConnection CreateConnection(IConnectionFactory factory,
131131
bool insist,
132132
IFrameHandler frameHandler)
133133
{
134-
return new Connection(factory, insist, frameHandler);
134+
return new Connection(factory, insist, frameHandler, null);
135+
}
136+
137+
138+
public IConnection CreateConnection(IConnectionFactory factory,
139+
bool insist,
140+
IFrameHandler frameHandler,
141+
String connectionName)
142+
{
143+
return new Connection(factory, insist, frameHandler, connectionName);
135144
}
136145

137146
public IConnection CreateConnection(ConnectionFactory factory,
138147
IFrameHandler frameHandler,
139148
bool automaticRecoveryEnabled)
140149
{
141-
var ac = new AutorecoveringConnection(factory);
150+
var ac = new AutorecoveringConnection(factory, null);
151+
ac.Init();
152+
return ac;
153+
}
154+
155+
public IConnection CreateConnection(ConnectionFactory factory,
156+
IFrameHandler frameHandler,
157+
bool automaticRecoveryEnabled,
158+
String connectionName)
159+
{
160+
var ac = new AutorecoveringConnection(factory, connectionName);
142161
ac.Init();
143162
return ac;
144163
}

0 commit comments

Comments
 (0)