Skip to content

Commit 163c606

Browse files
author
Emil Einarsson
committed
rename BatchMessage to MessageBatch for consistency with Azure#539
1 parent 645cf87 commit 163c606

File tree

8 files changed

+90
-90
lines changed

8 files changed

+90
-90
lines changed

src/Microsoft.Azure.ServiceBus/Core/MessageReceiver.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
4848
readonly ConcurrentExpiringSet<Guid> requestResponseLockedMessages;
4949
readonly bool isSessionReceiver;
5050
readonly object messageReceivePumpSyncLock;
51-
readonly object batchMessageReceivePumpSyncLock;
51+
readonly object messageBatchReceivePumpSyncLock;
5252
readonly ActiveClientLinkManager clientLinkManager;
5353
readonly ServiceBusDiagnosticSource diagnosticSource;
5454

5555
int prefetchCount;
5656
long lastPeekedSequenceNumber;
5757
MessageReceivePump receivePump;
5858
CancellationTokenSource receivePumpCancellationTokenSource;
59-
BatchMessageReceivePump batchReceivePump;
59+
MessageBatchReceivePump batchReceivePump;
6060
CancellationTokenSource batchReceivePumpCancellationTokenSource;
6161

6262
/// <summary>
@@ -195,7 +195,7 @@ internal MessageReceiver(
195195
this.requestResponseLockedMessages = new ConcurrentExpiringSet<Guid>();
196196
this.PrefetchCount = prefetchCount;
197197
this.messageReceivePumpSyncLock = new object();
198-
this.batchMessageReceivePumpSyncLock = new object();
198+
this.messageBatchReceivePumpSyncLock = new object();
199199
this.clientLinkManager = new ActiveClientLinkManager(this, this.CbsTokenProvider);
200200
this.diagnosticSource = new ServiceBusDiagnosticSource(entityPath, serviceBusConnection.Endpoint);
201201
MessagingEventSource.Log.MessageReceiverCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
@@ -827,7 +827,7 @@ public async Task RenewLocksAsync(IEnumerable<Message> messages)
827827

828828
var lockTokens = messages.Select(x => x.SystemProperties.LockToken);
829829

830-
MessagingEventSource.Log.BatchMessageRenewLockStart(this.ClientId, messages.Count(), lockTokens);
830+
MessagingEventSource.Log.MessageBatchRenewLockStart(this.ClientId, messages.Count(), lockTokens);
831831
bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
832832
Activity activity = isDiagnosticSourceEnabled ? this.diagnosticSource.RenewLocksStart() : null;
833833

@@ -856,7 +856,7 @@ public async Task RenewLocksAsync(IEnumerable<Message> messages)
856856
{
857857
this.diagnosticSource.RenewLocksStop(activity, renewTask?.Status, lockedUntilUtc);
858858
}
859-
MessagingEventSource.Log.BatchMessageRenewLockStop(this.ClientId);
859+
MessagingEventSource.Log.MessageBatchRenewLockStop(this.ClientId);
860860

861861
}
862862

@@ -961,9 +961,9 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
961961
/// </summary>
962962
/// <param name="handler">A <see cref="Func{T1, T2, TResult}"/> that processes messages.</param>
963963
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is used to notify exceptions.</param>
964-
public void RegisterBatchMessageHandler(Func<IList<Message>, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
964+
public void RegisterMessageBatchHandler(Func<IList<Message>, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
965965
{
966-
this.RegisterBatchMessageHandler(handler, new BatchMessageHandlerOptions(exceptionReceivedHandler));
966+
this.RegisterMessageBatchHandler(handler, new MessageBatchHandlerOptions(exceptionReceivedHandler));
967967
}
968968

969969
/// <summary>
@@ -986,10 +986,10 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
986986
/// <param name="handler">A <see cref="Func{Message, CancellationToken, Task}"/> that processes messages.</param>
987987
/// <param name="messageHandlerOptions">The <see cref="MessageHandlerOptions"/> options used to configure the settings of the pump.</param>
988988
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
989-
public void RegisterBatchMessageHandler(Func<IList<Message>, CancellationToken, Task> handler, BatchMessageHandlerOptions messageHandlerOptions)
989+
public void RegisterMessageBatchHandler(Func<IList<Message>, CancellationToken, Task> handler, MessageBatchHandlerOptions messageHandlerOptions)
990990
{
991991
this.ThrowIfClosed();
992-
this.OnBatchMessageHandler(messageHandlerOptions, handler);
992+
this.OnMessageBatchHandler(messageHandlerOptions, handler);
993993
}
994994

995995
/// <summary>
@@ -1099,7 +1099,7 @@ protected override async Task OnClosingAsync()
10991099
this.receivePump = null;
11001100
}
11011101
}
1102-
lock (this.batchMessageReceivePumpSyncLock)
1102+
lock (this.messageBatchReceivePumpSyncLock)
11031103
{
11041104
if (this.batchReceivePump != null)
11051105
{
@@ -1410,21 +1410,21 @@ protected virtual async Task<DateTime> OnRenewLocksAsync(IEnumerable<string> loc
14101410
}
14111411

14121412
/// <summary> </summary>
1413-
protected virtual void OnBatchMessageHandler(
1414-
BatchMessageHandlerOptions registerHandlerOptions,
1413+
protected virtual void OnMessageBatchHandler(
1414+
MessageBatchHandlerOptions registerHandlerOptions,
14151415
Func<IList<Message>, CancellationToken, Task> callback)
14161416
{
1417-
MessagingEventSource.Log.RegisterOnBatchMessageHandlerStart(this.ClientId, registerHandlerOptions);
1417+
MessagingEventSource.Log.RegisterOnMessageBatchHandlerStart(this.ClientId, registerHandlerOptions);
14181418

1419-
lock (this.batchMessageReceivePumpSyncLock)
1419+
lock (this.messageBatchReceivePumpSyncLock)
14201420
{
14211421
if (this.batchReceivePump != null)
14221422
{
14231423
throw new InvalidOperationException(Resources.MessageHandlerAlreadyRegistered);
14241424
}
14251425

14261426
this.batchReceivePumpCancellationTokenSource = new CancellationTokenSource();
1427-
this.batchReceivePump = new BatchMessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.batchReceivePumpCancellationTokenSource.Token);
1427+
this.batchReceivePump = new MessageBatchReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.batchReceivePumpCancellationTokenSource.Token);
14281428
}
14291429

14301430
try
@@ -1434,7 +1434,7 @@ protected virtual void OnBatchMessageHandler(
14341434
catch (Exception exception)
14351435
{
14361436
MessagingEventSource.Log.RegisterOnMessageHandlerException(this.ClientId, exception);
1437-
lock (this.batchMessageReceivePumpSyncLock)
1437+
lock (this.messageBatchReceivePumpSyncLock)
14381438
{
14391439
if (this.batchReceivePump != null)
14401440
{

src/Microsoft.Azure.ServiceBus/BatchMessageHandlerOptions.cs renamed to src/Microsoft.Azure.ServiceBus/MessageBatchHandlerOptions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ namespace Microsoft.Azure.ServiceBus
1010
using System.Collections.Generic;
1111

1212
/// <summary>Provides options associated with message pump processing using
13-
/// <see cref="QueueClient.RegisterBatchMessageHandler(Func{IList{Message}, CancellationToken, Task}, BatchMessageHandlerOptions)" /> and
14-
/// <see cref="SubscriptionClient.RegisterBatchMessageHandler(Func{IList{Message}, CancellationToken, Task}, BatchMessageHandlerOptions)" />.</summary>
15-
public sealed class BatchMessageHandlerOptions
13+
/// <see cref="QueueClient.RegisterMessageBatchHandler(Func{IList{Message}, CancellationToken, Task}, MessageBatchHandlerOptions)" /> and
14+
/// <see cref="SubscriptionClient.RegisterMessageBatchHandler(Func{IList{Message}, CancellationToken, Task}, MessageBatchHandlerOptions)" />.</summary>
15+
public sealed class MessageBatchHandlerOptions
1616
{
1717
int maxConcurrentCalls;
1818
TimeSpan maxAutoRenewDuration;
1919
int maxMessageCount;
2020

21-
/// <summary>Initializes a new instance of the <see cref="BatchMessageHandlerOptions" /> class.
21+
/// <summary>Initializes a new instance of the <see cref="MessageBatchHandlerOptions" /> class.
2222
/// Default Values:
2323
/// <see cref="MaxConcurrentCalls"/> = 1
2424
/// <see cref="MaxMessageCount"/> = 1
@@ -28,7 +28,7 @@ public sealed class BatchMessageHandlerOptions
2828
/// </summary>
2929
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
3030
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
31-
public BatchMessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
31+
public MessageBatchHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
3232
{
3333
this.MaxConcurrentCalls = 1;
3434
this.MaxMessageCount = 1;

src/Microsoft.Azure.ServiceBus/BatchMessageReceivePump.cs renamed to src/Microsoft.Azure.ServiceBus/MessageBatchReceivePump.cs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,25 @@ namespace Microsoft.Azure.ServiceBus
1212
using Core;
1313
using Primitives;
1414

15-
sealed class BatchMessageReceivePump
15+
sealed class MessageBatchReceivePump
1616
{
17-
readonly Func<IList<Message>, CancellationToken, Task> onBatchMessageCallback;
17+
readonly Func<IList<Message>, CancellationToken, Task> onMessageBatchCallback;
1818
readonly string endpoint;
19-
readonly BatchMessageHandlerOptions registerHandlerOptions;
19+
readonly MessageBatchHandlerOptions registerHandlerOptions;
2020
readonly MessageReceiver messageReceiver;
2121
readonly CancellationToken pumpCancellationToken;
2222
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
2323
readonly ServiceBusDiagnosticSource diagnosticSource;
2424

25-
public BatchMessageReceivePump(MessageReceiver messageReceiver,
26-
BatchMessageHandlerOptions registerHandlerOptions,
25+
public MessageBatchReceivePump(MessageReceiver messageReceiver,
26+
MessageBatchHandlerOptions registerHandlerOptions,
2727
Func<IList<Message>, CancellationToken, Task> callback,
2828
Uri endpoint,
2929
CancellationToken pumpCancellationToken)
3030
{
3131
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
3232
this.registerHandlerOptions = registerHandlerOptions;
33-
this.onBatchMessageCallback = callback;
33+
this.onMessageBatchCallback = callback;
3434
this.endpoint = endpoint.Authority;
3535
this.pumpCancellationToken = pumpCancellationToken;
3636
this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
@@ -39,7 +39,7 @@ public BatchMessageReceivePump(MessageReceiver messageReceiver,
3939

4040
public void StartPump()
4141
{
42-
TaskExtensionHelper.Schedule(() => this.BatchMessagePumpTaskAsync());
42+
TaskExtensionHelper.Schedule(() => this.MessageBatchPumpTaskAsync());
4343
}
4444

4545
bool ShouldRenewLock()
@@ -55,7 +55,7 @@ Task RaiseExceptionReceived(Exception e, string action)
5555
return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs);
5656
}
5757

58-
async Task BatchMessagePumpTaskAsync()
58+
async Task MessageBatchPumpTaskAsync()
5959
{
6060
while (!this.pumpCancellationToken.IsCancellationRequested)
6161
{
@@ -71,11 +71,11 @@ async Task BatchMessagePumpTaskAsync()
7171
{
7272
if (ServiceBusDiagnosticSource.IsEnabled())
7373
{
74-
return this.BatchMessageDispatchTaskInstrumented(messages);
74+
return this.MessageBatchDispatchTaskInstrumented(messages);
7575
}
7676
else
7777
{
78-
return this.BatchMessageDispatchTask(messages);
78+
return this.MessageBatchDispatchTask(messages);
7979
}
8080
});
8181
}
@@ -101,13 +101,13 @@ async Task BatchMessagePumpTaskAsync()
101101
}
102102
}
103103

104-
async Task BatchMessageDispatchTaskInstrumented(IList<Message> messages)
104+
async Task MessageBatchDispatchTaskInstrumented(IList<Message> messages)
105105
{
106106
IEnumerable<Activity> activities = this.diagnosticSource.ProcessStart(messages);
107107
Task processTask = null;
108108
try
109109
{
110-
processTask = BatchMessageDispatchTask(messages);
110+
processTask = MessageBatchDispatchTask(messages);
111111
await processTask.ConfigureAwait(false);
112112
}
113113
catch (Exception e)
@@ -121,12 +121,12 @@ async Task BatchMessageDispatchTaskInstrumented(IList<Message> messages)
121121
}
122122
}
123123

124-
async Task BatchMessageDispatchTask(IList<Message> messages)
124+
async Task MessageBatchDispatchTask(IList<Message> messages)
125125
{
126126
CancellationTokenSource renewLockCancellationTokenSource = null;
127127
Timer autoRenewLockCancellationTimer = null;
128128

129-
MessagingEventSource.Log.BatchMessageReceiverPumpDispatchTaskStart(this.messageReceiver.ClientId, messages);
129+
MessagingEventSource.Log.MessageBatchReceiverPumpDispatchTaskStart(this.messageReceiver.ClientId, messages);
130130

131131
if (this.ShouldRenewLock())
132132
{
@@ -139,14 +139,14 @@ async Task BatchMessageDispatchTask(IList<Message> messages)
139139

140140
try
141141
{
142-
MessagingEventSource.Log.BatchMessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId);
143-
await this.onBatchMessageCallback(messages, this.pumpCancellationToken).ConfigureAwait(false);
142+
MessagingEventSource.Log.MessageBatchReceiverPumpUserCallbackStart(this.messageReceiver.ClientId);
143+
await this.onMessageBatchCallback(messages, this.pumpCancellationToken).ConfigureAwait(false);
144144

145-
MessagingEventSource.Log.BatchMessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId);
145+
MessagingEventSource.Log.MessageBatchReceiverPumpUserCallbackStop(this.messageReceiver.ClientId);
146146
}
147147
catch (Exception exception)
148148
{
149-
MessagingEventSource.Log.BatchMessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, exception);
149+
MessagingEventSource.Log.MessageBatchReceiverPumpUserCallbackException(this.messageReceiver.ClientId, exception);
150150
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
151151

152152
// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
@@ -175,7 +175,7 @@ async Task BatchMessageDispatchTask(IList<Message> messages)
175175
await this.CompleteMessagesIfNeededAsync(messages).ConfigureAwait(false);
176176
this.maxConcurrentCallsSemaphoreSlim.Release();
177177

178-
MessagingEventSource.Log.BatchMessageReceiverPumpDispatchTaskStop(this.messageReceiver.ClientId, this.maxConcurrentCallsSemaphoreSlim.CurrentCount);
178+
MessagingEventSource.Log.MessageBatchReceiverPumpDispatchTaskStop(this.messageReceiver.ClientId, this.maxConcurrentCallsSemaphoreSlim.CurrentCount);
179179
}
180180

181181
void CancelAutoRenewLock(object state)
@@ -232,7 +232,7 @@ async Task RenewMessageLockTask(IList<Message> messages, CancellationToken renew
232232
try
233233
{
234234
var amount = MessagingUtilities.CalculateRenewAfterDuration(messages.Last().SystemProperties.LockedUntilUtc);
235-
MessagingEventSource.Log.BatchMessageReceiverPumpRenewMessageStart(this.messageReceiver.ClientId, amount);
235+
MessagingEventSource.Log.MessageBatchReceiverPumpRenewMessageStart(this.messageReceiver.ClientId, amount);
236236

237237
// We're awaiting the task created by 'ContinueWith' to avoid awaiting the Delay task which may be canceled
238238
// by the renewLockCancellationToken. This way we prevent a TaskCanceledException.
@@ -248,7 +248,7 @@ async Task RenewMessageLockTask(IList<Message> messages, CancellationToken renew
248248
!renewLockCancellationToken.IsCancellationRequested)
249249
{
250250
await this.messageReceiver.RenewLocksAsync(messages).ConfigureAwait(false);
251-
MessagingEventSource.Log.BatchMessageReceiverPumpRenewMessageStop(this.messageReceiver.ClientId);
251+
MessagingEventSource.Log.MessageBatchReceiverPumpRenewMessageStop(this.messageReceiver.ClientId);
252252
}
253253
else
254254
{

0 commit comments

Comments
 (0)