Skip to content

Commit 3295551

Browse files
authored
Batch Publish
Batch publish allows sending multiple messages in one stream on the socket. Sending in batches improves performance by reducing the number of TCP/IP messages sent and TCP/IP acknowledgments received. This change compiles the commands for all publish messages into a memory buffer that is posted to the socket as a single stream. Closing the socket/model/connection before the buffer has completed sending does not guarantee message delivery.
1 parent 557be74 commit 3295551

File tree

4 files changed

+101
-3
lines changed

4 files changed

+101
-3
lines changed

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection)
140140
}
141141
}
142142

143+
144+
143145
public void TransmitAsSingleFrame(int channelNumber, Connection connection)
144146
{
145147
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
@@ -166,5 +168,31 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)
166168

167169
connection.WriteFrameSet(frames);
168170
}
171+
172+
173+
public static IList<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IEnumerable<Command> commands)
174+
{
175+
List<OutboundFrame> frames = new List<Impl.OutboundFrame>();
176+
177+
foreach (var cmd in commands)
178+
{
179+
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
180+
if (cmd.Method.HasContent)
181+
{
182+
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.
183+
184+
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
185+
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
186+
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
187+
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
188+
{
189+
var remaining = body.Length - offset;
190+
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
191+
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
192+
}
193+
}
194+
}
195+
return frames;
196+
}
169197
}
170198
}

projects/client/RabbitMQ.Client/src/client/impl/ISession.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42-
42+
using System.Collections.Generic;
43+
4344
namespace RabbitMQ.Client.Impl
4445
{
4546
public interface ISession
@@ -79,5 +80,6 @@ public interface ISession
7980
void HandleFrame(InboundFrame frame);
8081
void Notify();
8182
void Transmit(Command cmd);
83+
void Transmit(IEnumerable<Command> cmd);
8284
}
8385
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,19 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
472472
Session.Transmit(new Command(method, header, body));
473473
}
474474
}
475+
public void ModelSend(MethodBase method, IEnumerable<BatchMessage> messages)
476+
{
477+
if (method.HasContent)
478+
{
479+
m_flowControlBlock.WaitOne();
480+
}
481+
List<Command> commands = new List<Impl.Command>();
482+
foreach (var message in messages)
483+
{
484+
commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body));
485+
}
486+
Session.Transmit(commands);
487+
}
475488

476489
public virtual void OnBasicAck(BasicAckEventArgs args)
477490
{
@@ -1050,6 +1063,12 @@ public abstract void _Private_BasicPublish(string exchange,
10501063
IBasicProperties basicProperties,
10511064
byte[] body);
10521065

1066+
//public abstract void _Private_BasicBatchPublish(string exchange,
1067+
// string routingKey,
1068+
// bool mandatory,
1069+
// IEnumerable<BatchMessage> messages);
1070+
1071+
10531072
public abstract void _Private_BasicRecover(bool requeue);
10541073

10551074
public abstract void _Private_ChannelClose(ushort replyCode,
@@ -1231,8 +1250,52 @@ public void BasicPublish(string exchange,
12311250
mandatory,
12321251
basicProperties,
12331252
body);
1234-
}
1253+
}
1254+
1255+
public void BasicBatchPublish(string exchange,
1256+
string routingKey,
1257+
bool mandatory,
1258+
IEnumerable<BatchMessage> messages)
1259+
{
1260+
foreach (var message in messages)
1261+
{
1262+
if (message.basicProperties == null)
1263+
{
1264+
message.basicProperties = CreateBasicProperties();
1265+
}
1266+
1267+
if (NextPublishSeqNo > 0)
1268+
{
1269+
lock (m_unconfirmedSet.SyncRoot)
1270+
{
1271+
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
1272+
{
1273+
m_unconfirmedSet.Add(NextPublishSeqNo);
1274+
}
1275+
NextPublishSeqNo++;
1276+
}
1277+
}
1278+
}
12351279

1280+
_Private_BasicBatchPublish(exchange,
1281+
routingKey,
1282+
mandatory,
1283+
messages);
1284+
}
1285+
public void _Private_BasicBatchPublish(
1286+
string @exchange,
1287+
string @routingKey,
1288+
bool @mandatory,
1289+
//bool @immediate,
1290+
IEnumerable<BatchMessage> messages)
1291+
{
1292+
BasicPublish __req = new BasicPublish();
1293+
__req.m_exchange = @exchange;
1294+
__req.m_routingKey = @routingKey;
1295+
__req.m_mandatory = @mandatory;
1296+
//__req.m_immediate = @immediate;
1297+
ModelSend(__req, messages);
1298+
}
12361299
public abstract void BasicQos(uint prefetchSize,
12371300
ushort prefetchCount,
12381301
bool global);

projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
using System;
4242
using RabbitMQ.Client.Exceptions;
4343
using RabbitMQ.Client.Framing.Impl;
44-
44+
using System.Collections.Generic;
45+
4546
namespace RabbitMQ.Client.Impl
4647
{
4748
public abstract class SessionBase : ISession
@@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd)
199200
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
200201
cmd.Transmit(ChannelNumber, Connection);
201202
}
203+
public virtual void Transmit(IEnumerable<Command> commands)
204+
{
205+
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
206+
}
202207
}
203208
}

0 commit comments

Comments
 (0)