Skip to content

Commit 56f484f

Browse files
Towards recovery-aware delivery tag management in models
1 parent d51a128 commit 56f484f

File tree

3 files changed

+121
-8
lines changed

3 files changed

+121
-8
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,10 @@ public void UnregisterModel(AutorecoveringModel model)
404404

405405
protected IModel CreateNonRecoveringModel()
406406
{
407-
return m_delegate.CreateModel();
407+
ISession session = m_delegate.CreateSession();
408+
IFullModel result = (IFullModel)(new RecoveryAwareModel(session));
409+
result._Private_ChannelOpen("");
410+
return result;
408411
}
409412

410413
public IList<ShutdownReportEntry> ShutdownReport

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -587,13 +587,13 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b
587587

588588
public abstract bool DispatchAsynchronous(Command cmd);
589589

590-
public void HandleBasicDeliver(string consumerTag,
591-
ulong deliveryTag,
592-
bool redelivered,
593-
string exchange,
594-
string routingKey,
595-
IBasicProperties basicProperties,
596-
byte[] body)
590+
public virtual void HandleBasicDeliver(string consumerTag,
591+
ulong deliveryTag,
592+
bool redelivered,
593+
string exchange,
594+
string routingKey,
595+
IBasicProperties basicProperties,
596+
byte[] body)
597597
{
598598
IBasicConsumer consumer;
599599
lock (m_consumers)
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2014 GoPivotal, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is GoPivotal, Inc.
38+
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
43+
using RabbitMQ.Client.Framing.Impl;
44+
45+
namespace RabbitMQ.Client.Impl
46+
{
47+
public class RecoveryAwareModel : Model, IFullModel, IRecoverable
48+
{
49+
private ulong maxSeenDeliveryTag = 0;
50+
private ulong activeDeliveryTagOffset = 0;
51+
52+
public RecoveryAwareModel(ISession session) : base(session) {}
53+
54+
public override void HandleBasicDeliver(string consumerTag,
55+
ulong deliveryTag,
56+
bool redelivered,
57+
string exchange,
58+
string routingKey,
59+
IBasicProperties basicProperties,
60+
byte[] body)
61+
{
62+
if(deliveryTag > maxSeenDeliveryTag)
63+
{
64+
maxSeenDeliveryTag = deliveryTag;
65+
}
66+
67+
base.HandleBasicDeliver(consumerTag,
68+
OffsetDeliveryTag(deliveryTag),
69+
redelivered,
70+
exchange,
71+
routingKey,
72+
basicProperties,
73+
body);
74+
}
75+
76+
public override void BasicAck(ulong deliveryTag,
77+
bool multiple) {
78+
var realTag = deliveryTag - activeDeliveryTagOffset;
79+
if(realTag > 0)
80+
{
81+
base.BasicAck(deliveryTag, multiple);
82+
}
83+
}
84+
85+
public override void BasicReject(ulong deliveryTag,
86+
bool requeue) {
87+
var realTag = deliveryTag - activeDeliveryTagOffset;
88+
if(realTag > 0)
89+
{
90+
base.BasicReject(deliveryTag, requeue);
91+
}
92+
}
93+
94+
public override void BasicNack(ulong deliveryTag,
95+
bool multiple,
96+
bool requeue) {
97+
var realTag = deliveryTag - activeDeliveryTagOffset;
98+
if(realTag > 0)
99+
{
100+
base.BasicNack(deliveryTag, multiple, requeue);
101+
}
102+
}
103+
104+
105+
protected ulong OffsetDeliveryTag(ulong deliveryTag)
106+
{
107+
return deliveryTag + this.activeDeliveryTagOffset;
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)