Skip to content

Commit 77f69a1

Browse files
author
Alexandru Scvortov
committed
removed confirm from qpid 0-8; right ordering for method parameters; Model keeps track of published message count
1 parent 15c005e commit 77f69a1

File tree

8 files changed

+64
-76
lines changed

8 files changed

+64
-76
lines changed

docs/specs/amqp0-8.stripped.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -680,10 +680,10 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
680680
<method name="select" synchronous="1" index="10">
681681
<chassis name="server" implement="MUST"/>
682682
<response name="select-ok"/>
683-
<field name = "nowait" type = "bit">
684-
</field>
685683
<field name = "multiple" type = "bit">
686684
</field>
685+
<field name = "nowait" type = "bit">
686+
</field>
687687
</method>
688688
<method name="select-ok" synchronous="1" index="11">
689689
<chassis name="client" implement="MUST"/>

docs/specs/amqp0-8.xml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3694,14 +3694,6 @@ confirm transaction mode
36943694
</doc>
36953695
<chassis name="server" implement="MUST"/>
36963696
<response name="select-ok"/>
3697-
<field name = "nowait" type = "bit">
3698-
do not send a reply method
3699-
<doc>
3700-
If set, the server will not respond to the method. The client should
3701-
not wait for a reply method. If the server could not complete the
3702-
method it will raise a channel or connection exception.
3703-
</doc>
3704-
</field>
37053697
<field name = "multiple" type = "bit">
37063698
acknowledge multiple messages
37073699
<doc>
@@ -3711,6 +3703,14 @@ confirm transaction mode
37113703
to zero, the delivery tag refers to a single message.
37123704
</doc>
37133705
</field>
3706+
<field name = "nowait" type = "bit">
3707+
do not send a reply method
3708+
<doc>
3709+
If set, the server will not respond to the method. The client should
3710+
not wait for a reply method. If the server could not complete the
3711+
method it will raise a channel or connection exception.
3712+
</doc>
3713+
</field>
37143714
</method>
37153715
<method name="select-ok" synchronous="1" index="11">
37163716
acknowledge confirm mode

docs/specs/amqp0-9.stripped.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -712,10 +712,10 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
712712
<method name="select" synchronous="1" index="10">
713713
<chassis name="server" implement="MUST"/>
714714
<response name="select-ok"/>
715-
<field name = "nowait" type = "bit">
716-
</field>
717715
<field name = "multiple" type = "bit">
718716
</field>
717+
<field name = "nowait" type = "bit">
718+
</field>
719719
</method>
720720
<method name="select-ok" synchronous="1" index="11">
721721
<chassis name="client" implement="MUST"/>

docs/specs/amqp0-9.xml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4394,14 +4394,6 @@
43944394
</doc>
43954395
<chassis name="server" implement="MUST"/>
43964396
<response name="select-ok"/>
4397-
<field name = "nowait" type = "bit">
4398-
do not send a reply method
4399-
<doc>
4400-
If set, the server will not respond to the method. The client should
4401-
not wait for a reply method. If the server could not complete the
4402-
method it will raise a channel or connection exception.
4403-
</doc>
4404-
</field>
44054397
<field name = "multiple" type = "bit">
44064398
acknowledge multiple messages
44074399
<doc>
@@ -4411,6 +4403,14 @@
44114403
to zero, the delivery tag refers to a single message.
44124404
</doc>
44134405
</field>
4406+
<field name = "nowait" type = "bit">
4407+
do not send a reply method
4408+
<doc>
4409+
If set, the server will not respond to the method. The client should
4410+
not wait for a reply method. If the server could not complete the
4411+
method it will raise a channel or connection exception.
4412+
</doc>
4413+
</field>
44144414
</method>
44154415
<method name="select-ok" synchronous="1" index="11">
44164416
acknowledge confirm mode

docs/specs/qpid-amqp.0-8.stripped.xml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,6 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
446446
</method>
447447
<method name="ack" index="80">
448448
<chassis name="server" implement="MUST"/>
449-
<chassis name="client" implement="MUST"/>
450449
<field name="delivery tag" domain="delivery tag"/>
451450
<field name="multiple" type="bit"/>
452451
</method>
@@ -678,19 +677,6 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
678677
<chassis name="client" implement="MUST"/>
679678
</method>
680679
</class>
681-
<class name="confirm" handler="channel" index="85">
682-
<method name="select" synchronous="1" index="10">
683-
<chassis name="server" implement="MUST"/>
684-
<response name="select-ok"/>
685-
<field name = "nowait" type = "bit">
686-
</field>
687-
<field name = "multiple" type = "bit">
688-
</field>
689-
</method>
690-
<method name="select-ok" synchronous="1" index="11">
691-
<chassis name="client" implement="MUST"/>
692-
</method>
693-
</class>
694680
<class name="tunnel" handler="tunnel" index="110">
695681
<chassis name="server" implement="MAY"/>
696682
<chassis name="client" implement="MAY"/>

docs/specs/qpid-amqp.0-8.xml

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2430,7 +2430,6 @@ localised reply text
24302430
message.
24312431
</doc>
24322432
<chassis name = "server" implement = "MUST" />
2433-
<chassis name = "client" implement = "MUST" />
24342433
<field name = "delivery tag" domain = "delivery tag" />
24352434

24362435
<field name = "multiple" type = "bit">
@@ -3716,43 +3715,6 @@ confirm transaction mode
37163715
<chassis name="client" implement="MUST"/>
37173716
</method>
37183717
</class>
3719-
<class name="confirm" handler="channel" index="85">
3720-
<method name="select" synchronous="1" index="10">
3721-
select confirm mode (i.e. enable publisher acknowledgements)
3722-
<doc>
3723-
This method sets the channel to use publisher acknowledgements.
3724-
The client can only use this method on a non-transactional
3725-
channel.
3726-
</doc>
3727-
<chassis name="server" implement="MUST"/>
3728-
<response name="select-ok"/>
3729-
<field name = "nowait" type = "bit">
3730-
do not send a reply method
3731-
<doc>
3732-
If set, the server will not respond to the method. The client should
3733-
not wait for a reply method. If the server could not complete the
3734-
method it will raise a channel or connection exception.
3735-
</doc>
3736-
</field>
3737-
<field name = "multiple" type = "bit">
3738-
acknowledge multiple messages
3739-
<doc>
3740-
If set to 1, the delivery tag in the returned acks is
3741-
treated as "up to and including", so that the server can
3742-
acknowledge multiple messages with a single method. If set
3743-
to zero, the delivery tag refers to a single message.
3744-
</doc>
3745-
</field>
3746-
</method>
3747-
<method name="select-ok" synchronous="1" index="11">
3748-
acknowledge confirm mode
3749-
<doc>
3750-
This method confirms to the client that the channel was successfully
3751-
set to use publisher acknowledgements.
3752-
</doc>
3753-
<chassis name="client" implement="MUST"/>
3754-
</method>
3755-
</class>
37563718
<class name="tunnel" handler="tunnel" index="110">
37573719
<!--
37583720
======================================================

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ public interface IModel: IDisposable
131131
///== null.</summary>
132132
bool IsOpen { get; }
133133

134+
///<summary>Returns the number of messages published since the
135+
///channel was put in confirm mode.</summary>
136+
ulong? PublishedMessageCount { get; }
137+
134138
///<summary>Construct a completely empty content header for
135139
///use with the Basic content class.</summary>
136140
[AmqpContentHeaderFactory("basic")]
@@ -272,10 +276,15 @@ uint QueueDelete(string queue,
272276
[AmqpNowaitArgument(null, "0xFFFFFFFF")]
273277
bool nowait);
274278

279+
///<summary>Enable publisher acknowledgements.</summary>
280+
[AmqpMethodDoNotImplement(null)]
281+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
282+
void ConfirmSelect(bool multiple);
283+
275284
///<summary>(Spec method) Enable publisher acknowledgements.</summary>
276-
void ConfirmSelect(bool multiple,
277-
[AmqpNowaitArgument(null, "0xFFFFFFFF")]
278-
bool nowait);
285+
[AmqpMethodDoNotImplement(null)]
286+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
287+
void ConfirmSelect(bool multiple, bool nowait);
279288

280289
///<summary>Start a Basic content-class consumer.</summary>
281290
///<remarks>
@@ -556,6 +565,15 @@ void _Private_BasicConsume(string queue,
556565
"arguments")]
557566
IDictionary filter);
558567

568+
///<summary>Used to send a Confirm.Select method. The public
569+
///confirm API calls this while also managing internal
570+
///datastructures.</summary>
571+
[AmqpMethodMapping(null, "confirm", "select")]
572+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
573+
void _Private_ConfirmSelect(bool multiple,
574+
bool nowait);
575+
576+
559577
///<summary>Handle incoming Basic.ConsumeOk methods.</summary>
560578
void HandleBasicConsumeOk(string consumerTag);
561579

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ public abstract class ModelBase : IFullModel
8686

8787
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
8888

89+
private ulong? m_pubMsgCount = null;
90+
8991
public event ModelShutdownEventHandler ModelShutdown
9092
{
9193
add
@@ -367,6 +369,14 @@ public bool IsOpen
367369
}
368370
}
369371

372+
public ulong? PublishedMessageCount
373+
{
374+
get
375+
{
376+
return m_pubMsgCount;
377+
}
378+
}
379+
370380
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
371381
{
372382
if (method.HasContent) {
@@ -616,7 +626,17 @@ public abstract uint QueueDelete(string queue,
616626
bool ifEmpty,
617627
bool nowait);
618628

619-
public abstract void ConfirmSelect(bool multiple, bool nowait);
629+
public void ConfirmSelect(bool multiple) {
630+
ConfirmSelect(multiple, false);
631+
}
632+
633+
public void ConfirmSelect(bool multiple, bool nowait) {
634+
m_pubMsgCount = 0;
635+
_Private_ConfirmSelect(multiple, nowait);
636+
}
637+
638+
public abstract void _Private_ConfirmSelect(bool multiple,
639+
bool nowait);
620640

621641
public string BasicConsume(string queue,
622642
IDictionary filter,
@@ -830,6 +850,8 @@ public void BasicPublish(string exchange,
830850
{
831851
basicProperties = CreateBasicProperties();
832852
}
853+
if (m_pubMsgCount.HasValue)
854+
m_pubMsgCount++;
833855
_Private_BasicPublish(exchange,
834856
routingKey,
835857
mandatory,

0 commit comments

Comments
 (0)