Skip to content

Commit 52cb320

Browse files
committed
Change BatchMessage for the IMessageBatch abstraction.
[#152515371]
1 parent 738c3c3 commit 52cb320

File tree

10 files changed

+156
-79
lines changed

10 files changed

+156
-79
lines changed

projects/client/ApigenBootstrap/ApigenBootstrap.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
</PropertyGroup>
1616

1717
<ItemGroup>
18-
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
18+
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IMessageBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
1919
</ItemGroup>
2020

2121
</Project>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace RabbitMQ.Client
2+
{
3+
public interface IMessageBatch
4+
{
5+
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
6+
void Publish();
7+
}
8+
}

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
using RabbitMQ.Client.Apigen.Attributes;
4242
using RabbitMQ.Client.Events;
43+
using RabbitMQ.Client;
4344
using System;
4445
using System.Collections.Generic;
4546

@@ -226,11 +227,6 @@ string BasicConsume(
226227
void BasicPublish(string exchange, string routingKey, bool mandatory,
227228
IBasicProperties basicProperties, byte[] body);
228229

229-
[AmqpMethodDoNotImplement(null)]
230-
void BasicBatchPublish(string exchange, string routingKey, bool mandatory,
231-
IEnumerable<BatchMessage> messages);
232-
233-
234230
/// <summary>
235231
/// Configures QoS parameters of the Basic content-class.
236232
/// </summary>
@@ -283,6 +279,12 @@ void BasicBatchPublish(string exchange, string routingKey, bool mandatory,
283279
[AmqpMethodDoNotImplement(null)]
284280
void ConfirmSelect();
285281

282+
/// <summary>
283+
/// Creates a MessagBatch instance
284+
/// </summary>
285+
[AmqpMethodDoNotImplement(null)]
286+
IMessageBatch CreateMessageBatch();
287+
286288
/// <summary>
287289
/// Construct a completely empty content header for use with the Basic content class.
288290
/// </summary>
@@ -557,8 +559,4 @@ void QueueDeclareNoWait(string queue, bool durable,
557559
/// </summary>
558560
TimeSpan ContinuationTimeout { get; set; }
559561
}
560-
public class BatchMessage{
561-
public byte[] Body { get; set; }
562-
public IBasicProperties basicProperties { get; set; }
563-
}
564562
}

projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,6 @@ public static void BasicPublish(this IModel model, string exchange, string routi
103103
{
104104
model.BasicPublish(exchange, routingKey, false, basicProperties, body);
105105
}
106-
public static void BasicBatchPublish(this IModel model, string exchange, string routingKey, IEnumerable<BatchMessage> messages)
107-
{
108-
model.BasicBatchPublish(exchange, routingKey, false, messages);
109-
}
110106

111107
/// <summary>
112108
/// (Spec method) Convenience overload of BasicPublish.

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,5 +1212,10 @@ protected void RunRecoveryEventHandlers()
12121212
}
12131213
}
12141214
}
1215+
1216+
public IMessageBatch CreateMessageBatch()
1217+
{
1218+
return ((IFullModel)m_delegate).CreateMessageBatch();
1219+
}
12151220
}
12161221
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,6 @@ public class Connection : IConnection
105105
private Timer _heartbeatWriteTimer;
106106
private Timer _heartbeatReadTimer;
107107
private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
108-
private AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
109-
110108

111109
// true if we haven't finished connection negotiation.
112110
// In this state socket exceptions are treated as fatal connection
@@ -537,7 +535,6 @@ public void FinishClose()
537535
{
538536
// Notify hearbeat loops that they can leave
539537
m_heartbeatRead.Set();
540-
m_heartbeatWrite.Set();
541538
m_closed = true;
542539
MaybeStopHeartbeatTimers();
543540

@@ -1170,13 +1167,11 @@ public override string ToString()
11701167
public void WriteFrame(OutboundFrame f)
11711168
{
11721169
m_frameHandler.WriteFrame(f);
1173-
m_heartbeatWrite.Set();
11741170
}
11751171

11761172
public void WriteFrameSet(IList<OutboundFrame> f)
11771173
{
11781174
m_frameHandler.WriteFrameSet(f);
1179-
m_heartbeatWrite.Set();
11801175
}
11811176

11821177
///<summary>API-side invocation of connection abort.</summary>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
namespace RabbitMQ.Client.Impl
2+
{
3+
using System.Collections.Generic;
4+
using RabbitMQ.Client;
5+
using RabbitMQ.Client.Framing.Impl;
6+
using RabbitMQ.Client.Impl;
7+
8+
public class MessageBatch : IMessageBatch
9+
{
10+
private List<Command> commands = new List<Command>();
11+
private ModelBase model;
12+
internal MessageBatch(ModelBase model)
13+
{
14+
this.model = model;
15+
}
16+
17+
public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
18+
{
19+
var bp = basicProperties == null ? model.CreateBasicProperties() : basicProperties;
20+
var method = new BasicPublish
21+
{
22+
m_exchange = exchange,
23+
m_routingKey = routingKey,
24+
m_mandatory = mandatory
25+
};
26+
27+
commands.Add(new Command(method, (ContentHeaderBase)bp, body));
28+
}
29+
30+
public void Publish()
31+
{
32+
model.SendCommands(commands);
33+
}
34+
}
35+
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 32 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -472,19 +472,6 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
472472
Session.Transmit(new Command(method, header, body));
473473
}
474474
}
475-
public void ModelSend(MethodBase method, IEnumerable<BatchMessage> messages)
476-
{
477-
if (method.HasContent)
478-
{
479-
m_flowControlBlock.WaitOne();
480-
}
481-
List<Command> commands = new List<Impl.Command>();
482-
foreach (var message in messages)
483-
{
484-
commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body));
485-
}
486-
Session.Transmit(commands);
487-
}
488475

489476
public virtual void OnBasicAck(BasicAckEventArgs args)
490477
{
@@ -1218,6 +1205,26 @@ public abstract void BasicNack(ulong deliveryTag,
12181205
bool multiple,
12191206
bool requeue);
12201207

1208+
internal void AllocatatePublishSeqNos(int count)
1209+
{
1210+
var c = 0;
1211+
lock (m_unconfirmedSet.SyncRoot)
1212+
{
1213+
while(c < count)
1214+
{
1215+
if (NextPublishSeqNo > 0)
1216+
{
1217+
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
1218+
{
1219+
m_unconfirmedSet.Add(NextPublishSeqNo);
1220+
}
1221+
NextPublishSeqNo++;
1222+
}
1223+
c++;
1224+
}
1225+
}
1226+
}
1227+
12211228
public void BasicPublish(string exchange,
12221229
string routingKey,
12231230
bool mandatory,
@@ -1246,50 +1253,6 @@ public void BasicPublish(string exchange,
12461253
body);
12471254
}
12481255

1249-
public void BasicBatchPublish(string exchange,
1250-
string routingKey,
1251-
bool mandatory,
1252-
IEnumerable<BatchMessage> messages)
1253-
{
1254-
foreach (var message in messages)
1255-
{
1256-
if (message.basicProperties == null)
1257-
{
1258-
message.basicProperties = CreateBasicProperties();
1259-
}
1260-
1261-
if (NextPublishSeqNo > 0)
1262-
{
1263-
lock (m_unconfirmedSet.SyncRoot)
1264-
{
1265-
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
1266-
{
1267-
m_unconfirmedSet.Add(NextPublishSeqNo);
1268-
}
1269-
NextPublishSeqNo++;
1270-
}
1271-
}
1272-
}
1273-
1274-
_Private_BasicBatchPublish(exchange,
1275-
routingKey,
1276-
mandatory,
1277-
messages);
1278-
}
1279-
public void _Private_BasicBatchPublish(
1280-
string @exchange,
1281-
string @routingKey,
1282-
bool @mandatory,
1283-
//bool @immediate,
1284-
IEnumerable<BatchMessage> messages)
1285-
{
1286-
BasicPublish __req = new BasicPublish();
1287-
__req.m_exchange = @exchange;
1288-
__req.m_routingKey = @routingKey;
1289-
__req.m_mandatory = @mandatory;
1290-
//__req.m_immediate = @immediate;
1291-
ModelSend(__req, messages);
1292-
}
12931256
public abstract void BasicQos(uint prefetchSize,
12941257
ushort prefetchCount,
12951258
bool global);
@@ -1330,6 +1293,10 @@ public void ConfirmSelect()
13301293
///////////////////////////////////////////////////////////////////////////
13311294

13321295
public abstract IBasicProperties CreateBasicProperties();
1296+
public IMessageBatch CreateMessageBatch()
1297+
{
1298+
return new MessageBatch(this);
1299+
}
13331300

13341301

13351302
public void ExchangeBind(string destination,
@@ -1553,6 +1520,13 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
15531520
}
15541521
}
15551522

1523+
internal void SendCommands(IList<Command> commands)
1524+
{
1525+
m_flowControlBlock.WaitOne();
1526+
AllocatatePublishSeqNos(commands.Count);
1527+
Session.Transmit(commands);
1528+
}
1529+
15561530
protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
15571531
{
15581532
lock (m_unconfirmedSet.SyncRoot)
@@ -1591,6 +1565,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
15911565
return k.m_result;
15921566
}
15931567

1568+
15941569
public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation
15951570
{
15961571
public IBasicConsumer m_consumer;

projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
using System;
4242
using RabbitMQ.Client.Exceptions;
4343
using RabbitMQ.Client.Framing.Impl;
44-
using System.Collections.Generic;
45-
44+
using System.Collections.Generic;
45+
4646
namespace RabbitMQ.Client.Impl
4747
{
4848
public abstract class SessionBase : ISession
@@ -201,7 +201,7 @@ public virtual void Transmit(Command cmd)
201201
cmd.Transmit(ChannelNumber, Connection);
202202
}
203203
public virtual void Transmit(IEnumerable<Command> commands)
204-
{
204+
{
205205
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
206206
}
207207
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2016 Pivotal Software, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using NUnit.Framework;
42+
using RabbitMQ.Client;
43+
using RabbitMQ.Client.Impl;
44+
using System;
45+
46+
namespace RabbitMQ.Client.Unit
47+
{
48+
internal class TestMessageBatch : IntegrationFixture
49+
{
50+
[Test]
51+
public void TestMessageBatchSend()
52+
{
53+
Model.QueueDeclare(queue: "test-message-batch-a", durable: false);
54+
Model.QueueDeclare(queue: "test-message-batch-b", durable: false);
55+
var batch = Model.CreateMessageBatch();
56+
batch.Add("", "test-message-batch-a", false, null, new byte [] {});
57+
batch.Add("", "test-message-batch-b", false, null, new byte [] {});
58+
batch.Publish();
59+
var resultA = Model.BasicGet("test-message-batch-a", true);
60+
Assert.NotNull(resultA);
61+
var resultB = Model.BasicGet("test-message-batch-b", true);
62+
Assert.NotNull(resultB);
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)