Skip to content

Commit dc124a4

Browse files
Merge branch 'master' of https://github.com/YulerB/rabbitmq-dotnet-client into YulerB-master
2 parents 8eb42f4 + 61d9e79 commit dc124a4

15 files changed

+311
-254
lines changed

RabbitMQDotNetClient.sln.DotSettings

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,6 @@
126126
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
127127
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
128128
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
129-
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
129+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
130+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsParsFormattingSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
131+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsWrapperSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 26 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,8 @@ public class Command
5555
// - 2 bytes of channel number
5656
// - 4 bytes of frame payload length
5757
// - 1 byte of payload trailer FrameEnd byte
58-
public const int EmptyFrameSize = 8;
59-
60-
public byte[] m_body0;
61-
public IList<byte[]> m_bodyN;
58+
private const int EmptyFrameSize = 8;
59+
private readonly MemoryStream m_body;
6260
private static readonly byte[] m_emptyByteArray = new byte[0];
6361

6462
static Command()
@@ -68,18 +66,26 @@ static Command()
6866

6967
public Command() : this(null, null, null)
7068
{
69+
m_body = new MemoryStream();
7170
}
7271

7372
public Command(MethodBase method) : this(method, null, null)
7473
{
74+
m_body = new MemoryStream();
7575
}
7676

7777
public Command(MethodBase method, ContentHeaderBase header, byte[] body)
7878
{
7979
Method = method;
8080
Header = header;
81-
m_body0 = body;
82-
m_bodyN = null;
81+
if (body != null)
82+
{
83+
m_body = new MemoryStream(body);
84+
}
85+
else
86+
{
87+
m_body = new MemoryStream();
88+
}
8389
}
8490

8591
public byte[] Body
@@ -93,7 +99,7 @@ public byte[] Body
9399

94100
public static void CheckEmptyFrameSize()
95101
{
96-
var f = new Frame(Constants.FrameBody, 0, m_emptyByteArray);
102+
var f = new EmptyWriteFrame();
97103
var stream = new MemoryStream();
98104
var writer = new NetworkBinaryWriter(stream);
99105
f.WriteTo(writer);
@@ -111,50 +117,20 @@ public static void CheckEmptyFrameSize()
111117

112118
public void AppendBodyFragment(byte[] fragment)
113119
{
114-
if (m_body0 == null)
120+
if (fragment != null)
115121
{
116-
m_body0 = fragment;
117-
}
118-
else
119-
{
120-
if (m_bodyN == null)
121-
{
122-
m_bodyN = new List<byte[]>();
123-
}
124-
m_bodyN.Add(fragment);
122+
m_body.Write(fragment, 0, fragment.Length);
125123
}
126124
}
127125

128126
public byte[] ConsolidateBody()
129127
{
130-
if (m_bodyN == null)
131-
{
132-
return m_body0 ?? m_emptyByteArray;
133-
}
134-
else
135-
{
136-
int totalSize = m_body0.Length;
137-
foreach (byte[] fragment in m_bodyN)
138-
{
139-
totalSize += fragment.Length;
140-
}
141-
var result = new byte[totalSize];
142-
Array.Copy(m_body0, 0, result, 0, m_body0.Length);
143-
int offset = m_body0.Length;
144-
foreach (byte[] fragment in m_bodyN)
145-
{
146-
Array.Copy(fragment, 0, result, offset, fragment.Length);
147-
offset += fragment.Length;
148-
}
149-
m_body0 = result;
150-
m_bodyN = null;
151-
return m_body0;
152-
}
128+
return m_body.Length == 0 ? m_emptyByteArray : m_body.ToArray();
153129
}
154130

155131
public void Transmit(int channelNumber, Connection connection)
156132
{
157-
if(Method.HasContent)
133+
if (Method.HasContent)
158134
{
159135
TransmitAsFrameSet(channelNumber, connection);
160136
}
@@ -166,52 +142,25 @@ public void Transmit(int channelNumber, Connection connection)
166142

167143
public void TransmitAsSingleFrame(int channelNumber, Connection connection)
168144
{
169-
var frame = new Frame(Constants.FrameMethod, channelNumber);
170-
NetworkBinaryWriter writer = frame.GetWriter();
171-
writer.Write((ushort)Method.ProtocolClassId);
172-
writer.Write((ushort)Method.ProtocolMethodId);
173-
var argWriter = new MethodArgumentWriter(writer);
174-
Method.WriteArgumentsTo(argWriter);
175-
argWriter.Flush();
176-
connection.WriteFrame(frame);
145+
connection.WriteFrame(new MethodWriteFrame(channelNumber, Method));
177146
}
178147

179148
public void TransmitAsFrameSet(int channelNumber, Connection connection)
180149
{
181-
var frame = new Frame(Constants.FrameMethod, channelNumber);
182-
NetworkBinaryWriter writer = frame.GetWriter();
183-
writer.Write((ushort)Method.ProtocolClassId);
184-
writer.Write((ushort)Method.ProtocolMethodId);
185-
var argWriter = new MethodArgumentWriter(writer);
186-
Method.WriteArgumentsTo(argWriter);
187-
argWriter.Flush();
188-
189-
var frames = new List<Frame>();
190-
frames.Add(frame);
191-
150+
var frames = new List<WriteFrame>();
151+
frames.Add(new MethodWriteFrame(channelNumber, Method));
192152
if (Method.HasContent)
193153
{
194-
byte[] body = Body;
195-
196-
frame = new Frame(Constants.FrameHeader, channelNumber);
197-
writer = frame.GetWriter();
198-
writer.Write((ushort)Header.ProtocolClassId);
199-
Header.WriteTo(writer, (ulong)body.Length);
200-
frames.Add(frame);
154+
var body = ConsolidateBody(); // Cache, since the property is compiled.
201155

156+
frames.Add(new HeaderWriteFrame(channelNumber, Header, body.Length));
202157
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
203-
int bodyPayloadMax = (frameMax == 0)
204-
? body.Length
205-
: frameMax - EmptyFrameSize;
158+
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
206159
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
207160
{
208-
int remaining = body.Length - offset;
209-
210-
frame = new Frame(Constants.FrameBody, channelNumber);
211-
writer = frame.GetWriter();
212-
writer.Write(body, offset,
213-
(remaining < bodyPayloadMax) ? remaining : bodyPayloadMax);
214-
frames.Add(frame);
161+
var remaining = body.Length - offset;
162+
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
163+
frames.Add(new BodySegmentWriteFrame(channelNumber, body, offset, count));
215164
}
216165
}
217166

projects/client/RabbitMQ.Client/src/client/impl/CommandAssembler.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ public CommandAssembler(ProtocolBase protocol)
6868
Reset();
6969
}
7070

71-
public Command HandleFrame(Frame f)
71+
public Command HandleFrame(ReadFrame f)
7272
{
7373
switch (m_state)
7474
{
7575
case AssemblyState.ExpectingMethod:
7676
{
77-
if (f.Type != Constants.FrameMethod)
77+
if (!f.IsMethod())
7878
{
7979
throw new UnexpectedFrameException(f);
8080
}
@@ -86,7 +86,7 @@ public Command HandleFrame(Frame f)
8686
}
8787
case AssemblyState.ExpectingContentHeader:
8888
{
89-
if (f.Type != Constants.FrameHeader)
89+
if (!f.IsHeader())
9090
{
9191
throw new UnexpectedFrameException(f);
9292
}
@@ -98,20 +98,19 @@ public Command HandleFrame(Frame f)
9898
}
9999
case AssemblyState.ExpectingContentBody:
100100
{
101-
if (f.Type != Constants.FrameBody)
101+
if (!f.IsBody())
102102
{
103103
throw new UnexpectedFrameException(f);
104104
}
105-
byte[] fragment = f.Payload;
106-
m_command.AppendBodyFragment(fragment);
107-
if ((ulong)fragment.Length > m_remainingBodyBytes)
105+
m_command.AppendBodyFragment(f.Payload);
106+
if ((ulong)f.Payload.Length > m_remainingBodyBytes)
108107
{
109108
throw new MalformedFrameException
110109
(string.Format("Overlong content body received - {0} bytes remaining, {1} bytes received",
111110
m_remainingBodyBytes,
112-
fragment.Length));
111+
f.Payload.Length));
113112
}
114-
m_remainingBodyBytes -= (ulong)fragment.Length;
113+
m_remainingBodyBytes -= (ulong)f.Payload.Length;
115114
UpdateContentBodyState();
116115
return CompletedCommand();
117116
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class Connection : IConnection
6767
private readonly object m_eventLock = new object();
6868

6969
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
70-
private readonly Frame m_heartbeatFrame = new Frame(Constants.FrameHeartbeat, 0, new byte[0]);
70+
private readonly EmptyWriteFrame m_heartbeatFrame = new EmptyWriteFrame();
7171

7272
private ManualResetEvent m_appContinuation = new ManualResetEvent(false);
7373
private EventHandler<CallbackExceptionEventArgs> m_callbackException;
@@ -716,11 +716,11 @@ public void MainLoop()
716716

717717
public void MainLoopIteration()
718718
{
719-
Frame frame = m_frameHandler.ReadFrame();
719+
ReadFrame frame = m_frameHandler.ReadFrame();
720720

721721
NotifyHeartbeatListener();
722722
// We have received an actual frame.
723-
if (frame.Type == Constants.FrameHeartbeat)
723+
if (frame.IsHeartbeat())
724724
{
725725
// Ignore it: we've already just reset the heartbeat
726726
// latch.
@@ -767,7 +767,7 @@ public void MainLoopIteration()
767767
}
768768
}
769769

770-
public void NotifyHeartbeatListener()
770+
public void NotifyHeartbeatListener()
771771
{
772772
if (m_heartbeat != 0)
773773
{
@@ -1099,7 +1099,7 @@ public void HeartbeatWriteTimerCallback(object state)
10991099
if (!m_closed)
11001100
{
11011101
WriteFrame(m_heartbeatFrame);
1102-
m_frameHandler.Flush();
1102+
//m_frameHandler.Flush();
11031103
}
11041104
}
11051105
catch (Exception e)
@@ -1168,13 +1168,13 @@ public override string ToString()
11681168
return string.Format("Connection({0},{1})", m_id, Endpoint);
11691169
}
11701170

1171-
public void WriteFrame(Frame f)
1171+
public void WriteFrame(WriteFrame f)
11721172
{
11731173
m_frameHandler.WriteFrame(f);
11741174
m_heartbeatWrite.Set();
11751175
}
11761176

1177-
public void WriteFrameSet(IList<Frame> f)
1177+
public void WriteFrameSet(IList<WriteFrame> f)
11781178
{
11791179
m_frameHandler.WriteFrameSet(f);
11801180
m_heartbeatWrite.Set();

0 commit comments

Comments
 (0)