Skip to content

Commit 430c4f5

Browse files
Merge default into bug25191
2 parents 8185944 + 2f0b261 commit 430c4f5

File tree

11 files changed

+454
-12
lines changed

11 files changed

+454
-12
lines changed

docs/specs/amqp0-9-1.stripped.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
167167
<chassis name="client" implement="MUST"/>
168168
<chassis name="server" implement="MUST"/>
169169
</method>
170+
<method name="blocked" index="60">
171+
<chassis name="server" implement="MUST"/>
172+
<chassis name="client" implement="MUST"/>
173+
<field name="reason" domain="shortstr" />
174+
</method>
175+
<method name="unblocked" index="61">
176+
<chassis name="server" implement="MUST"/>
177+
<chassis name="client" implement="MUST"/>
178+
</method>
170179
</class>
171180
<class name="channel" handler="channel" index="20">
172181
<chassis name="server" implement="MUST"/>

docs/specs/amqp0-9-1.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
exchange.bind and exchange.bind-ok,
88
exchange.unbind and exchange.unbind-ok,
99
basic.nack,
10+
connection.blocked and connection.unblocked,
1011
the ability for the Server to send basic.ack, basic.nack and
1112
basic.cancel to the client, and
1213
the un-deprecation of exchange.declare{auto-delete} and exchange.declare{internal}
@@ -893,6 +894,31 @@
893894
<chassis name = "client" implement = "MUST" />
894895
<chassis name = "server" implement = "MUST" />
895896
</method>
897+
898+
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
899+
900+
<method name = "blocked" index = "60" label = "indicates a connection is blocked">
901+
<doc>Notifies client that the connection is blocked</doc>
902+
903+
<field name = "reason" domain = "shortstr">
904+
<doc>
905+
Provides an explanation why the connection was blocked.
906+
</doc>
907+
</field>
908+
909+
<chassis name = "client" implement = "MUST" />
910+
<chassis name = "server" implement = "MUST" />
911+
</method>
912+
</class>
913+
914+
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
915+
916+
<method name = "unblocked" index = "61" label = "indicates a connection is unblocked">
917+
<doc>Notifies client that the connection is unblocked</doc>
918+
919+
<chassis name = "client" implement = "MUST" />
920+
<chassis name = "server" implement = "MUST" />
921+
</method>
896922
</class>
897923

898924
<!-- == CHANNEL ========================================================== -->

projects/client/RabbitMQ.Client/src/client/api/IConnection.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ public interface IConnection: IDisposable
7979
///</remarks>
8080
event CallbackExceptionEventHandler CallbackException;
8181

82+
event ConnectionBlockedEventHandler ConnectionBlocked;
83+
event ConnectionUnblockedEventHandler ConnectionUnblocked;
84+
8285
///<summary>Retrieve the endpoint this connection is connected
8386
///to.</summary>
8487
AmqpTcpEndpoint Endpoint { get; }
@@ -279,5 +282,12 @@ public interface IConnection: IDisposable
279282
///contain information about any errors reported while closing the
280283
///connection in the order they appeared</summary>
281284
IList ShutdownReport { get; }
285+
286+
287+
///<summary>Handle incoming Connection.Blocked methods.</summary>
288+
void HandleConnectionBlocked(string reason);
289+
290+
///<summary>Handle incoming Connection.Unblocked methods.</summary>
291+
void HandleConnectionUnblocked();
282292
}
283293
}

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,20 @@ void HandleConnectionClose(ushort replyCode,
10171017
string replyText,
10181018
ushort classId,
10191019
ushort methodId);
1020+
1021+
///<summary>Handle an incoming Connection.Blocked.</summary>
1022+
[AmqpMethodMapping(null, "connection", "blocked")]
1023+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
1024+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
1025+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
1026+
void HandleConnectionBlocked(string reason);
1027+
1028+
///<summary>Handle an incominga Connection.Unblocked.</summary>
1029+
[AmqpMethodMapping(null, "connection", "unblocked")]
1030+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
1031+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
1032+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
1033+
void HandleConnectionUnblocked();
10201034
}
10211035

10221036
///<summary>Essential information from an incoming Connection.Tune
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2013 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is VMware, Inc.
38+
// Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
42+
using System;
43+
44+
namespace RabbitMQ.Client.Events
45+
{
46+
47+
///<summary>Delegate used to process connection blocked events.</summary>
48+
public delegate void ConnectionBlockedEventHandler(IConnection sender, ConnectionBlockedEventArgs args);
49+
50+
///<summary>Event relating to connection being blocked</summary>
51+
public class ConnectionBlockedEventArgs : EventArgs
52+
{
53+
private readonly string m_reason;
54+
55+
public ConnectionBlockedEventArgs(string reason)
56+
{
57+
m_reason = reason;
58+
}
59+
60+
///<summary>Access the reason why connection is blocked</summary>
61+
public string Reason { get { return m_reason; } }
62+
}
63+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2013 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is VMware, Inc.
38+
// Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
42+
using System;
43+
44+
namespace RabbitMQ.Client.Events
45+
{
46+
47+
///<summary>Delegate used to process connection unblocked events.</summary>
48+
public delegate void ConnectionUnblockedEventHandler(IConnection sender);
49+
}

projects/client/RabbitMQ.Client/src/client/impl/ConnectionBase.cs

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ public abstract class ConnectionBase : IConnection
9090
public volatile ShutdownEventArgs m_closeReason = null;
9191
public CallbackExceptionEventHandler m_callbackException;
9292

93+
public ConnectionBlockedEventHandler m_connectionBlocked;
94+
public ConnectionUnblockedEventHandler m_connectionUnblocked;
95+
9396
public ManualResetEvent m_appContinuation = new ManualResetEvent(false);
9497
public AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
9598
public AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
@@ -146,6 +149,42 @@ public event ConnectionShutdownEventHandler ConnectionShutdown
146149
}
147150
}
148151

152+
public event ConnectionBlockedEventHandler ConnectionBlocked
153+
{
154+
add
155+
{
156+
lock (m_eventLock)
157+
{
158+
m_connectionBlocked += value;
159+
}
160+
}
161+
remove
162+
{
163+
lock (m_eventLock)
164+
{
165+
m_connectionBlocked -= value;
166+
}
167+
}
168+
}
169+
170+
public event ConnectionUnblockedEventHandler ConnectionUnblocked
171+
{
172+
add
173+
{
174+
lock (m_eventLock)
175+
{
176+
m_connectionUnblocked += value;
177+
}
178+
}
179+
remove
180+
{
181+
lock (m_eventLock)
182+
{
183+
m_connectionUnblocked -= value;
184+
}
185+
}
186+
}
187+
149188
public event CallbackExceptionEventHandler CallbackException
150189
{
151190
add
@@ -338,12 +377,12 @@ void IDisposable.Dispose()
338377
Abort();
339378
if (ShutdownReport.Count > 0)
340379
{
341-
foreach (ShutdownReportEntry entry in ShutdownReport)
342-
{
343-
if (entry.Exception != null)
344-
throw entry.Exception;
345-
}
346-
throw new OperationInterruptedException(null);
380+
foreach (ShutdownReportEntry entry in ShutdownReport)
381+
{
382+
if (entry.Exception != null)
383+
throw entry.Exception;
384+
}
385+
throw new OperationInterruptedException(null);
347386
}
348387
}
349388

@@ -858,6 +897,60 @@ public void PrettyPrintShutdownReport()
858897
}
859898
}
860899

900+
public void HandleConnectionBlocked(string reason)
901+
{
902+
ConnectionBlockedEventArgs args = new ConnectionBlockedEventArgs(reason);
903+
OnConnectionBlocked(args);
904+
}
905+
906+
public void OnConnectionBlocked(ConnectionBlockedEventArgs args)
907+
{
908+
ConnectionBlockedEventHandler handler;
909+
lock (m_eventLock)
910+
{
911+
handler = m_connectionBlocked;
912+
}
913+
if (handler != null)
914+
{
915+
foreach (ConnectionBlockedEventHandler h in handler.GetInvocationList()) {
916+
try {
917+
h(this, args);
918+
} catch (Exception e) {
919+
CallbackExceptionEventArgs cee_args = new CallbackExceptionEventArgs(e);
920+
cee_args.Detail["context"] = "OnConnectionBlocked";
921+
OnCallbackException(cee_args);
922+
}
923+
}
924+
}
925+
}
926+
927+
928+
public void HandleConnectionUnblocked()
929+
{
930+
OnConnectionUnblocked();
931+
}
932+
933+
public void OnConnectionUnblocked()
934+
{
935+
ConnectionUnblockedEventHandler handler;
936+
lock (m_eventLock)
937+
{
938+
handler = m_connectionUnblocked;
939+
}
940+
if (handler != null)
941+
{
942+
foreach (ConnectionUnblockedEventHandler h in handler.GetInvocationList()) {
943+
try {
944+
h(this);
945+
} catch (Exception e) {
946+
CallbackExceptionEventArgs args = new CallbackExceptionEventArgs(e);
947+
args.Detail["context"] = "OnConnectionUnblocked";
948+
OnCallbackException(args);
949+
}
950+
}
951+
}
952+
}
953+
861954
///<summary>Broadcasts notification of the final shutdown of the connection.</summary>
862955
public void OnShutdown()
863956
{

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,20 @@ public void HandleConnectionClose(ushort replyCode,
724724
}
725725
}
726726

727+
public void HandleConnectionBlocked(string reason)
728+
{
729+
ConnectionBase cb = ((ConnectionBase)m_session.Connection);
730+
731+
cb.HandleConnectionBlocked(reason);
732+
}
733+
734+
public void HandleConnectionUnblocked()
735+
{
736+
ConnectionBase cb = ((ConnectionBase)m_session.Connection);
737+
738+
cb.HandleConnectionUnblocked();
739+
}
740+
727741
public void HandleChannelClose(ushort replyCode,
728742
string replyText,
729743
ushort classId,
@@ -1321,12 +1335,12 @@ void IDisposable.Dispose()
13211335

13221336
public void Close()
13231337
{
1324-
Close(CommonFraming.Constants.ReplySuccess, "Goodbye");
1338+
Close(CommonFraming.Constants.ReplySuccess, "Goodbye");
13251339
}
13261340

13271341
public void Close(ushort replyCode, string replyText)
13281342
{
1329-
Close(replyCode, replyText, false);
1343+
Close(replyCode, replyText, false);
13301344
}
13311345

13321346
public void Abort()
@@ -1358,11 +1372,11 @@ public void Close(ShutdownEventArgs reason, bool abort)
13581372
}
13591373
k.Wait();
13601374
} catch (AlreadyClosedException ace) {
1361-
if (!abort)
1362-
throw ace;
1375+
if (!abort)
1376+
throw ace;
13631377
} catch (IOException ioe) {
1364-
if (!abort)
1365-
throw ioe;
1378+
if (!abort)
1379+
throw ioe;
13661380
}
13671381
}
13681382

0 commit comments

Comments
 (0)