Skip to content

Commit 67643cd

Browse files
author
Tim Watson
committed
Merge bug26079 into default
2 parents 90e632d + 4416c59 commit 67643cd

File tree

6 files changed

+128
-8
lines changed

6 files changed

+128
-8
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,15 @@ public interface IModel: IDisposable
126126
///otherwise.</summary>
127127
ShutdownEventArgs CloseReason { get; }
128128

129-
///<summary>Returns true if the session is still in a state
129+
///<summary>Returns true if the model is still in a state
130130
///where it can be used. Identical to checking if CloseReason
131131
///== null.</summary>
132132
bool IsOpen { get; }
133133

134+
///<summary>Returns true if the model is no longer in a state
135+
///where it can be used.</summary>
136+
bool IsClosed { get; }
137+
134138
///<summary>When in confirm mode, return the sequence number
135139
///of the next message to be published.</summary>
136140
ulong NextPublishSeqNo { get; }

projects/client/RabbitMQ.Client/src/client/api/SslHelper.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,15 @@ private bool CertificateValidationCallback(object sender,
9090
public static Stream TcpUpgrade(Stream tcpStream, SslOption sslOption, int timeout)
9191
{
9292
SslHelper helper = new SslHelper(sslOption);
93+
94+
RemoteCertificateValidationCallback remoteCertValidator =
95+
sslOption.CertificateValidationCallback ?? new RemoteCertificateValidationCallback(helper.CertificateValidationCallback);
96+
LocalCertificateSelectionCallback localCertSelector =
97+
sslOption.CertificateSelectionCallback ?? new LocalCertificateSelectionCallback(helper.CertificateSelectionCallback);
98+
9399
SslStream sslStream = new SslStream(tcpStream, false,
94-
new RemoteCertificateValidationCallback(helper.CertificateValidationCallback),
95-
new LocalCertificateSelectionCallback(helper.CertificateSelectionCallback));
100+
remoteCertValidator,
101+
localCertSelector);
96102

97103
sslStream.AuthenticateAsClient(sslOption.ServerName,
98104
sslOption.Certs,

projects/client/RabbitMQ.Client/src/client/api/SslOption.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ public SslPolicyErrors AcceptablePolicyErrors
136136
set { m_acceptablePolicyErrors = value; }
137137
}
138138

139+
/// <summary>
140+
/// An optional client specified SSL certificate validation callback. If this is not specified,
141+
/// the default callback will be used in conjunction with the AcceptablePolicyErrors property to
142+
/// determine if the remote server certificate is valid.
143+
/// </summary>
144+
public RemoteCertificateValidationCallback CertificateValidationCallback { get; set; }
145+
146+
/// <summary>
147+
/// An optional client specified SSL certificate selection callback. If this is not specified,
148+
/// the first valid certificate found will be used.
149+
/// </summary>
150+
public LocalCertificateSelectionCallback CertificateSelectionCallback { get; set; }
139151

140152
///<summary>Construct an SslOption specifying both the server cannonical name
141153
///and the client's certificate path.
@@ -145,6 +157,8 @@ public SslOption(string serverName, string certPath, bool enabled)
145157
m_serverName= serverName;
146158
m_certPath = certPath;
147159
m_enabled = enabled;
160+
CertificateValidationCallback = null;
161+
CertificateSelectionCallback = null;
148162
}
149163

150164
///<summary>Construct an SslOption with just the server cannonical name.

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,15 @@ public bool IsOpen
513513
}
514514
}
515515

516+
public bool IsClosed
517+
{
518+
get
519+
{
520+
return !IsOpen;
521+
}
522+
}
523+
524+
516525
public ulong NextPublishSeqNo
517526
{
518527
get
@@ -1404,7 +1413,9 @@ public ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clie
14041413
}
14051414
catch (AlreadyClosedException)
14061415
{
1407-
// Ignored, see BasicGet
1416+
// let continuation throw OperationInterruptedException,
1417+
// which is a much more suitable exception before connection
1418+
// negotiation finishes
14081419
}
14091420
k.GetReply();
14101421
return k.m_result;
@@ -1433,7 +1444,9 @@ public ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
14331444
}
14341445
catch (AlreadyClosedException)
14351446
{
1436-
// Ignored, see BasicGet
1447+
// let continuation throw OperationInterruptedException,
1448+
// which is a much more suitable exception before connection
1449+
// negotiation finishes
14371450
}
14381451
k.GetReply();
14391452
return k.m_result;
@@ -1478,7 +1491,9 @@ public string ConnectionOpen(string virtualHost,
14781491
}
14791492
catch (AlreadyClosedException)
14801493
{
1481-
// Ignored, see BasicGet
1494+
// let continuation throw OperationInterruptedException,
1495+
// which is a much more suitable exception before connection
1496+
// negotiation finishes
14821497
}
14831498
k.GetReply();
14841499
if (k.m_redirect) {

projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public BasicDeliverEventArgs Next()
212212
// from under us by the operation of Close() from
213213
// another thread.
214214
QueueingBasicConsumer consumer = m_consumer;
215-
if (consumer == null) {
215+
if (consumer == null || m_model.IsClosed) {
216216
// Closed!
217217
m_latestEvent = null;
218218
} else {
@@ -275,9 +275,11 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
275275
// from under us by the operation of Close() from
276276
// another thread.
277277
QueueingBasicConsumer consumer = m_consumer;
278-
if (consumer == null) {
278+
if (consumer == null || m_model.IsClosed) {
279279
// Closed!
280280
m_latestEvent = null;
281+
result = null;
282+
return false;
281283
} else {
282284
BasicDeliverEventArgs qValue;
283285
if (!consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2013 GoPivotal, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is GoPivotal, Inc.
38+
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using NUnit.Framework;
42+
43+
using System;
44+
using System.Text;
45+
using System.Threading;
46+
using System.Diagnostics;
47+
48+
using RabbitMQ.Client.Events;
49+
using RabbitMQ.Client.MessagePatterns;
50+
51+
namespace RabbitMQ.Client.Unit {
52+
[TestFixture]
53+
public class TestMessagePatternsSubscription : IntegrationFixture {
54+
UTF8Encoding enc = new UTF8Encoding();
55+
56+
[Test]
57+
public void TestChannelClosureIsObservableOnSubscription()
58+
{
59+
string q = Model.QueueDeclare();
60+
Subscription sub = new Subscription(Model, q, true);
61+
62+
BasicDeliverEventArgs r1;
63+
Assert.IsFalse(sub.Next(100, out r1));
64+
65+
Model.BasicPublish("", q, null, enc.GetBytes("a message"));
66+
Model.BasicPublish("", q, null, enc.GetBytes("a message"));
67+
68+
BasicDeliverEventArgs r2;
69+
Assert.IsTrue(sub.Next(1000, out r2));
70+
Assert.IsNotNull(sub.Next());
71+
72+
Model.Close();
73+
Assert.IsNull(sub.Next());
74+
75+
BasicDeliverEventArgs r3;
76+
Assert.IsFalse(sub.Next(100, out r3));
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)