Skip to content

Commit 1667f69

Browse files
author
Matthew Sackman
committed
Added support for consumer_cancel_notify to .Net client
1 parent 89c04b7 commit 1667f69

File tree

7 files changed

+55
-4
lines changed

7 files changed

+55
-4
lines changed

docs/specs/amqp0-9-1.stripped.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,12 +384,14 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
384384
</method>
385385
<method name="cancel" synchronous="1" index="30">
386386
<chassis name="server" implement="MUST"/>
387+
<chassis name="client" implement="SHOULD"/>
387388
<response name="cancel-ok"/>
388389
<field name="consumer-tag" domain="consumer-tag"/>
389390
<field name="no-wait" domain="no-wait"/>
390391
</method>
391392
<method name="cancel-ok" synchronous="1" index="31">
392393
<chassis name="client" implement="MUST"/>
394+
<chassis name="server" implement="MAY"/>
393395
<field name="consumer-tag" domain="consumer-tag"/>
394396
</method>
395397
<method name="publish" content="1" index="40">

docs/specs/amqp0-9-1.xml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
exchange.bind and exchange.bind-ok,
88
exchange.unbind and exchange.unbind-ok,
99
basic.nack
10-
and the ability for the Server to send basic.ack and basic.nack to the client.
10+
and the ability for the Server to send basic.ack, basic.nack and
11+
basic.cancel to the client.
1112
-->
1213

1314
<!--
@@ -154,9 +155,8 @@
154155
<amqp major = "0" minor = "9" revision = "1"
155156
port = "5672" comment = "AMQ Protocol version 0-9-1">
156157
<!--
157-
======================================================
158-
== CONSTANTS
159-
======================================================
158+
====================================================== ==
159+
CONSTANTS ======================================================
160160
-->
161161
<!-- Frame types -->
162162
<constant name = "frame-method" value = "1" />
@@ -2533,6 +2533,13 @@
25332533
messages, but it does mean the server will not send any more messages for
25342534
that consumer. The client may receive an arbitrary number of messages in
25352535
between sending the cancel method and receiving the cancel-ok reply.
2536+
2537+
It is also sent from the server to the client in the event of
2538+
the consumer being unexpectedly cancelled (i.e. cancelled for
2539+
any reason other than the server receiving the corresponding
2540+
basic.cancel from the client). This allows clients to be
2541+
notified of the loss of consumers due to events such as queue
2542+
deletion.
25362543
</doc>
25372544

25382545
<rule name = "01">
@@ -2546,6 +2553,7 @@
25462553
</rule>
25472554

25482555
<chassis name = "server" implement = "MUST" />
2556+
<chassis name = "client" implement = "SHOULD" />
25492557
<response name = "cancel-ok" />
25502558

25512559
<field name = "consumer-tag" domain = "consumer-tag" />
@@ -2557,6 +2565,7 @@
25572565
This method confirms that the cancellation was completed.
25582566
</doc>
25592567
<chassis name = "client" implement = "MUST" />
2568+
<chassis name = "server" implement = "MAY" />
25602569
<field name = "consumer-tag" domain = "consumer-tag" />
25612570
</method>
25622571

projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ public virtual void HandleBasicCancelOk(string consumerTag)
131131
OnCancel();
132132
}
133133

134+
///<summary>Default implementation - calls OnCancel().</summary>
135+
public virtual void HandleBasicCancel(string consumerTag)
136+
{
137+
OnCancel();
138+
}
139+
134140
///<summary>Default implementation - sets ShutdownReason and
135141
///calls OnCancel().</summary>
136142
public virtual void HandleModelShutdown(IModel model, ShutdownEventArgs reason)

projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,14 @@ public interface IBasicConsumer
6969
///<summary>Called upon successful deregistration of the
7070
///consumer from the broker.</summary>
7171
void HandleBasicCancelOk(string consumerTag);
72+
73+
/// <summary>
74+
/// Called when the consumer is cancelled for reasons other than by a
75+
/// basicCancel: e.g. the queue has been deleted (either by this channel or
76+
/// by any other channel). See handleCancelOk for notification of consumer
77+
/// cancellation due to basicCancel.
78+
/// </summary>
79+
void HandleBasicCancel(string consumerTag);
7280

7381
///<summary>Called when the model shuts down.</summary>
7482
void HandleModelShutdown(IModel model, ShutdownEventArgs reason);

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,8 @@ void HandleBasicDeliver(string consumerTag,
801801
[AmqpContentBodyMapping]
802802
byte[] body);
803803

804+
void HandleBasicCancel(string consumerTag, bool nowait);
805+
804806
///<summary>Handle incoming Basic.Return methods. Signals a
805807
///BasicReturnEvent.</summary>
806808
void HandleBasicReturn(ushort replyCode,

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,29 @@ public void HandleBasicDeliver(string consumerTag,
556556
}
557557
}
558558

559+
public void HandleBasicCancel(string consumerTag, bool nowait)
560+
{
561+
IBasicConsumer consumer;
562+
lock (m_consumers)
563+
{
564+
consumer = (IBasicConsumer)m_consumers[consumerTag];
565+
m_consumers.Remove(consumerTag);
566+
}
567+
if (consumer == null)
568+
{
569+
consumer = DefaultConsumer;
570+
}
571+
572+
try {
573+
consumer.HandleBasicCancel(consumerTag);
574+
} catch (Exception e) {
575+
CallbackExceptionEventArgs args = new CallbackExceptionEventArgs(e);
576+
args.Detail["consumer"] = consumer;
577+
args.Detail["context"] = "HandleBasicCancel";
578+
OnCallbackException(args);
579+
}
580+
}
581+
559582
public void HandleBasicReturn(ushort replyCode,
560583
string replyText,
561584
string exchange,

projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public ProtocolBase() {
5151
Capabilities["publisher_confirms"] = true;
5252
Capabilities["exchange_exchange_bindings"] = true;
5353
Capabilities["basic.nack"] = true;
54+
Capabilities["consumer_cancel_notify"] = true;
5455
}
5556

5657
public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {

0 commit comments

Comments
 (0)