Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit b72d875

Browse files
committed
Validate received messages cannot be added to Batch
implemented
1 parent 31eede4 commit b72d875

File tree

4 files changed

+42
-23
lines changed

4 files changed

+42
-23
lines changed

src/Microsoft.Azure.ServiceBus/Core/Batch.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Azure.Amqp;
55
using Microsoft.Azure.Amqp.Framing;
66
using Microsoft.Azure.ServiceBus.Amqp;
7+
using Microsoft.Azure.ServiceBus.Diagnostics;
78

89
namespace Microsoft.Azure.ServiceBus.Core
910
{
@@ -36,6 +37,8 @@ public bool TryAdd(Message message)
3637
{
3738
ThrowIfDisposed();
3839

40+
message.VerifyMessageIsNotPreviouslyReceived();
41+
3942
var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message);
4043

4144
if (firstMessage == null)

src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4+
using Microsoft.Azure.ServiceBus.Diagnostics;
5+
46
namespace Microsoft.Azure.ServiceBus.Core
57
{
68
using System;
@@ -16,6 +18,7 @@ namespace Microsoft.Azure.ServiceBus.Core
1618
using Microsoft.Azure.ServiceBus.Amqp;
1719
using Microsoft.Azure.ServiceBus.Primitives;
1820

21+
1922
/// <summary>
2023
/// The MessageSender can be used to send messages to Queues or Topics.
2124
/// </summary>
@@ -236,7 +239,7 @@ public async Task SendAsync(IList<Message> messageList)
236239
{
237240
this.ThrowIfClosed();
238241

239-
var count = MessageSender.ValidateMessages(messageList);
242+
var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList);
240243
MessagingEventSource.Log.MessageSendStart(this.ClientId, count);
241244

242245
bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
@@ -275,7 +278,6 @@ public async Task SendAsync(Batch batch)
275278
{
276279
this.ThrowIfClosed();
277280

278-
//var count = MessageSender.ValidateMessages(messageList);
279281
// MessagingEventSource.Log.MessageSendStart(this.ClientId, count);
280282

281283
// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
@@ -335,7 +337,7 @@ public async Task<long> ScheduleMessageAsync(Message message, DateTimeOffset sch
335337
}
336338

337339
message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime;
338-
MessageSender.ValidateMessage(message);
340+
message.VerifyMessageIsNotPreviouslyReceived();
339341
MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc);
340342
long result = 0;
341343

@@ -484,7 +486,7 @@ protected override async Task OnClosingAsync()
484486
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
485487
}
486488

487-
static int ValidateMessages(IList<Message> messageList)
489+
static int VerifyMessagesAreNotPreviouslyReceived(IList<Message> messageList)
488490
{
489491
var count = 0;
490492
if (messageList == null)
@@ -495,20 +497,12 @@ static int ValidateMessages(IList<Message> messageList)
495497
foreach (var message in messageList)
496498
{
497499
count++;
498-
ValidateMessage(message);
500+
message.VerifyMessageIsNotPreviouslyReceived();
499501
}
500502

501503
return count;
502504
}
503505

504-
static void ValidateMessage(Message message)
505-
{
506-
if (message.SystemProperties.IsLockTokenSet)
507-
{
508-
throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received.");
509-
}
510-
}
511-
512506
static void CloseSession(SendingAmqpLink link)
513507
{
514508
// Note we close the session (which includes the link).

src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.Diagnostics
66
using System;
77
using System.Collections.Generic;
88
using System.Diagnostics;
9+
using Microsoft.Azure.ServiceBus.Primitives;
910

1011
public static class MessageExtensions
1112
{
@@ -15,20 +16,20 @@ public static class MessageExtensions
1516
/// <returns>New <see cref="Activity"/> with tracing context</returns>
1617
/// </summary>
1718
/// <remarks>
18-
/// Tracing context is used to correlate telemetry between producer and consumer and
19+
/// Tracing context is used to correlate telemetry between producer and consumer and
1920
/// represented by 'Diagnostic-Id' and 'Correlation-Context' properties in <see cref="Message.UserProperties"/>.
20-
///
21+
///
2122
/// .NET SDK automatically injects context when sending message to the ServiceBus (if diagnostics is enabled by tracing system).
22-
///
23+
///
2324
/// <para>
2425
/// 'Diagnostic-Id' uniquely identifies operation that enqueued message
2526
/// </para>
2627
/// <para>
2728
/// 'Correlation-Context' is comma separated list of sting key value pairs represeting optional context for the operation.
2829
/// </para>
29-
///
30+
///
3031
/// If there is no tracing context in the message, this method returns <see cref="Activity"/> without parent.
31-
///
32+
///
3233
/// Returned <see cref="Activity"/> needs to be started before it can be used (see example below)
3334
/// </remarks>
3435
/// <example>
@@ -39,26 +40,26 @@ public static class MessageExtensions
3940
/// var activity = message.ExtractActivity();
4041
/// activity.Start();
4142
/// Logger.LogInformation($"Message received, Id = {Activity.Current.Id}")
42-
/// try
43+
/// try
4344
/// {
4445
/// // process message
4546
/// }
4647
/// catch (Exception ex)
4748
/// {
4849
/// Logger.LogError($"Exception {ex}, Id = {Activity.Current.Id}")
4950
/// }
50-
/// finally
51+
/// finally
5152
/// {
5253
/// activity.Stop();
5354
/// // Activity is stopped, we no longer have it in Activity.Current, let's user activity now
5455
/// Logger.LogInformation($"Message processed, Id = {activity.Id}, Duration = {activity.Duration}")
5556
/// }
5657
/// }
5758
/// </code>
58-
///
59-
/// Note that every log is stamped with <see cref="Activity.Current"/>.Id, that could be used within
59+
///
60+
/// Note that every log is stamped with <see cref="Activity.Current"/>.Id, that could be used within
6061
/// any nested method call (sync or async) - <see cref="Activity.Current"/> is an ambient context that flows with async method calls.
61-
///
62+
///
6263
/// </example>
6364

6465
public static Activity ExtractActivity(this Message message, string activityName = null)
@@ -149,5 +150,14 @@ internal static bool TryExtractContext(this Message message, out IList<KeyValueP
149150
}
150151
return false;
151152
}
153+
154+
155+
internal static void VerifyMessageIsNotPreviouslyReceived(this Message message)
156+
{
157+
if (message.SystemProperties.IsLockTokenSet)
158+
{
159+
throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received.");
160+
}
161+
}
152162
}
153163
}

test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,17 @@ public void Should_throw_if_batch_disposed()
2929
Assert.Throws<Exception>(() => batch.TryAdd(new Message()));
3030
}
3131
}
32+
33+
[Fact]
34+
public void Should_throw_when_trying_add_received_message_to_batch()
35+
{
36+
using (var batch = new Batch(100))
37+
{
38+
var message = new Message("test".GetBytes());
39+
message.SystemProperties.LockTokenGuid = Guid.NewGuid();
40+
41+
Assert.Throws<ArgumentException>(() => batch.TryAdd(message));
42+
}
43+
}
3244
}
3345
}

0 commit comments

Comments
 (0)