Skip to content

Commit df30ce6

Browse files
Merge branch 'stable'
2 parents e41d31d + ed641f3 commit df30ce6

File tree

3 files changed

+49
-7
lines changed

3 files changed

+49
-7
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ public void HandleBasicGetEmpty()
809809
k.HandleCommand(null); // release the continuation.
810810
}
811811

812-
public void HandleBasicGetOk(ulong deliveryTag,
812+
public virtual void HandleBasicGetOk(ulong deliveryTag,
813813
bool redelivered,
814814
string exchange,
815815
string routingKey,

projects/client/RabbitMQ.Client/src/client/impl/RecoveryAwareModel.cs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,29 @@ public void InheritOffsetFrom(RecoveryAwareModel other)
6060
MaxSeenDeliveryTag = 0;
6161
}
6262

63+
public override void HandleBasicGetOk(ulong deliveryTag,
64+
bool redelivered,
65+
string exchange,
66+
string routingKey,
67+
uint messageCount,
68+
IBasicProperties basicProperties,
69+
byte[] body)
70+
{
71+
if (deliveryTag > MaxSeenDeliveryTag)
72+
{
73+
MaxSeenDeliveryTag = deliveryTag;
74+
}
75+
76+
base.HandleBasicGetOk(
77+
OffsetDeliveryTag(deliveryTag),
78+
redelivered,
79+
exchange,
80+
routingKey,
81+
messageCount,
82+
basicProperties,
83+
body);
84+
}
85+
6386
public override void HandleBasicDeliver(string consumerTag,
6487
ulong deliveryTag,
6588
bool redelivered,
@@ -86,7 +109,7 @@ public override void BasicAck(ulong deliveryTag,
86109
bool multiple)
87110
{
88111
ulong realTag = deliveryTag - ActiveDeliveryTagOffset;
89-
if (realTag > 0)
112+
if (realTag > 0 && realTag <= deliveryTag)
90113
{
91114
base.BasicAck(realTag, multiple);
92115
}
@@ -97,7 +120,7 @@ public override void BasicNack(ulong deliveryTag,
97120
bool requeue)
98121
{
99122
ulong realTag = deliveryTag - ActiveDeliveryTagOffset;
100-
if (realTag > 0)
123+
if (realTag > 0 && realTag <= deliveryTag)
101124
{
102125
base.BasicNack(realTag, multiple, requeue);
103126
}
@@ -107,7 +130,7 @@ public override void BasicReject(ulong deliveryTag,
107130
bool requeue)
108131
{
109132
ulong realTag = deliveryTag - ActiveDeliveryTagOffset;
110-
if (realTag > 0)
133+
if (realTag > 0 && realTag <= deliveryTag)
111134
{
112135
base.BasicReject(realTag, requeue);
113136
}

projects/client/Unit/src/unit/TestConnectionRecovery.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,30 @@ public override void Init()
8888
public void TestBasicAckAfterChannelRecovery()
8989
{
9090
var latch = new ManualResetEvent(false);
91-
var cons = new AckingBasicConsumer(Model, latch, () => { CloseAndWaitForRecovery(); });
91+
var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
9292

9393
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
9494
}
9595

96+
[Test]
97+
public void TestBasicAckAfterBasicGetAndChannelRecovery()
98+
{
99+
var q = GenerateQueueName();
100+
Model.QueueDeclare(q, false, false, false, null);
101+
// create an offset
102+
var bp = Model.CreateBasicProperties();
103+
Model.BasicPublish("", q, bp, new byte [] {});
104+
Thread.Sleep(50);
105+
var g = Model.BasicGet(q, false);
106+
CloseAndWaitForRecovery();
107+
Assert.IsTrue(Conn.IsOpen);
108+
Assert.IsTrue(Model.IsOpen);
109+
// ack the message after recovery - this should be out of range and ignored
110+
Model.BasicAck(g.DeliveryTag, false);
111+
// do a sync operation to 'check' there is no channel exception
112+
Model.BasicGet(q, false);
113+
}
114+
96115
[Test]
97116
public void TestBasicAckEventHandlerRecovery()
98117
{
@@ -200,7 +219,7 @@ public void TestBasicModelRecoveryOnServerRestart()
200219
public void TestBasicNackAfterChannelRecovery()
201220
{
202221
var latch = new ManualResetEvent(false);
203-
var cons = new NackingBasicConsumer(Model, latch, () => { CloseAndWaitForRecovery(); });
222+
var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
204223

205224
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
206225
}
@@ -209,7 +228,7 @@ public void TestBasicNackAfterChannelRecovery()
209228
public void TestBasicRejectAfterChannelRecovery()
210229
{
211230
var latch = new ManualResetEvent(false);
212-
var cons = new RejectingBasicConsumer(Model, latch, () => { CloseAndWaitForRecovery(); });
231+
var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
213232

214233
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
215234
}

0 commit comments

Comments
 (0)