Skip to content

Commit a6ff430

Browse files
When sending methods with concent, flush socket stream once
References rabbitmq/rabbitmq-server#156.
1 parent 8d7844a commit a6ff430

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,18 @@ public byte[] ConsolidateBody()
153153
}
154154

155155
public void Transmit(int channelNumber, Connection connection)
156+
{
157+
if(Method.HasContent)
158+
{
159+
TransmitAsFrameSet(channelNumber, connection);
160+
}
161+
else
162+
{
163+
TransmitAsSingleFrame(channelNumber, connection);
164+
}
165+
}
166+
167+
public void TransmitAsSingleFrame(int channelNumber, Connection connection)
156168
{
157169
var frame = new Frame(Constants.FrameMethod, channelNumber);
158170
NetworkBinaryWriter writer = frame.GetWriter();
@@ -162,6 +174,20 @@ public void Transmit(int channelNumber, Connection connection)
162174
Method.WriteArgumentsTo(argWriter);
163175
argWriter.Flush();
164176
connection.WriteFrame(frame);
177+
}
178+
179+
public void TransmitAsFrameSet(int channelNumber, Connection connection)
180+
{
181+
var frame = new Frame(Constants.FrameMethod, channelNumber);
182+
NetworkBinaryWriter writer = frame.GetWriter();
183+
writer.Write((ushort)Method.ProtocolClassId);
184+
writer.Write((ushort)Method.ProtocolMethodId);
185+
var argWriter = new MethodArgumentWriter(writer);
186+
Method.WriteArgumentsTo(argWriter);
187+
argWriter.Flush();
188+
189+
var frames = new List<Frame>();
190+
frames.Add(frame);
165191

166192
if (Method.HasContent)
167193
{
@@ -171,7 +197,7 @@ public void Transmit(int channelNumber, Connection connection)
171197
writer = frame.GetWriter();
172198
writer.Write((ushort)Header.ProtocolClassId);
173199
Header.WriteTo(writer, (ulong)body.Length);
174-
connection.WriteFrame(frame);
200+
frames.Add(frame);
175201

176202
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
177203
int bodyPayloadMax = (frameMax == 0)
@@ -185,9 +211,11 @@ public void Transmit(int channelNumber, Connection connection)
185211
writer = frame.GetWriter();
186212
writer.Write(body, offset,
187213
(remaining < bodyPayloadMax) ? remaining : bodyPayloadMax);
188-
connection.WriteFrame(frame);
214+
frames.Add(frame);
189215
}
190216
}
217+
218+
connection.WriteFrameSet(frames);
191219
}
192220
}
193221
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,12 @@ public void WriteFrame(Frame f)
10151015
m_heartbeatWrite.Set();
10161016
}
10171017

1018+
public void WriteFrameSet(IList<Frame> f)
1019+
{
1020+
m_frameHandler.WriteFrameSet(f);
1021+
m_heartbeatWrite.Set();
1022+
}
1023+
10181024
///<summary>API-side invocation of connection abort.</summary>
10191025
public void Abort()
10201026
{

projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Collections.Generic;
4243
using System.Net;
4344

4445
namespace RabbitMQ.Client.Impl
@@ -69,6 +70,8 @@ public interface IFrameHandler
6970

7071
void WriteFrame(Frame frame);
7172

73+
void WriteFrameSet(IList<Frame> frames);
74+
7275
void Flush();
7376
}
7477
}

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
using System;
4444
using System.IO;
4545
using System.Net;
46+
using System.Collections.Generic;
4647
using System.Net.Sockets;
4748
using System.Text;
4849

@@ -222,6 +223,18 @@ public void WriteFrame(Frame frame)
222223
}
223224
}
224225

226+
public void WriteFrameSet(IList<Frame> frames)
227+
{
228+
lock (m_writer)
229+
{
230+
foreach(var f in frames)
231+
{
232+
f.WriteTo(m_writer);
233+
}
234+
m_writer.Flush();
235+
}
236+
}
237+
225238
public void Flush()
226239
{
227240
lock (m_writer)

0 commit comments

Comments
 (0)