Skip to content

Commit bd0079e

Browse files
author
Tim Watson
committed
Merge bug25082 into default
2 parents a89ca47 + d8b5392 commit bd0079e

File tree

11 files changed

+236
-78
lines changed

11 files changed

+236
-78
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,15 @@ public interface IModel: IDisposable
126126
///otherwise.</summary>
127127
ShutdownEventArgs CloseReason { get; }
128128

129-
///<summary>Returns true if the session is still in a state
129+
///<summary>Returns true if the model is still in a state
130130
///where it can be used. Identical to checking if CloseReason
131131
///== null.</summary>
132132
bool IsOpen { get; }
133133

134+
///<summary>Returns true if the model is no longer in a state
135+
///where it can be used.</summary>
136+
bool IsClosed { get; }
137+
134138
///<summary>When in confirm mode, return the sequence number
135139
///of the next message to be published.</summary>
136140
ulong NextPublishSeqNo { get; }

projects/client/RabbitMQ.Client/src/client/api/SslHelper.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,15 @@ private bool CertificateValidationCallback(object sender,
9090
public static Stream TcpUpgrade(Stream tcpStream, SslOption sslOption, int timeout)
9191
{
9292
SslHelper helper = new SslHelper(sslOption);
93+
94+
RemoteCertificateValidationCallback remoteCertValidator =
95+
sslOption.CertificateValidationCallback ?? new RemoteCertificateValidationCallback(helper.CertificateValidationCallback);
96+
LocalCertificateSelectionCallback localCertSelector =
97+
sslOption.CertificateSelectionCallback ?? new LocalCertificateSelectionCallback(helper.CertificateSelectionCallback);
98+
9399
SslStream sslStream = new SslStream(tcpStream, false,
94-
new RemoteCertificateValidationCallback(helper.CertificateValidationCallback),
95-
new LocalCertificateSelectionCallback(helper.CertificateSelectionCallback));
100+
remoteCertValidator,
101+
localCertSelector);
96102

97103
sslStream.AuthenticateAsClient(sslOption.ServerName,
98104
sslOption.Certs,

projects/client/RabbitMQ.Client/src/client/api/SslOption.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ public SslPolicyErrors AcceptablePolicyErrors
136136
set { m_acceptablePolicyErrors = value; }
137137
}
138138

139+
/// <summary>
140+
/// An optional client specified SSL certificate validation callback. If this is not specified,
141+
/// the default callback will be used in conjunction with the AcceptablePolicyErrors property to
142+
/// determine if the remote server certificate is valid.
143+
/// </summary>
144+
public RemoteCertificateValidationCallback CertificateValidationCallback { get; set; }
145+
146+
/// <summary>
147+
/// An optional client specified SSL certificate selection callback. If this is not specified,
148+
/// the first valid certificate found will be used.
149+
/// </summary>
150+
public LocalCertificateSelectionCallback CertificateSelectionCallback { get; set; }
139151

140152
///<summary>Construct an SslOption specifying both the server cannonical name
141153
///and the client's certificate path.
@@ -145,6 +157,8 @@ public SslOption(string serverName, string certPath, bool enabled)
145157
m_serverName= serverName;
146158
m_certPath = certPath;
147159
m_enabled = enabled;
160+
CertificateValidationCallback = null;
161+
CertificateSelectionCallback = null;
148162
}
149163

150164
///<summary>Construct an SslOption with just the server cannonical name.

projects/client/RabbitMQ.Client/src/client/exceptions/AlreadyClosedException.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,6 @@ public class AlreadyClosedException: OperationInterruptedException
4949
///<summary>Construct an instance containing the given
5050
///shutdown reason.</summary>
5151
public AlreadyClosedException(ShutdownEventArgs reason)
52-
: base(reason) { }
52+
: base(reason, "Already closed") { }
5353
}
5454
}

projects/client/RabbitMQ.Client/src/client/exceptions/OperationInterruptedException.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ public OperationInterruptedException(ShutdownEventArgs reason)
8181
m_shutdownReason = reason;
8282
}
8383

84+
///<summary>Construct an OperationInterruptedException with
85+
///the passed-in explanation and prefix, if any.</summary>
86+
public OperationInterruptedException(ShutdownEventArgs reason, String prefix)
87+
: base(reason == null ? (prefix + ": The AMQP operation was interrupted") :
88+
string.Format("{0}: The AMQP operation was interrupted: {1}",
89+
prefix, reason))
90+
{
91+
m_shutdownReason = reason;
92+
}
93+
8494
///<summary>Retrieves the explanation for the shutdown. May
8595
///return null if no explanation is available.</summary>
8696
public ShutdownEventArgs ShutdownReason { get { return m_shutdownReason; } }

projects/client/RabbitMQ.Client/src/client/impl/ConnectionBase.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,11 @@ public void Close(ShutdownEventArgs reason, bool abort, int timeout)
485485
if (!abort)
486486
throw ace;
487487
}
488+
catch (NotSupportedException nse)
489+
{
490+
// buffered stream had unread data in it and Flush()
491+
// was called, ignore to not confuse the user
492+
}
488493
catch (IOException ioe)
489494
{
490495
if (m_model0.CloseReason == null)
@@ -665,7 +670,15 @@ public void MainLoop()
665670
// connection closes.
666671
if (shutdownCleanly)
667672
{
668-
ClosingLoop();
673+
try
674+
{
675+
ClosingLoop();
676+
} catch (SocketException se)
677+
{
678+
// means that socket was closed when frame handler
679+
// attempted to use it. Since we are shutting down,
680+
// ignore it.
681+
}
669682
}
670683

671684
FinishClose();
@@ -1055,9 +1068,7 @@ protected void StartAndTune()
10551068
connectionStartCell.Value;
10561069

10571070
if (connectionStart == null){
1058-
throw new ProtocolVersionMismatchException(Protocol.MajorVersion,
1059-
Protocol.MinorVersion,
1060-
-1, -1);
1071+
throw new IOException("connection.start was never received, likely due to a network timeout");
10611072
}
10621073

10631074
ServerProperties = connectionStart.m_serverProperties;

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 25 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -494,16 +494,7 @@ public void Enqueue(IRpcContinuation k)
494494
public void TransmitAndEnqueue(Command cmd, IRpcContinuation k)
495495
{
496496
Enqueue(k);
497-
try
498-
{
499-
m_session.Transmit(cmd);
500-
}
501-
catch (AlreadyClosedException)
502-
{
503-
// Ignored, since the continuation will be told about
504-
// the closure via an OperationInterruptedException because
505-
// of the shutdown event propagation.
506-
}
497+
m_session.Transmit(cmd);
507498
}
508499

509500
public ShutdownEventArgs CloseReason
@@ -522,6 +513,15 @@ public bool IsOpen
522513
}
523514
}
524515

516+
public bool IsClosed
517+
{
518+
get
519+
{
520+
return !IsOpen;
521+
}
522+
}
523+
524+
525525
public ulong NextPublishSeqNo
526526
{
527527
get
@@ -894,16 +894,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
894894
{
895895
QueueDeclareRpcContinuation k = new QueueDeclareRpcContinuation();
896896
Enqueue(k);
897-
try
898-
{
899-
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
900-
}
901-
catch (AlreadyClosedException)
902-
{
903-
// Ignored, since the continuation will be told about
904-
// the closure via an OperationInterruptedException because
905-
// of the shutdown event propagation.
906-
}
897+
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
907898
k.GetReply();
908899
return k.m_result;
909900
}
@@ -988,7 +979,7 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
988979
{
989980
while (true)
990981
{
991-
if (CloseReason != null)
982+
if (!IsOpen)
992983
throw new AlreadyClosedException(CloseReason);
993984

994985
if (m_unconfirmedSet.Count == 0)
@@ -1095,17 +1086,8 @@ public string BasicConsume(string queue,
10951086
Enqueue(k);
10961087
// Non-nowait. We have an unconventional means of getting
10971088
// the RPC response, but a response is still expected.
1098-
try
1099-
{
1100-
_Private_BasicConsume(queue, consumerTag, noLocal, noAck, exclusive,
1089+
_Private_BasicConsume(queue, consumerTag, noLocal, noAck, exclusive,
11011090
/*nowait:*/ false, arguments);
1102-
}
1103-
catch (AlreadyClosedException)
1104-
{
1105-
// Ignored, since the continuation will be told about
1106-
// the closure via an OperationInterruptedException because
1107-
// of the shutdown event propagation.
1108-
}
11091091
k.GetReply();
11101092
string actualConsumerTag = k.m_consumerTag;
11111093

@@ -1141,17 +1123,7 @@ public void BasicCancel(string consumerTag)
11411123

11421124
Enqueue(k);
11431125

1144-
try
1145-
{
1146-
_Private_BasicCancel(consumerTag, false);
1147-
}
1148-
catch (AlreadyClosedException)
1149-
{
1150-
// Ignored, since the continuation will be told about
1151-
// the closure via an OperationInterruptedException because
1152-
// of the shutdown event propagation.
1153-
}
1154-
1126+
_Private_BasicCancel(consumerTag, false);
11551127
k.GetReply();
11561128

11571129
ModelShutdown -= new ModelShutdownEventHandler(k.m_consumer.HandleModelShutdown);
@@ -1197,16 +1169,7 @@ public BasicGetResult BasicGet(string queue,
11971169
{
11981170
BasicGetRpcContinuation k = new BasicGetRpcContinuation();
11991171
Enqueue(k);
1200-
try
1201-
{
1202-
_Private_BasicGet(queue, noAck);
1203-
}
1204-
catch (AlreadyClosedException)
1205-
{
1206-
// Ignored, since the continuation will be told about
1207-
// the closure via an OperationInterruptedException because
1208-
// of the shutdown event propagation.
1209-
}
1172+
_Private_BasicGet(queue, noAck);
12101173
k.GetReply();
12111174
return k.m_result;
12121175
}
@@ -1218,18 +1181,7 @@ public void BasicRecover(bool requeue)
12181181
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
12191182

12201183
Enqueue(k);
1221-
1222-
try
1223-
{
1224-
_Private_BasicRecover(requeue);
1225-
}
1226-
catch (AlreadyClosedException)
1227-
{
1228-
// Ignored, since the continuation will be told about
1229-
// the closure via an OperationInterruptedException because
1230-
// of the shutdown event propagation.
1231-
}
1232-
1184+
_Private_BasicRecover(requeue);
12331185
k.GetReply();
12341186
}
12351187

@@ -1461,7 +1413,9 @@ public ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clie
14611413
}
14621414
catch (AlreadyClosedException)
14631415
{
1464-
// Ignored, see BasicGet
1416+
// let continuation throw OperationInterruptedException,
1417+
// which is a much more suitable exception before connection
1418+
// negotiation finishes
14651419
}
14661420
k.GetReply();
14671421
return k.m_result;
@@ -1490,7 +1444,9 @@ public ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
14901444
}
14911445
catch (AlreadyClosedException)
14921446
{
1493-
// Ignored, see BasicGet
1447+
// let continuation throw OperationInterruptedException,
1448+
// which is a much more suitable exception before connection
1449+
// negotiation finishes
14941450
}
14951451
k.GetReply();
14961452
return k.m_result;
@@ -1535,7 +1491,9 @@ public string ConnectionOpen(string virtualHost,
15351491
}
15361492
catch (AlreadyClosedException)
15371493
{
1538-
// Ignored, see BasicGet
1494+
// let continuation throw OperationInterruptedException,
1495+
// which is a much more suitable exception before connection
1496+
// negotiation finishes
15391497
}
15401498
k.GetReply();
15411499
if (k.m_redirect) {

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public BasicDeliverEventArgs Next()
212212
// from under us by the operation of Close() from
213213
// another thread.
214214
QueueingBasicConsumer consumer = m_consumer;
215-
if (consumer == null) {
215+
if (consumer == null || m_model.IsClosed) {
216216
// Closed!
217217
m_latestEvent = null;
218218
} else {
@@ -275,9 +275,11 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
275275
// from under us by the operation of Close() from
276276
// another thread.
277277
QueueingBasicConsumer consumer = m_consumer;
278-
if (consumer == null) {
278+
if (consumer == null || m_model.IsClosed) {
279279
// Closed!
280280
m_latestEvent = null;
281+
result = null;
282+
return false;
281283
} else {
282284
BasicDeliverEventArgs qValue;
283285
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {

projects/client/Unit/src/unit/TestConnectionBlocked.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public static bool IsRunningOnMono()
162162
protected void Publish(IConnection conn)
163163
{
164164
IModel ch = conn.CreateModel();
165-
ch.BasicPublish("", "amq.fanout", null, enc.GetBytes("message"));
165+
ch.BasicPublish("amq.fanout", "", null, enc.GetBytes("message"));
166166
}
167167

168168
protected override void ReleaseResources()

0 commit comments

Comments
 (0)