Skip to content

Commit e732125

Browse files
Inherit offsets between autorecovering channel delegates
1 parent 56f484f commit e732125

File tree

3 files changed

+26
-8
lines changed

3 files changed

+26
-8
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,8 @@ public IModel CreateModel()
388388
AutorecoveringModel m;
389389
lock(this)
390390
{
391-
m = new AutorecoveringModel(this, (Model)this.CreateNonRecoveringModel());
391+
m = new AutorecoveringModel(this,
392+
(RecoveryAwareModel)this.CreateNonRecoveringModel());
392393
m_models.Add(m);
393394
}
394395
return m;
@@ -402,10 +403,10 @@ public void UnregisterModel(AutorecoveringModel model)
402403
}
403404
}
404405

405-
protected IModel CreateNonRecoveringModel()
406+
public RecoveryAwareModel CreateNonRecoveringModel()
406407
{
407-
ISession session = m_delegate.CreateSession();
408-
IFullModel result = (IFullModel)(new RecoveryAwareModel(session));
408+
var session = m_delegate.CreateSession();
409+
var result = new RecoveryAwareModel(session);
409410
result._Private_ChannelOpen("");
410411
return result;
411412
}

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ namespace RabbitMQ.Client.Impl
5151
public class AutorecoveringModel : IFullModel, IRecoverable
5252
{
5353
protected AutorecoveringConnection m_connection;
54-
protected Model m_delegate;
54+
protected RecoveryAwareModel m_delegate;
5555

5656
public readonly object m_eventLock = new object();
5757

@@ -73,7 +73,7 @@ public class AutorecoveringModel : IFullModel, IRecoverable
7373
protected bool usesPublisherConfirms = false;
7474
protected bool usesTransactions = false;
7575

76-
public AutorecoveringModel(AutorecoveringConnection conn, Model _delegate)
76+
public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
7777
{
7878
this.m_connection = conn;
7979
this.m_delegate = _delegate;
@@ -82,8 +82,10 @@ public AutorecoveringModel(AutorecoveringConnection conn, Model _delegate)
8282
public void AutomaticallyRecover(AutorecoveringConnection conn, IConnection connDelegate)
8383
{
8484
this.m_connection = conn;
85-
this.m_delegate = (Model)connDelegate.CreateModel();
86-
// TODO: inherit ack offset
85+
var defunctModel = this.m_delegate;
86+
87+
this.m_delegate = conn.CreateNonRecoveringModel();
88+
this.m_delegate.InheritOffsetFrom(defunctModel);
8789

8890
this.RecoverModelShutdownHandlers();
8991
this.RecoverState();

projects/client/RabbitMQ.Client/src/client/impl/RecoveryAwareModel.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public class RecoveryAwareModel : Model, IFullModel, IRecoverable
4949
private ulong maxSeenDeliveryTag = 0;
5050
private ulong activeDeliveryTagOffset = 0;
5151

52+
public ulong MaxSeenDeliveryTag
53+
{
54+
get { return maxSeenDeliveryTag; }
55+
}
56+
57+
public ulong ActiveDeliveryTagOffset
58+
{
59+
get { return activeDeliveryTagOffset; }
60+
}
61+
5262
public RecoveryAwareModel(ISession session) : base(session) {}
5363

5464
public override void HandleBasicDeliver(string consumerTag,
@@ -101,6 +111,11 @@ public override void BasicNack(ulong deliveryTag,
101111
}
102112
}
103113

114+
public void InheritOffsetFrom(RecoveryAwareModel other)
115+
{
116+
this.activeDeliveryTagOffset = other.ActiveDeliveryTagOffset + other.MaxSeenDeliveryTag;
117+
this.maxSeenDeliveryTag = 0;
118+
}
104119

105120
protected ulong OffsetDeliveryTag(ulong deliveryTag)
106121
{

0 commit comments

Comments
 (0)