Skip to content

Commit c4178a9

Browse files
Merge pull request #104 from rabbitmq/rabbitmq-server-156
When sending methods with concent, flush socket stream once
2 parents 736a991 + 0bbca43 commit c4178a9

File tree

5 files changed

+65
-6
lines changed

5 files changed

+65
-6
lines changed

projects/client/RabbitMQ.Client.WinRT/src/client/impl/SocketFrameHandler.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
using RabbitMQ.Util;
4343
using System;
4444
using System.IO;
45+
using System.Collections.Generic;
4546
using System.Text;
4647
using System.Threading;
4748
using Windows.Foundation;
@@ -195,6 +196,18 @@ public void WriteFrame(Frame frame)
195196
}
196197
}
197198

199+
public void WriteFrameSet(IList<Frame> frames)
200+
{
201+
lock (m_writer)
202+
{
203+
foreach (var f in frames)
204+
{
205+
f.WriteTo(m_writer);
206+
}
207+
m_writer.Flush();
208+
}
209+
}
210+
198211
public void Flush()
199212
{
200213
lock (m_writer)

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,18 @@ public byte[] ConsolidateBody()
153153
}
154154

155155
public void Transmit(int channelNumber, Connection connection)
156+
{
157+
if(Method.HasContent)
158+
{
159+
TransmitAsFrameSet(channelNumber, connection);
160+
}
161+
else
162+
{
163+
TransmitAsSingleFrame(channelNumber, connection);
164+
}
165+
}
166+
167+
public void TransmitAsSingleFrame(int channelNumber, Connection connection)
156168
{
157169
var frame = new Frame(Constants.FrameMethod, channelNumber);
158170
NetworkBinaryWriter writer = frame.GetWriter();
@@ -162,6 +174,20 @@ public void Transmit(int channelNumber, Connection connection)
162174
Method.WriteArgumentsTo(argWriter);
163175
argWriter.Flush();
164176
connection.WriteFrame(frame);
177+
}
178+
179+
public void TransmitAsFrameSet(int channelNumber, Connection connection)
180+
{
181+
var frame = new Frame(Constants.FrameMethod, channelNumber);
182+
NetworkBinaryWriter writer = frame.GetWriter();
183+
writer.Write((ushort)Method.ProtocolClassId);
184+
writer.Write((ushort)Method.ProtocolMethodId);
185+
var argWriter = new MethodArgumentWriter(writer);
186+
Method.WriteArgumentsTo(argWriter);
187+
argWriter.Flush();
188+
189+
var frames = new List<Frame>();
190+
frames.Add(frame);
165191

166192
if (Method.HasContent)
167193
{
@@ -171,7 +197,7 @@ public void Transmit(int channelNumber, Connection connection)
171197
writer = frame.GetWriter();
172198
writer.Write((ushort)Header.ProtocolClassId);
173199
Header.WriteTo(writer, (ulong)body.Length);
174-
connection.WriteFrame(frame);
200+
frames.Add(frame);
175201

176202
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
177203
int bodyPayloadMax = (frameMax == 0)
@@ -185,9 +211,11 @@ public void Transmit(int channelNumber, Connection connection)
185211
writer = frame.GetWriter();
186212
writer.Write(body, offset,
187213
(remaining < bodyPayloadMax) ? remaining : bodyPayloadMax);
188-
connection.WriteFrame(frame);
214+
frames.Add(frame);
189215
}
190216
}
217+
218+
connection.WriteFrameSet(frames);
191219
}
192220
}
193221
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,12 @@ public void WriteFrame(Frame f)
11221122
m_heartbeatWrite.Set();
11231123
}
11241124

1125+
public void WriteFrameSet(IList<Frame> f)
1126+
{
1127+
m_frameHandler.WriteFrameSet(f);
1128+
m_heartbeatWrite.Set();
1129+
}
1130+
11251131
///<summary>API-side invocation of connection abort.</summary>
11261132
public void Abort()
11271133
{

projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Collections.Generic;
4243
using System.Net;
4344

4445
namespace RabbitMQ.Client.Impl
@@ -73,6 +74,8 @@ public interface IFrameHandler
7374

7475
void WriteFrame(Frame frame);
7576

77+
void WriteFrameSet(IList<Frame> frames);
78+
7679
void Flush();
7780
}
7881
}

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,17 @@
4343
using System;
4444
using System.IO;
4545
using System.Net;
46+
using System.Collections.Generic;
4647
using System.Net.Sockets;
4748
using System.Text;
4849

4950
namespace RabbitMQ.Client.Impl
5051
{
5152
public class SocketFrameHandler : IFrameHandler
5253
{
53-
// ^^ System.Net.Sockets.SocketError doesn't exist in .NET 1.1
54-
5554
// Timeout in seconds to wait for a clean socket close.
5655
public const int SOCKET_CLOSING_TIMEOUT = 1;
5756

58-
public const int WSAEWOULDBLOCK = 10035;
59-
6057
public NetworkBinaryReader m_reader;
6158
public TcpClient m_socket;
6259
public NetworkBinaryWriter m_writer;
@@ -226,6 +223,18 @@ public void WriteFrame(Frame frame)
226223
}
227224
}
228225

226+
public void WriteFrameSet(IList<Frame> frames)
227+
{
228+
lock (m_writer)
229+
{
230+
foreach(var f in frames)
231+
{
232+
f.WriteTo(m_writer);
233+
}
234+
m_writer.Flush();
235+
}
236+
}
237+
229238
public void Flush()
230239
{
231240
lock (m_writer)

0 commit comments

Comments
 (0)