Skip to content

Commit dda955d

Browse files
author
Alexandru Scvortov
committed
add basic confirm barrier
1 parent a81b7d3 commit dda955d

File tree

3 files changed

+68
-2
lines changed

3 files changed

+68
-2
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,21 @@ uint QueueDelete(string queue,
317317
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
318318
void ConfirmSelect();
319319

320+
//<summary>Wait until all published messages have been confirmed.
321+
//<remarks>
322+
//Waits until all messages published since the last call have
323+
//been either ack'd or nack'd by the broker. Returns whether
324+
//all the messages were ack'd (and none were nack'd). Note,
325+
//when called on a non-Confirm channel, returns true
326+
//immediately.
327+
//</remarks>
328+
[AmqpMethodDoNotImplement(null)]
329+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
330+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
331+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
332+
bool WaitForConfirms();
333+
334+
320335
///<summary>Start a Basic content-class consumer.</summary>
321336
///<remarks>
322337
///The consumer is started with noAck=false (i.e. BasicAck is required),

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ public abstract class ModelBase : IFullModel
7676
private readonly object m_flowSendLock = new object();
7777

7878
private ulong m_nextPubSeqNo;
79+
private SortedList m_unconfirmedSet =
80+
SortedList.Synchronized(new SortedList());
81+
private bool m_onlyAcksReceived = true;
7982

8083
public event ModelShutdownEventHandler ModelShutdown
8184
{
@@ -352,6 +355,8 @@ public virtual void OnBasicAck(BasicAckEventArgs args)
352355
}
353356
}
354357
}
358+
359+
handleAckNack(args.DeliveryTag, args.Multiple, false);
355360
}
356361

357362
public virtual void OnBasicNack(BasicNackEventArgs args)
@@ -373,6 +378,24 @@ public virtual void OnBasicNack(BasicNackEventArgs args)
373378
}
374379
}
375380
}
381+
382+
handleAckNack(args.DeliveryTag, args.Multiple, true);
383+
}
384+
385+
protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
386+
{
387+
if (multiple) {
388+
for (ulong i = (ulong)m_unconfirmedSet.GetKey(0); i <= deliveryTag; i++) {
389+
m_unconfirmedSet.Remove(i);
390+
}
391+
} else {
392+
m_unconfirmedSet.Remove(deliveryTag);
393+
}
394+
lock (m_unconfirmedSet.SyncRoot) {
395+
m_onlyAcksReceived = m_onlyAcksReceived && !isNack;
396+
if (m_unconfirmedSet.Count == 0)
397+
Monitor.Pulse(m_unconfirmedSet.SyncRoot);
398+
}
376399
}
377400

378401
public virtual void OnCallbackException(CallbackExceptionEventArgs args)
@@ -902,11 +925,26 @@ public abstract uint _Private_QueueDelete(string queue,
902925
bool ifEmpty,
903926
bool nowait);
904927

905-
public void ConfirmSelect() {
928+
public void ConfirmSelect()
929+
{
906930
m_nextPubSeqNo = 1;
907931
_Private_ConfirmSelect(false);
908932
}
909933

934+
public bool WaitForConfirms()
935+
{
936+
lock (m_unconfirmedSet.SyncRoot) {
937+
while (true) {
938+
if (m_unconfirmedSet.Count == 0) {
939+
bool aux = m_onlyAcksReceived;
940+
m_onlyAcksReceived = true;
941+
return aux;
942+
}
943+
Monitor.Wait(m_unconfirmedSet.SyncRoot);
944+
}
945+
}
946+
}
947+
910948
public abstract void _Private_ConfirmSelect(bool nowait);
911949

912950
public string BasicConsume(string queue,
@@ -1143,7 +1181,10 @@ public void BasicPublish(string exchange,
11431181
{
11441182
basicProperties = CreateBasicProperties();
11451183
}
1146-
if (m_nextPubSeqNo > 0) m_nextPubSeqNo++;
1184+
if (m_nextPubSeqNo > 0) {
1185+
m_unconfirmedSet.Add(m_nextPubSeqNo, null);
1186+
m_nextPubSeqNo++;
1187+
}
11471188
_Private_BasicPublish(exchange,
11481189
routingKey,
11491190
mandatory,

projects/client/Unit/src/unit/TestExtensions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,15 @@ public void TestExchangeBinding()
7373
Model.ExchangeDelete("src");
7474
Model.ExchangeDelete("dest");
7575
}
76+
77+
[Test]
78+
public void TestConfirmBarrier()
79+
{
80+
Assert.That(Model.WaitForConfirms(), Is.Not.False);
81+
Model.ConfirmSelect();
82+
for (int i = 0; i < 10; i++)
83+
Model.BasicPublish("", String.Empty, null, new byte[] { });
84+
Assert.That(Model.WaitForConfirms(), Is.True);
85+
}
7686
}
7787
}

0 commit comments

Comments
 (0)