Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit bf6acc9

Browse files
SeanFeldmannemakam
authored andcommitted
Keep delivery count as-is when peeking a message (#590)
Fixes #589 * Verification test * Differentiate between peek and receive operations * Indicate peek operation * Additional test case to verify behavior when message is received * Test delivery count on peek against real queue * Test delivery count on peeklock against real queue
1 parent e5552e8 commit bf6acc9

File tree

4 files changed

+84
-3
lines changed

4 files changed

+84
-3
lines changed

src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
164164
return amqpMessage;
165165
}
166166

167-
public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage)
167+
public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
168168
{
169169
if (amqpMessage == null)
170170
{
@@ -229,7 +229,7 @@ public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage)
229229

230230
if (amqpMessage.Header.DeliveryCount != null)
231231
{
232-
sbMessage.SystemProperties.DeliveryCount = (int)(amqpMessage.Header.DeliveryCount.Value + 1);
232+
sbMessage.SystemProperties.DeliveryCount = isPeeked ? (int)(amqpMessage.Header.DeliveryCount.Value) : (int)(amqpMessage.Header.DeliveryCount.Value + 1);
233233
}
234234
}
235235

src/Microsoft.Azure.ServiceBus/Core/MessageReceiver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ protected virtual async Task<IList<Message>> OnPeekAsync(long fromSequenceNumber
11021102
{
11031103
var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
11041104
var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
1105-
message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
1105+
message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage, true);
11061106
messages.Add(message);
11071107
}
11081108

test/Microsoft.Azure.ServiceBus.UnitTests/AmqpConverterTests.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,39 @@ void SB_message_with_no_TTL_results_in_empty_Ampq_TTL()
131131

132132
Assert.Null(amqpMessage.Header.Ttl);
133133
}
134+
135+
[Fact]
136+
[DisplayTestMethodName]
137+
void When_message_is_peeked_should_have_delivery_count_set_to_zero()
138+
{
139+
var messageBody = Encoding.UTF8.GetBytes("message1");
140+
141+
var amqpValue = new AmqpValue();
142+
amqpValue.Value = new ArraySegment<byte>(messageBody);
143+
var amqpMessage = AmqpMessage.Create(amqpValue);
144+
amqpMessage.Header.DeliveryCount = 2;
145+
146+
var sbMessage = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage, isPeeked: true);
147+
sbMessage.SystemProperties.SequenceNumber = 1L;
148+
149+
Assert.Equal(2, sbMessage.SystemProperties.DeliveryCount);
150+
}
151+
152+
[Fact]
153+
[DisplayTestMethodName]
154+
void When_message_is_received_should_have_delivery_count_increased()
155+
{
156+
var messageBody = Encoding.UTF8.GetBytes("message1");
157+
158+
var amqpValue = new AmqpValue();
159+
amqpValue.Value = new ArraySegment<byte>(messageBody);
160+
var amqpMessage = AmqpMessage.Create(amqpValue);
161+
amqpMessage.Header.DeliveryCount = 2;
162+
163+
var sbMessage = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage, isPeeked: false);
164+
sbMessage.SystemProperties.SequenceNumber = 1L;
165+
166+
Assert.Equal(3, sbMessage.SystemProperties.DeliveryCount);
167+
}
134168
}
135169
}

test/Microsoft.Azure.ServiceBus.UnitTests/QueueClientTests.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace Microsoft.Azure.ServiceBus.UnitTests
55
{
66
using System.Collections.Generic;
7+
using System.Linq;
78
using System.Threading.Tasks;
89
using Xunit;
910

@@ -31,6 +32,52 @@ async Task PeekLockTest(string queueName, int messageCount = 10)
3132
}
3233
}
3334

35+
[Theory]
36+
[MemberData(nameof(TestPermutations))]
37+
[DisplayTestMethodName]
38+
async Task PeekDeliveryCountTest(string queueName)
39+
{
40+
var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName);
41+
try
42+
{
43+
await TestUtility.SendMessagesAsync(queueClient.InnerSender, 1);
44+
45+
var message = await TestUtility.PeekMessageAsync(queueClient.InnerReceiver);
46+
47+
Assert.Equal(0, message.SystemProperties.DeliveryCount);
48+
}
49+
finally
50+
{
51+
var messageToDelete = await TestUtility.ReceiveMessagesAsync(queueClient.InnerReceiver, 1);
52+
await TestUtility.CompleteMessagesAsync(queueClient.InnerReceiver, messageToDelete);
53+
54+
await queueClient.CloseAsync();
55+
}
56+
}
57+
58+
[Theory]
59+
[MemberData(nameof(TestPermutations))]
60+
[DisplayTestMethodName]
61+
async Task PeekLockDeliveryCountTest(string queueName)
62+
{
63+
var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName);
64+
try
65+
{
66+
await TestUtility.SendMessagesAsync(queueClient.InnerSender, 1);
67+
68+
var messages = await TestUtility.ReceiveMessagesAsync(queueClient.InnerReceiver, 1);
69+
70+
await TestUtility.CompleteMessagesAsync(queueClient.InnerReceiver, messages);
71+
72+
Assert.Equal(1, messages.First().SystemProperties.DeliveryCount);
73+
}
74+
finally
75+
{
76+
await queueClient.CloseAsync();
77+
}
78+
}
79+
80+
3481
[Theory]
3582
[MemberData(nameof(TestPermutations))]
3683
[DisplayTestMethodName]

0 commit comments

Comments
 (0)