Skip to content

Commit bfd1000

Browse files
Allow manual lock renewal from processor and function extension (Azure#27308)
* Allow manual lock renewal from processor and function extension * Fix * Remove lockeduntil because it can change * Fix
1 parent 99f12f4 commit bfd1000

13 files changed

+199
-9
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public ProcessMessageEventArgs(Azure.Messaging.ServiceBus.ServiceBusReceivedMess
3030
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3131
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3232
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
33+
public virtual System.Threading.Tasks.Task RenewMessageLockAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3334
}
3435
public partial class ProcessSessionEventArgs : System.EventArgs
3536
{
@@ -54,6 +55,7 @@ public ProcessSessionMessageEventArgs(Azure.Messaging.ServiceBus.ServiceBusRecei
5455
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5556
public virtual System.Threading.Tasks.Task<System.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5657
public virtual void ReleaseSession() { }
58+
public virtual System.Threading.Tasks.Task RenewSessionLockAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5759
public virtual System.Threading.Tasks.Task SetSessionStateAsync(System.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5860
}
5961
public partial class ServiceBusClient : System.IAsyncDisposable

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,13 @@ await _receiver.DeferMessageAsync(
109109
.ConfigureAwait(false);
110110
message.IsSettled = true;
111111
}
112+
113+
///<inheritdoc cref="ServiceBusReceiver.RenewMessageLockAsync(ServiceBusReceivedMessage, CancellationToken)"/>
114+
public virtual async Task RenewMessageLockAsync(
115+
ServiceBusReceivedMessage message,
116+
CancellationToken cancellationToken = default)
117+
{
118+
await _receiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
119+
}
112120
}
113121
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,5 +160,11 @@ await _sessionReceiver.DeferMessageAsync(
160160
/// </summary>
161161
public virtual void ReleaseSession() =>
162162
_receiverManager.CancelSession();
163+
164+
///<inheritdoc cref="ServiceBusSessionReceiver.RenewSessionLockAsync(CancellationToken)"/>
165+
public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
166+
{
167+
await _sessionReceiver.RenewSessionLockAsync(cancellationToken).ConfigureAwait(false);
168+
}
163169
}
164170
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,48 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
876876
}
877877
}
878878

879+
[Test]
880+
public async Task CanManuallyRenewMessageLock()
881+
{
882+
var lockDuration = ShortLockDuration;
883+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false, lockDuration: lockDuration))
884+
{
885+
await using var client = CreateClient();
886+
var sender = client.CreateSender(scope.QueueName);
887+
int messageCount = 10;
888+
889+
await sender.SendMessagesAsync(ServiceBusTestUtilities.GetMessages(messageCount));
890+
891+
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
892+
{
893+
MaxConcurrentCalls = 10,
894+
MaxAutoLockRenewalDuration = TimeSpan.Zero
895+
});
896+
897+
int receivedCount = 0;
898+
var tcs = new TaskCompletionSource<bool>();
899+
900+
async Task ProcessMessage(ProcessMessageEventArgs args)
901+
{
902+
var count = Interlocked.Increment(ref receivedCount);
903+
if (count == messageCount)
904+
{
905+
tcs.SetResult(true);
906+
}
907+
908+
var initialLockedUntil = args.Message.LockedUntil;
909+
await args.RenewMessageLockAsync(args.Message);
910+
Assert.Greater(args.Message.LockedUntil, initialLockedUntil);
911+
}
912+
processor.ProcessMessageAsync += ProcessMessage;
913+
processor.ProcessErrorAsync += ServiceBusTestUtilities.ExceptionHandler;
914+
915+
await processor.StartProcessingAsync();
916+
await tcs.Task;
917+
await processor.StopProcessingAsync();
918+
}
919+
}
920+
879921
[Test]
880922
public async Task CanUpdateConcurrency()
881923
{

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,51 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
660660
}
661661
}
662662

663+
[Test]
664+
public async Task CanManuallyRenewSessionLock()
665+
{
666+
var lockDuration = ShortLockDuration;
667+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true, lockDuration: lockDuration))
668+
{
669+
await using var client = CreateClient();
670+
var sender = client.CreateSender(scope.QueueName);
671+
int messageCount = 10;
672+
673+
for (int i = 0; i < messageCount; i++)
674+
{
675+
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage(Guid.NewGuid().ToString()));
676+
}
677+
678+
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
679+
{
680+
MaxConcurrentSessions = messageCount,
681+
MaxAutoLockRenewalDuration = TimeSpan.Zero
682+
});
683+
684+
int receivedCount = 0;
685+
var tcs = new TaskCompletionSource<bool>();
686+
687+
async Task ProcessMessage(ProcessSessionMessageEventArgs args)
688+
{
689+
var count = Interlocked.Increment(ref receivedCount);
690+
if (count == messageCount)
691+
{
692+
tcs.SetResult(true);
693+
}
694+
695+
var initialLockedUntil = args.SessionLockedUntil;
696+
await args.RenewSessionLockAsync();
697+
Assert.Greater(args.SessionLockedUntil, initialLockedUntil);
698+
}
699+
processor.ProcessMessageAsync += ProcessMessage;
700+
processor.ProcessErrorAsync += ServiceBusTestUtilities.ExceptionHandler;
701+
702+
await processor.StartProcessingAsync();
703+
await tcs.Task;
704+
await processor.StopProcessingAsync();
705+
}
706+
}
707+
663708
[Test]
664709
[TestCase(1, 0)]
665710
[TestCase(5, 0)]

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ protected ServiceBusMessageActions() { }
6868
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
6969
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
7070
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
71+
public virtual System.Threading.Tasks.Task RenewMessageLockAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
7172
}
7273
public partial class ServiceBusOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter
7374
{
@@ -90,8 +91,10 @@ public ServiceBusOptions() { }
9091
public partial class ServiceBusSessionMessageActions : Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions
9192
{
9293
protected ServiceBusSessionMessageActions() { }
94+
public System.DateTimeOffset SessionLockedUntil { get { throw null; } }
9395
public virtual System.Threading.Tasks.Task<System.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
9496
public virtual void ReleaseSession() { }
97+
public virtual System.Threading.Tasks.Task RenewSessionLockAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
9598
public virtual System.Threading.Tasks.Task SetSessionStateAsync(System.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
9699
}
97100
public partial class SessionMessageProcessor

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<PackageReference Include="Microsoft.Azure.WebJobs.Sources" PrivateAssets="All" />
1717
<PackageReference Include="Microsoft.Azure.WebJobs" />
1818
<PackageReference Include="Microsoft.Extensions.Azure" />
19-
<PackageReference Include="Azure.Messaging.ServiceBus" />
19+
<!-- <PackageReference Include="Azure.Messaging.ServiceBus" />-->
2020
</ItemGroup>
2121

2222
<ItemGroup>
@@ -29,4 +29,8 @@
2929
<Compile Include="..\..\Azure.Messaging.ServiceBus\src\Resources.Designer.cs" LinkBase="Shared" />
3030
<Compile Include="..\..\..\extensions\Microsoft.Azure.WebJobs.Extensions.Clients\src\Shared\WebJobsConfigurationExtensions.cs" LinkBase="Shared" />
3131
</ItemGroup>
32+
33+
<ItemGroup>
34+
<ProjectReference Include="..\..\Azure.Messaging.ServiceBus\src\Azure.Messaging.ServiceBus.csproj" />
35+
</ItemGroup>
3236
</Project>

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusMessageActions.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
45
using System.Collections.Concurrent;
56
using Azure.Messaging.ServiceBus;
67
using System.Collections.Generic;
@@ -195,6 +196,31 @@ await _sessionEventArgs.DeferMessageAsync(
195196
TrackMessageAsSettled(message);
196197
}
197198

199+
///<inheritdoc cref="ServiceBusReceiver.RenewMessageLockAsync(ServiceBusReceivedMessage, CancellationToken)"/>
200+
public virtual async Task RenewMessageLockAsync(
201+
ServiceBusReceivedMessage message,
202+
CancellationToken cancellationToken = default)
203+
{
204+
if (_receiver is ServiceBusSessionReceiver || _sessionEventArgs != null)
205+
{
206+
throw new InvalidOperationException(Resources.CannotLockMessageOnSessionEntity);
207+
}
208+
if (_receiver != null)
209+
{
210+
await _receiver.RenewMessageLockAsync(
211+
message,
212+
cancellationToken)
213+
.ConfigureAwait(false);
214+
}
215+
else
216+
{
217+
await _eventArgs.RenewMessageLockAsync(
218+
message,
219+
cancellationToken)
220+
.ConfigureAwait(false);
221+
}
222+
}
223+
198224
private void TrackMessageAsSettled(ServiceBusReceivedMessage message)
199225
=> SettledMessages[message] = 0;
200226
}

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/ServiceBusSessionMessageActions.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ public class ServiceBusSessionMessageActions : ServiceBusMessageActions
1919

2020
internal bool ShouldReleaseSession { get; set; }
2121

22+
/// <inheritdoc cref="ServiceBusSessionReceiver.SessionLockedUntil"/>
23+
public DateTimeOffset SessionLockedUntil => _eventArgs?.SessionLockedUntil ?? _receiver.SessionLockedUntil;
24+
2225
internal ServiceBusSessionMessageActions(ProcessSessionMessageEventArgs eventArgs) : base(eventArgs)
2326
{
2427
_eventArgs = eventArgs;
@@ -74,5 +77,22 @@ public virtual void ReleaseSession()
7477
{
7578
ShouldReleaseSession = true;
7679
}
80+
81+
///<inheritdoc cref="ServiceBusSessionReceiver.RenewSessionLockAsync(CancellationToken)"/>
82+
public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
83+
{
84+
if (_receiver != null)
85+
{
86+
await _receiver.RenewSessionLockAsync(
87+
cancellationToken)
88+
.ConfigureAwait(false);
89+
}
90+
else
91+
{
92+
await _eventArgs.RenewSessionLockAsync(
93+
cancellationToken)
94+
.ConfigureAwait(false);
95+
}
96+
}
7797
}
7898
}

sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerBindingStrategy.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ public Dictionary<string, object> GetBindingData(ServiceBusTriggerInput value)
5252
}
5353

5454
var bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
55-
// TODO - investigate why the parameter names need to be hard-coded here but they are not
56-
// for binding to sender
55+
56+
// Support MessageReceiver and MessageSession parameters for backcompat
5757
SafeAddValue(() => bindingData.Add("MessageReceiver", value.MessageActions));
5858
SafeAddValue(() => bindingData.Add("MessageSession", value.MessageActions));
59+
5960
SafeAddValue(() => bindingData.Add("MessageActions", value.MessageActions));
6061
SafeAddValue(() => bindingData.Add("SessionActions", value.MessageActions));
6162
SafeAddValue(() => bindingData.Add("Client", value.Client));
@@ -79,7 +80,9 @@ public Dictionary<string, Type> GetBindingContract(bool isSingleDispatch = true)
7980
AddBindingContractMember(contract, "DeadLetterSource", typeof(string), isSingleDispatch);
8081
AddBindingContractMember(contract, "LockToken", typeof(string), isSingleDispatch);
8182
AddBindingContractMember(contract, "ExpiresAtUtc", typeof(DateTime), isSingleDispatch);
83+
AddBindingContractMember(contract, "ExpiresAt", typeof(DateTimeOffset), isSingleDispatch);
8284
AddBindingContractMember(contract, "EnqueuedTimeUtc", typeof(DateTime), isSingleDispatch);
85+
AddBindingContractMember(contract, "EnqueuedTime", typeof(DateTimeOffset), isSingleDispatch);
8386
AddBindingContractMember(contract, "MessageId", typeof(string), isSingleDispatch);
8487
AddBindingContractMember(contract, "ContentType", typeof(string), isSingleDispatch);
8588
AddBindingContractMember(contract, "ReplyTo", typeof(string), isSingleDispatch);
@@ -92,6 +95,7 @@ public Dictionary<string, Type> GetBindingContract(bool isSingleDispatch = true)
9295
AddBindingContractMember(contract, "ApplicationProperties", typeof(IDictionary<string, object>), isSingleDispatch);
9396
// for backcompat
9497
AddBindingContractMember(contract, "UserProperties", typeof(IDictionary<string, object>), isSingleDispatch);
98+
9599
contract.Add("MessageReceiver", typeof(ServiceBusMessageActions));
96100
contract.Add("MessageSession", typeof(ServiceBusSessionMessageActions));
97101
contract.Add("MessageActions", typeof(ServiceBusMessageActions));
@@ -107,7 +111,9 @@ internal static void AddBindingData(Dictionary<string, object> bindingData, Serv
107111
var deadLetterSources = new string[length];
108112
var lockTokens = new string[length];
109113
var expiresAtUtcs = new DateTime[length];
114+
var expiresAt = new DateTimeOffset[length];
110115
var enqueuedTimeUtcs = new DateTime[length];
116+
var enqueuedTimes = new DateTimeOffset[length];
111117
var messageIds = new string[length];
112118
var contentTypes = new string[length];
113119
var replyTos = new string[length];
@@ -121,7 +127,9 @@ internal static void AddBindingData(Dictionary<string, object> bindingData, Serv
121127
SafeAddValue(() => bindingData.Add("DeadLetterSourceArray", deadLetterSources));
122128
SafeAddValue(() => bindingData.Add("LockTokenArray", lockTokens));
123129
SafeAddValue(() => bindingData.Add("ExpiresAtUtcArray", expiresAtUtcs));
130+
SafeAddValue(() => bindingData.Add("ExpiresAtArray", expiresAt));
124131
SafeAddValue(() => bindingData.Add("EnqueuedTimeUtcArray", enqueuedTimeUtcs));
132+
SafeAddValue(() => bindingData.Add("EnqueuedTimeArray", enqueuedTimes));
125133
SafeAddValue(() => bindingData.Add("MessageIdArray", messageIds));
126134
SafeAddValue(() => bindingData.Add("ContentTypeArray", contentTypes));
127135
SafeAddValue(() => bindingData.Add("ReplyToArray", replyTos));
@@ -140,7 +148,9 @@ internal static void AddBindingData(Dictionary<string, object> bindingData, Serv
140148
deadLetterSources[i] = messages[i].DeadLetterSource;
141149
lockTokens[i] = messages[i].LockToken;
142150
expiresAtUtcs[i] = messages[i].ExpiresAt.DateTime;
151+
expiresAt[i] = messages[i].ExpiresAt;
143152
enqueuedTimeUtcs[i] = messages[i].EnqueuedTime.DateTime;
153+
enqueuedTimes[i] = messages[i].EnqueuedTime;
144154
messageIds[i] = messages[i].MessageId;
145155
contentTypes[i] = messages[i].ContentType;
146156
replyTos[i] = messages[i].ReplyTo;
@@ -157,8 +167,12 @@ private static void AddBindingData(Dictionary<string, object> bindingData, Servi
157167
SafeAddValue(() => bindingData.Add(nameof(value.DeliveryCount), value.DeliveryCount));
158168
SafeAddValue(() => bindingData.Add(nameof(value.DeadLetterSource), value.DeadLetterSource));
159169
SafeAddValue(() => bindingData.Add(nameof(value.LockToken), value.LockToken));
170+
// for backcompat
160171
SafeAddValue(() => bindingData.Add("ExpiresAtUtc", value.ExpiresAt.DateTime));
172+
SafeAddValue(() => bindingData.Add(nameof(value.ExpiresAt), value.ExpiresAt));
173+
// for backcompat
161174
SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", value.EnqueuedTime.DateTime));
175+
SafeAddValue(() => bindingData.Add(nameof(value.EnqueuedTime), value.EnqueuedTime));
162176
SafeAddValue(() => bindingData.Add(nameof(value.MessageId), value.MessageId));
163177
SafeAddValue(() => bindingData.Add(nameof(value.ContentType), value.ContentType));
164178
SafeAddValue(() => bindingData.Add(nameof(value.ReplyTo), value.ReplyTo));

0 commit comments

Comments
 (0)