Skip to content

Commit 225e817

Browse files
committed
ensure MaxSeenDeliveryTag is incremented for BasicGet
1 parent c586827 commit 225e817

File tree

3 files changed

+46
-4
lines changed

3 files changed

+46
-4
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ public void HandleBasicGetEmpty()
809809
k.HandleCommand(null); // release the continuation.
810810
}
811811

812-
public void HandleBasicGetOk(ulong deliveryTag,
812+
public virtual void HandleBasicGetOk(ulong deliveryTag,
813813
bool redelivered,
814814
string exchange,
815815
string routingKey,

projects/client/RabbitMQ.Client/src/client/impl/RecoveryAwareModel.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,29 @@ public void InheritOffsetFrom(RecoveryAwareModel other)
6060
MaxSeenDeliveryTag = 0;
6161
}
6262

63+
public override void HandleBasicGetOk(ulong deliveryTag,
64+
bool redelivered,
65+
string exchange,
66+
string routingKey,
67+
uint messageCount,
68+
IBasicProperties basicProperties,
69+
byte[] body)
70+
{
71+
if (deliveryTag > MaxSeenDeliveryTag)
72+
{
73+
MaxSeenDeliveryTag = deliveryTag;
74+
}
75+
76+
base.HandleBasicGetOk(
77+
OffsetDeliveryTag(deliveryTag),
78+
redelivered,
79+
exchange,
80+
routingKey,
81+
messageCount,
82+
basicProperties,
83+
body);
84+
}
85+
6386
public override void HandleBasicDeliver(string consumerTag,
6487
ulong deliveryTag,
6588
bool redelivered,

projects/client/Unit/src/unit/TestConnectionRecovery.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,30 @@ public override void Init()
6565
public void TestBasicAckAfterChannelRecovery()
6666
{
6767
var latch = new ManualResetEvent(false);
68-
var cons = new AckingBasicConsumer(Model, latch, () => { CloseAndWaitForRecovery(); });
68+
var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
6969

7070
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
7171
}
7272

73+
[Test]
74+
public void TestBasicAckAfterBasicGetAndChannelRecovery()
75+
{
76+
var q = GenerateQueueName();
77+
Model.QueueDeclare(q, false, false, false, null);
78+
// create an offset
79+
var bp = Model.CreateBasicProperties();
80+
Model.BasicPublish("", q, bp, new byte [] {});
81+
Thread.Sleep(50);
82+
var g = Model.BasicGet(q, false);
83+
CloseAndWaitForRecovery();
84+
Assert.IsTrue(Conn.IsOpen);
85+
Assert.IsTrue(Model.IsOpen);
86+
// ack the message after recovery - this should be out of range and ignored
87+
Model.BasicAck(g.DeliveryTag, false);
88+
// do a sync operation to 'check' there is no channel exception
89+
Model.BasicGet(q, false);
90+
}
91+
7392
[Test]
7493
public void TestBasicAckEventHandlerRecovery()
7594
{
@@ -173,7 +192,7 @@ public void TestBasicModelRecoveryOnServerRestart()
173192
public void TestBasicNackAfterChannelRecovery()
174193
{
175194
var latch = new ManualResetEvent(false);
176-
var cons = new NackingBasicConsumer(Model, latch, () => { CloseAndWaitForRecovery(); });
195+
var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
177196

178197
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
179198
}
@@ -182,7 +201,7 @@ public void TestBasicNackAfterChannelRecovery()
182201
public void TestBasicRejectAfterChannelRecovery()
183202
{
184203
var latch = new ManualResetEvent(false);
185-
var cons = new RejectingBasicConsumer(Model, latch, () => { CloseAndWaitForRecovery(); });
204+
var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
186205

187206
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
188207
}

0 commit comments

Comments
 (0)