Skip to content

Commit a8e1ad7

Browse files
Add a test for consumer recovery between delivery and attempted basic.ack
1 parent afb4032 commit a8e1ad7

File tree

1 file changed

+68
-1
lines changed

1 file changed

+68
-1
lines changed

projects/client/Unit/src/unit/TestConnectionRecovery.cs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,69 @@ public void TestThatCancelledConsumerDoesNotReappearOnRecovery()
580580
AssertConsumerCount(q, 0);
581581
}
582582

583-
// TODO: TestBasicAckAfterChannelRecovery
583+
public class TestBasicConsumer1 : DefaultBasicConsumer
584+
{
585+
private ushort counter = 0;
586+
private AutoResetEvent latch;
587+
private Action action;
588+
589+
public TestBasicConsumer1(IModel model, AutoResetEvent latch, Action fn) : base(model) {
590+
this.latch = latch;
591+
this.action = fn;
592+
}
593+
594+
public override void HandleBasicDeliver(string consumerTag,
595+
ulong deliveryTag,
596+
bool redelivered,
597+
string exchange,
598+
string routingKey,
599+
IBasicProperties properties,
600+
byte[] body)
601+
{
602+
try
603+
{
604+
if(deliveryTag == 7 && counter < 10)
605+
{
606+
this.action();
607+
}
608+
if(counter == 9)
609+
{
610+
this.latch.Set();
611+
}
612+
base.m_model.BasicAck(deliveryTag, false);
613+
} finally
614+
{
615+
counter += 1;
616+
}
617+
}
618+
}
619+
620+
[Test]
621+
public void TestBasicAckAfterChannelRecovery()
622+
{
623+
var q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
624+
var n = 30;
625+
626+
var latch = new AutoResetEvent(false);
627+
var cons = new TestBasicConsumer1(Model, latch, () => {
628+
CloseAndWaitForRecovery();
629+
});
630+
Model.BasicQos(0, 1, false);
631+
Model.BasicConsume(q, false, cons);
632+
633+
var publishingConn = CreateAutorecoveringConnection();
634+
var publishingModel = publishingConn.CreateModel();
635+
636+
for(var i = 0; i < n; i++)
637+
{
638+
publishingModel.BasicPublish("", q, null, enc.GetBytes(""));
639+
}
640+
641+
Wait(latch, TimeSpan.FromSeconds(20));
642+
Model.QueueDelete(q);
643+
publishingModel.Close();
644+
publishingConn.Close();
645+
}
584646

585647

586648
//
@@ -640,6 +702,11 @@ protected void Wait(AutoResetEvent latch)
640702
Assert.IsTrue(latch.WaitOne(TimeSpan.FromSeconds(8)));
641703
}
642704

705+
protected void Wait(AutoResetEvent latch, TimeSpan timeSpan)
706+
{
707+
Assert.IsTrue(latch.WaitOne(timeSpan));
708+
}
709+
643710
protected override void ReleaseResources()
644711
{
645712
Unblock();

0 commit comments

Comments
 (0)