Skip to content

Commit 361f9b6

Browse files
author
Alexandru Scvortov
committed
dotnet-client observers 0.9.1 channel shutdown protocol
1 parent 1e5eb8e commit 361f9b6

File tree

2 files changed

+45
-28
lines changed

2 files changed

+45
-28
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ConnectionBase.cs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -786,12 +786,13 @@ public void NotifyReceivedCloseOk()
786786
TerminateMainloop();
787787
m_closed = true;
788788
}
789-
789+
790790
///<summary>
791791
/// Sets the channel named in the SoftProtocolException into
792792
/// "quiescing mode", where we issue a channel.close and
793-
/// ignore everything up to the channel.close-ok reply that
794-
/// should eventually arrive.
793+
/// ignore everything except for subsequent channel.close
794+
/// messages and the channel.close-ok reply that should
795+
/// eventually arrive.
795796
///</summary>
796797
///<remarks>
797798
///<para>
@@ -816,23 +817,12 @@ public void NotifyReceivedCloseOk()
816817
///</para>
817818
///</remarks>
818819
public void QuiesceChannel(SoftProtocolException pe) {
819-
// First, construct the close request and QuiescingSession
820-
// that we'll use during the quiesce process.
821-
822-
Command request;
823-
int replyClassId;
824-
int replyMethodId;
825-
Protocol.CreateChannelClose(pe.ReplyCode,
826-
pe.Message,
827-
out request,
828-
out replyClassId,
829-
out replyMethodId);
820+
// Construct the QuiescingSession that we'll use during
821+
// the quiesce process.
830822

831823
ISession newSession = new QuiescingSession(this,
832824
pe.Channel,
833-
pe.ShutdownReason,
834-
replyClassId,
835-
replyMethodId);
825+
pe.ShutdownReason);
836826

837827
// Here we detach the session from the connection. It's
838828
// still alive: it just won't receive any further frames
@@ -851,7 +841,7 @@ public void QuiesceChannel(SoftProtocolException pe) {
851841
// our peer. The peer will respond through the lower
852842
// layers - specifically, through the QuiescingSession we
853843
// installed above.
854-
newSession.Transmit(request);
844+
newSession.Transmit(ChannelCloseWrapper(pe.ReplyCode, pe.Message));
855845
}
856846

857847
public void HandleMainLoopException(ShutdownEventArgs reason) {
@@ -959,7 +949,19 @@ public Command ConnectionCloseWrapper(ushort reasonCode, string reasonText)
959949
out replyClassId,
960950
out replyMethodId);
961951
return request;
962-
}
952+
}
953+
954+
protected Command ChannelCloseWrapper(ushort reasonCode, string reasonText)
955+
{
956+
Command request;
957+
int replyClassId, replyMethodId;
958+
Protocol.CreateChannelClose(reasonCode,
959+
reasonText,
960+
out request,
961+
out replyClassId,
962+
out replyMethodId);
963+
return request;
964+
}
963965

964966
private static uint NegotiatedMaxValue(uint clientValue, uint serverValue)
965967
{

projects/client/RabbitMQ.Client/src/client/impl/QuiescingSession.cs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,45 +65,60 @@
6565
// the versions we support*. Obviously we may need to revisit this if
6666
// that ever changes.
6767
using CommonFraming = RabbitMQ.Client.Framing.v0_9;
68+
using CommonFramingSpecs = RabbitMQ.Client.Framing.Impl.v0_9;
6869

6970
namespace RabbitMQ.Client.Impl
7071
{
7172
///<summary>Small ISession implementation used during channel quiescing.</summary>
7273
public class QuiescingSession: SessionBase
7374
{
7475
public ShutdownEventArgs m_reason;
75-
public int m_replyClassId;
76-
public int m_replyMethodId;
76+
protected int m_closeClassId;
77+
protected int m_closeMethodId;
78+
protected int m_closeOkClassId;
79+
protected int m_closeOkMethodId;
7780

7881
public QuiescingSession(ConnectionBase connection,
7982
int channelNumber,
80-
ShutdownEventArgs reason,
81-
int replyClassId,
82-
int replyMethodId)
83+
ShutdownEventArgs reason)
8384
: base(connection, channelNumber)
8485
{
8586
m_reason = reason;
86-
m_replyClassId = replyClassId;
87-
m_replyMethodId = replyMethodId;
87+
m_closeClassId = CommonFramingSpecs.ChannelClose.ClassId;
88+
m_closeMethodId = CommonFramingSpecs.ChannelClose.MethodId;
89+
m_closeOkClassId = CommonFramingSpecs.ChannelCloseOk.ClassId;
90+
m_closeOkMethodId = CommonFramingSpecs.ChannelCloseOk.MethodId;
8891
}
8992

9093
public override void HandleFrame(Frame frame)
9194
{
9295
if (frame.Type == CommonFraming.Constants.FrameMethod) {
9396
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.GetReader());
94-
if ((method.ProtocolClassId == m_replyClassId)
95-
&& (method.ProtocolMethodId == m_replyMethodId))
97+
if ((method.ProtocolClassId == m_closeOkClassId)
98+
&& (method.ProtocolMethodId == m_closeOkMethodId))
9699
{
97100
// This is the reply we were looking for. Release
98101
// the channel with the reason we were passed in
99102
// our constructor.
100103
Close(m_reason);
101104
return;
102105
}
106+
else if ((method.ProtocolClassId == m_closeClassId)
107+
&& (method.ProtocolMethodId == m_closeMethodId))
108+
{
109+
// We're already shutting down the channel, so
110+
// just send back an ok.
111+
Transmit(CreateChannelCloseOk());
112+
return;
113+
}
103114
}
104115

105116
// Either a non-method frame, or not what we were looking
106117
// for. Ignore it - we're quiescing.
107118
}
119+
120+
protected Command CreateChannelCloseOk() {
121+
return new Command(new CommonFramingSpecs.ConnectionCloseOk());
122+
}
108123
}
109124
}

0 commit comments

Comments
 (0)