Skip to content

Commit 74c9c24

Browse files
committed
MessageBatch -> BasicPublishBatch
BasicPublishBatch can be used for more efficient batch publishing that will better utilise TCP and provides more throughput. [#152041636]
1 parent c020263 commit 74c9c24

File tree

10 files changed

+38
-37
lines changed

10 files changed

+38
-37
lines changed

projects/client/ApigenBootstrap/ApigenBootstrap.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
</PropertyGroup>
1616

1717
<ItemGroup>
18-
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IMessageBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
18+
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IBasicPublishBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
1919
</ItemGroup>
2020

2121
</Project>

projects/client/RabbitMQ.Client/src/client/api/IMessageBatch.cs renamed to projects/client/RabbitMQ.Client/src/client/api/IBasicPublishBatch.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
//---------------------------------------------------------------------------
4040
namespace RabbitMQ.Client
4141
{
42-
public interface IMessageBatch
42+
public interface IBasicPublishBatch
4343
{
4444
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
4545
void Publish();

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,10 @@ void BasicPublish(string exchange, string routingKey, bool mandatory,
280280
void ConfirmSelect();
281281

282282
/// <summary>
283-
/// Creates a MessagBatch instance
283+
/// Creates a BasicPublishBatch instance
284284
/// </summary>
285285
[AmqpMethodDoNotImplement(null)]
286-
IMessageBatch CreateMessageBatch();
286+
IBasicPublishBatch CreateBasicPublishBatch();
287287

288288
/// <summary>
289289
/// Construct a completely empty content header for use with the Basic content class.

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,9 +1213,9 @@ protected void RunRecoveryEventHandlers()
12131213
}
12141214
}
12151215

1216-
public IMessageBatch CreateMessageBatch()
1216+
public IBasicPublishBatch CreateBasicPublishBatch()
12171217
{
1218-
return ((IFullModel)m_delegate).CreateMessageBatch();
1218+
return ((IFullModel)m_delegate).CreateBasicPublishBatch();
12191219
}
12201220
}
12211221
}

projects/client/RabbitMQ.Client/src/client/impl/MessageBatch.cs renamed to projects/client/RabbitMQ.Client/src/client/impl/BasicPublishBatch.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ namespace RabbitMQ.Client.Impl
4545
using RabbitMQ.Client.Framing.Impl;
4646
using RabbitMQ.Client.Impl;
4747

48-
public class MessageBatch : IMessageBatch
48+
public class BasicPublishBatch : IBasicPublishBatch
4949
{
5050
private List<Command> commands = new List<Command>();
5151
private ModelBase model;
52-
internal MessageBatch(ModelBase model)
52+
internal BasicPublishBatch (ModelBase model)
5353
{
5454
this.model = model;
5555
}

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -170,28 +170,29 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)
170170
}
171171

172172

173-
public static IList<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IEnumerable<Command> commands)
173+
public static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
174174
{
175-
List<OutboundFrame> frames = new List<Impl.OutboundFrame>();
176-
177-
foreach (var cmd in commands)
178-
{
179-
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
180-
if (cmd.Method.HasContent)
181-
{
182-
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.
183-
184-
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
185-
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
186-
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
187-
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
188-
{
189-
var remaining = body.Length - offset;
190-
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
191-
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
192-
}
193-
}
175+
var frames = new List<OutboundFrame>();
176+
177+
foreach (var cmd in commands)
178+
{
179+
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
180+
if (cmd.Method.HasContent)
181+
{
182+
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.
183+
184+
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
185+
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
186+
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
187+
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
188+
{
189+
var remaining = body.Length - offset;
190+
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
191+
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
192+
}
193+
}
194194
}
195+
195196
return frames;
196197
}
197198
}

projects/client/RabbitMQ.Client/src/client/impl/ISession.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42-
using System.Collections.Generic;
43-
42+
using System.Collections.Generic;
43+
4444
namespace RabbitMQ.Client.Impl
4545
{
4646
public interface ISession
@@ -80,6 +80,6 @@ public interface ISession
8080
void HandleFrame(InboundFrame frame);
8181
void Notify();
8282
void Transmit(Command cmd);
83-
void Transmit(IEnumerable<Command> cmd);
83+
void Transmit(IList<Command> cmd);
8484
}
8585
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,9 +1293,9 @@ public void ConfirmSelect()
12931293
///////////////////////////////////////////////////////////////////////////
12941294

12951295
public abstract IBasicProperties CreateBasicProperties();
1296-
public IMessageBatch CreateMessageBatch()
1296+
public IBasicPublishBatch CreateBasicPublishBatch()
12971297
{
1298-
return new MessageBatch(this);
1298+
return new BasicPublishBatch(this);
12991299
}
13001300

13011301

projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public virtual void Transmit(Command cmd)
200200
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
201201
cmd.Transmit(ChannelNumber, Connection);
202202
}
203-
public virtual void Transmit(IEnumerable<Command> commands)
203+
public virtual void Transmit(IList<Command> commands)
204204
{
205205
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
206206
}

projects/client/Unit/src/unit/TestMessageBatch.cs renamed to projects/client/Unit/src/unit/TestBasicPublishBatch.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@
4545

4646
namespace RabbitMQ.Client.Unit
4747
{
48-
internal class TestMessageBatch : IntegrationFixture
48+
internal class TestBasicPublishBatch : IntegrationFixture
4949
{
5050
[Test]
51-
public void TestMessageBatchSend()
51+
public void TestBasicPublishBatchSend()
5252
{
5353
Model.QueueDeclare(queue: "test-message-batch-a", durable: false);
5454
Model.QueueDeclare(queue: "test-message-batch-b", durable: false);
55-
var batch = Model.CreateMessageBatch();
55+
var batch = Model.CreateBasicPublishBatch();
5656
batch.Add("", "test-message-batch-a", false, null, new byte [] {});
5757
batch.Add("", "test-message-batch-b", false, null, new byte [] {});
5858
batch.Publish();

0 commit comments

Comments
 (0)