Skip to content

Commit f0da4b1

Browse files
author
Alexandru Scvortov
committed
merge bug 24538 into default (WaitForConfirms with timeouts)
2 parents 63309a6 + c78dfd7 commit f0da4b1

File tree

2 files changed

+80
-6
lines changed

2 files changed

+80
-6
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,28 @@ uint QueueDelete(string queue,
332332
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
333333
bool WaitForConfirms();
334334

335+
///<summary>Wait until all published messages have been confirmed.
336+
///</summary>
337+
///<returns>true if no nacks were received within the timeout,
338+
///otherwise false</returns>
339+
///<param name="timeout">How long to wait (at most) before returning
340+
///whether or not any nacks were returned</param>
341+
///<param name="timedOut">True if the method returned because
342+
///the timeout elapsed, not because all messages were ack'd
343+
///or at least one nack'd.</param>
344+
///<remarks>
345+
///Waits until all messages published since the last call have
346+
///been either ack'd or nack'd by the broker. Returns whether
347+
///all the messages were ack'd (and none were nack'd). Note,
348+
///when called on a non-Confirm channel, returns true
349+
///immediately.
350+
///</remarks>
351+
[AmqpMethodDoNotImplement(null)]
352+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
353+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
354+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
355+
bool WaitForConfirms(TimeSpan timeout, out bool timedOut);
356+
335357
///<summary>Wait until all published messages have been confirmed.
336358
///</summary>
337359
///<remarks>
@@ -345,6 +367,20 @@ uint QueueDelete(string queue,
345367
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
346368
void WaitForConfirmsOrDie();
347369

370+
///<summary>Wait until all published messages have been confirmed.
371+
///</summary>
372+
///<remarks>
373+
///Waits until all messages published since the last call have
374+
///been ack'd by the broker. If a nack is received or the timeout
375+
///elapses, throws an OperationInterrupedException exception
376+
///immediately.
377+
///</remarks>
378+
[AmqpMethodDoNotImplement(null)]
379+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
380+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
381+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
382+
void WaitForConfirmsOrDie(TimeSpan timeout);
383+
348384
///<summary>Start a Basic content-class consumer.</summary>
349385
///<remarks>
350386
///The consumer is started with noAck=false (i.e. BasicAck is required),

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -958,32 +958,70 @@ public void ConfirmSelect()
958958
_Private_ConfirmSelect(false);
959959
}
960960

961-
public bool WaitForConfirms()
961+
public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
962962
{
963-
lock (m_unconfirmedSet.SyncRoot) {
964-
while (true) {
963+
bool isWaitInfinite = (timeout.TotalMilliseconds == Timeout.Infinite);
964+
Stopwatch stopwatch = Stopwatch.StartNew();
965+
lock (m_unconfirmedSet.SyncRoot)
966+
{
967+
while (true)
968+
{
965969
if (CloseReason != null)
966970
throw new AlreadyClosedException(CloseReason);
967971

968-
if (m_unconfirmedSet.Count == 0) {
972+
if (m_unconfirmedSet.Count == 0)
973+
{
969974
bool aux = m_onlyAcksReceived;
970975
m_onlyAcksReceived = true;
976+
timedOut = false;
971977
return aux;
972978
}
973-
Monitor.Wait(m_unconfirmedSet.SyncRoot);
979+
if (isWaitInfinite)
980+
Monitor.Wait(m_unconfirmedSet.SyncRoot);
981+
else
982+
{
983+
TimeSpan elapsed = stopwatch.Elapsed;
984+
if(elapsed > timeout || !Monitor.Wait(
985+
m_unconfirmedSet.SyncRoot, timeout - elapsed))
986+
{
987+
timedOut = true;
988+
return true;
989+
}
990+
}
974991
}
975992
}
976993
}
977994

995+
public bool WaitForConfirms()
996+
{
997+
bool timedOut;
998+
return WaitForConfirms(TimeSpan.FromMilliseconds(Timeout.Infinite), out timedOut);
999+
}
1000+
9781001
public void WaitForConfirmsOrDie()
9791002
{
980-
if (!WaitForConfirms()) {
1003+
WaitForConfirmsOrDie(TimeSpan.FromMilliseconds(Timeout.Infinite));
1004+
}
1005+
1006+
public void WaitForConfirmsOrDie(TimeSpan timeout)
1007+
{
1008+
bool timedOut;
1009+
bool onlyAcksReceived = WaitForConfirms(timeout, out timedOut);
1010+
if (!onlyAcksReceived) {
9811011
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
9821012
CommonFraming.Constants.ReplySuccess,
9831013
"Nacks Received", new IOException("nack received")),
9841014
false);
9851015
throw new IOException("Nacks Received");
9861016
}
1017+
if (timedOut) {
1018+
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
1019+
CommonFraming.Constants.ReplySuccess,
1020+
"Timed out waiting for acks",
1021+
new IOException("timed out waiting for acks")),
1022+
false);
1023+
throw new IOException("Timed out waiting for acks");
1024+
}
9871025
}
9881026

9891027
public abstract void _Private_ConfirmSelect(bool nowait);

0 commit comments

Comments
 (0)