Skip to content

Commit 253afa9

Browse files
authored
Merge pull request #1 from YulerB/YulerB-feature-batch
Batch Publish
2 parents 557be74 + 3295551 commit 253afa9

File tree

4 files changed

+101
-3
lines changed

4 files changed

+101
-3
lines changed

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection)
140140
}
141141
}
142142

143+
144+
143145
public void TransmitAsSingleFrame(int channelNumber, Connection connection)
144146
{
145147
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
@@ -166,5 +168,31 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)
166168

167169
connection.WriteFrameSet(frames);
168170
}
171+
172+
173+
public static IList<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IEnumerable<Command> commands)
174+
{
175+
List<OutboundFrame> frames = new List<Impl.OutboundFrame>();
176+
177+
foreach (var cmd in commands)
178+
{
179+
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
180+
if (cmd.Method.HasContent)
181+
{
182+
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.
183+
184+
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
185+
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
186+
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
187+
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
188+
{
189+
var remaining = body.Length - offset;
190+
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
191+
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
192+
}
193+
}
194+
}
195+
return frames;
196+
}
169197
}
170198
}

projects/client/RabbitMQ.Client/src/client/impl/ISession.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42-
42+
using System.Collections.Generic;
43+
4344
namespace RabbitMQ.Client.Impl
4445
{
4546
public interface ISession
@@ -79,5 +80,6 @@ public interface ISession
7980
void HandleFrame(InboundFrame frame);
8081
void Notify();
8182
void Transmit(Command cmd);
83+
void Transmit(IEnumerable<Command> cmd);
8284
}
8385
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,19 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
472472
Session.Transmit(new Command(method, header, body));
473473
}
474474
}
475+
public void ModelSend(MethodBase method, IEnumerable<BatchMessage> messages)
476+
{
477+
if (method.HasContent)
478+
{
479+
m_flowControlBlock.WaitOne();
480+
}
481+
List<Command> commands = new List<Impl.Command>();
482+
foreach (var message in messages)
483+
{
484+
commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body));
485+
}
486+
Session.Transmit(commands);
487+
}
475488

476489
public virtual void OnBasicAck(BasicAckEventArgs args)
477490
{
@@ -1050,6 +1063,12 @@ public abstract void _Private_BasicPublish(string exchange,
10501063
IBasicProperties basicProperties,
10511064
byte[] body);
10521065

1066+
//public abstract void _Private_BasicBatchPublish(string exchange,
1067+
// string routingKey,
1068+
// bool mandatory,
1069+
// IEnumerable<BatchMessage> messages);
1070+
1071+
10531072
public abstract void _Private_BasicRecover(bool requeue);
10541073

10551074
public abstract void _Private_ChannelClose(ushort replyCode,
@@ -1231,8 +1250,52 @@ public void BasicPublish(string exchange,
12311250
mandatory,
12321251
basicProperties,
12331252
body);
1234-
}
1253+
}
1254+
1255+
public void BasicBatchPublish(string exchange,
1256+
string routingKey,
1257+
bool mandatory,
1258+
IEnumerable<BatchMessage> messages)
1259+
{
1260+
foreach (var message in messages)
1261+
{
1262+
if (message.basicProperties == null)
1263+
{
1264+
message.basicProperties = CreateBasicProperties();
1265+
}
1266+
1267+
if (NextPublishSeqNo > 0)
1268+
{
1269+
lock (m_unconfirmedSet.SyncRoot)
1270+
{
1271+
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
1272+
{
1273+
m_unconfirmedSet.Add(NextPublishSeqNo);
1274+
}
1275+
NextPublishSeqNo++;
1276+
}
1277+
}
1278+
}
12351279

1280+
_Private_BasicBatchPublish(exchange,
1281+
routingKey,
1282+
mandatory,
1283+
messages);
1284+
}
1285+
public void _Private_BasicBatchPublish(
1286+
string @exchange,
1287+
string @routingKey,
1288+
bool @mandatory,
1289+
//bool @immediate,
1290+
IEnumerable<BatchMessage> messages)
1291+
{
1292+
BasicPublish __req = new BasicPublish();
1293+
__req.m_exchange = @exchange;
1294+
__req.m_routingKey = @routingKey;
1295+
__req.m_mandatory = @mandatory;
1296+
//__req.m_immediate = @immediate;
1297+
ModelSend(__req, messages);
1298+
}
12361299
public abstract void BasicQos(uint prefetchSize,
12371300
ushort prefetchCount,
12381301
bool global);

projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
using System;
4242
using RabbitMQ.Client.Exceptions;
4343
using RabbitMQ.Client.Framing.Impl;
44-
44+
using System.Collections.Generic;
45+
4546
namespace RabbitMQ.Client.Impl
4647
{
4748
public abstract class SessionBase : ISession
@@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd)
199200
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
200201
cmd.Transmit(ChannelNumber, Connection);
201202
}
203+
public virtual void Transmit(IEnumerable<Command> commands)
204+
{
205+
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
206+
}
202207
}
203208
}

0 commit comments

Comments
 (0)