Skip to content

Commit 559c72e

Browse files
author
Alexandru Scvortov
committed
add WaitForConfirmsOrDie
1 parent dda955d commit 559c72e

File tree

2 files changed

+37
-5
lines changed

2 files changed

+37
-5
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,17 @@ uint QueueDelete(string queue,
331331
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
332332
bool WaitForConfirms();
333333

334+
//<summary>Wait until all published messages have been confirmed.
335+
//<remarks>
336+
//Waits until all messages published since the last call have
337+
//been ack'd by the broker. If a nack is received, throws an
338+
//OperationInterrupedException exception immediately.
339+
//</remarks>
340+
[AmqpMethodDoNotImplement(null)]
341+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
342+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
343+
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
344+
void WaitForConfirmsOrDie();
334345

335346
///<summary>Start a Basic content-class consumer.</summary>
336347
///<remarks>

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ public virtual void OnModelShutdown(ShutdownEventArgs reason)
312312
}
313313
}
314314
}
315+
lock (m_unconfirmedSet.SyncRoot)
316+
Monitor.Pulse(m_unconfirmedSet.SyncRoot);
315317
m_flowControlBlock.Set();
316318
}
317319

@@ -935,6 +937,9 @@ public bool WaitForConfirms()
935937
{
936938
lock (m_unconfirmedSet.SyncRoot) {
937939
while (true) {
940+
if (CloseReason != null)
941+
throw new OperationInterruptedException(CloseReason);
942+
938943
if (m_unconfirmedSet.Count == 0) {
939944
bool aux = m_onlyAcksReceived;
940945
m_onlyAcksReceived = true;
@@ -945,6 +950,17 @@ public bool WaitForConfirms()
945950
}
946951
}
947952

953+
public void WaitForConfirmsOrDie()
954+
{
955+
if (!WaitForConfirms()) {
956+
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
957+
CommonFraming.Constants.ReplySuccess,
958+
"Goodbye", new IOException("nack received")),
959+
false);
960+
throw new OperationInterruptedException(CloseReason);
961+
}
962+
}
963+
948964
public abstract void _Private_ConfirmSelect(bool nowait);
949965

950966
public string BasicConsume(string queue,
@@ -1229,7 +1245,7 @@ public void Close()
12291245
Close(CommonFraming.Constants.ReplySuccess, "Goodbye");
12301246
}
12311247

1232-
public void Close(ushort replyCode, string replyText)
1248+
public void Close(ushort replyCode, string replyText)
12331249
{
12341250
Close(replyCode, replyText, false);
12351251
}
@@ -1245,16 +1261,21 @@ public void Abort(ushort replyCode, string replyText)
12451261
}
12461262

12471263
public void Close(ushort replyCode, string replyText, bool abort)
1264+
{
1265+
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
1266+
replyCode, replyText),
1267+
abort);
1268+
}
1269+
1270+
public void Close(ShutdownEventArgs reason, bool abort)
12481271
{
12491272
ShutdownContinuation k = new ShutdownContinuation();
12501273
ModelShutdown += new ModelShutdownEventHandler(k.OnShutdown);
12511274

12521275
try {
1253-
if (SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Application,
1254-
replyCode,
1255-
replyText)))
1276+
if (SetCloseReason(reason))
12561277
{
1257-
_Private_ChannelClose(replyCode, replyText, 0, 0);
1278+
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
12581279
}
12591280
k.Wait();
12601281
} catch (AlreadyClosedException ace) {

0 commit comments

Comments
 (0)