Skip to content

Commit 3fb80ad

Browse files
authored
Queue Listener fixes: Drain Mode and Message Deletion (Azure#47791)
* Update drainMode logic * Drain mode and shutdown cancellation * Add tests * Fix tests * Fix spacing * Update comment, add changes to changelog * Updates to tests * Update tests, add TesteQueueProcessor * Adding test constructor * Uncomment * revert constructor * Refactor tests * Revert eng * Update change log * Update change log * Fix changelog
1 parent 5eda9c7 commit 3fb80ad

File tree

4 files changed

+167
-2
lines changed

4 files changed

+167
-2
lines changed

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,12 @@ public Task StartAsync(CancellationToken cancellationToken)
169169

170170
public async Task StopAsync(CancellationToken cancellationToken)
171171
{
172-
if (_drainModeManager?.IsDrainModeEnabled ?? false)
172+
if (!_drainModeManager?.IsDrainModeEnabled ?? true)
173173
{
174+
// Cancel the execution token when drain mode is not enabled or drain mode manager is not set.
174175
_executionCancellationTokenSource.Cancel();
175176
}
177+
176178
using (cancellationToken.Register(() => _shutdownCancellationTokenSource.Cancel()))
177179
{
178180
ThrowIfDisposed();
@@ -417,7 +419,7 @@ internal async Task ProcessMessageAsync(QueueMessage message, TimeSpan visibilit
417419
// Use a different cancellation token for shutdown to allow graceful shutdown.
418420
// Specifically, don't cancel the completion or update of the message itself during graceful shutdown.
419421
// Only cancel completion or update of the message if a non-graceful shutdown is requested via _shutdownCancellationTokenSource.
420-
await _queueProcessor.CompleteProcessingMessageAsync(message, result, linkedCts.Token).ConfigureAwait(false);
422+
await _queueProcessor.CompleteProcessingMessageAsync(message, result, _shutdownCancellationTokenSource.Token).ConfigureAwait(false);
421423
}
422424
}
423425
catch (TaskCanceledException)

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Fixed bug where calling StopAsync in QueueListener while drain mode was enabled would cancel the execution cancellation token.
11+
- Fixed bug where the cancellation token passed to QueueListener.ProcessMessageAsync was being propagated to the QueueProcessor.CompleteProcessingMessageAsync call. Since this token is always canceled when QueueListener.StopAsync is invoked, it caused messages to be processed but not deleted.
1012

1113
### Other Changes
1214

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Linq;
7+
using System.Reflection;
78
using System.Threading;
89
using System.Threading.Tasks;
910
using Azure;
@@ -15,6 +16,7 @@
1516
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
1617
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests;
1718
using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners;
19+
using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests;
1820
using Microsoft.Azure.WebJobs.Host;
1921
using Microsoft.Azure.WebJobs.Host.Executors;
2022
using Microsoft.Azure.WebJobs.Host.Protocols;
@@ -648,6 +650,136 @@ public async Task ProcessMessageAsync_FunctionInvocationFails()
648650
await _listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(10), cancellationToken);
649651
}
650652

653+
[Test]
654+
public async Task CompleteProcessingMessageAsync_PassesShutdownCancellationToken()
655+
{
656+
var systemShutdownCts = new CancellationTokenSource();
657+
QueueProcessorOptions options = new QueueProcessorOptions(_mockQueue.Object, _loggerFactory, _queuesOptions);
658+
var testQueueProcessor = new TestQueueProcessor(options);
659+
660+
_mockTriggerExecutor.Setup(x => x.ExecuteAsync(It.IsAny<QueueMessage>(), It.IsAny<CancellationToken>()))
661+
.ReturnsAsync(new FunctionResult(true));
662+
663+
var exceptionHandlerMock = new Mock<IWebJobsExceptionHandler>();
664+
665+
var listener = new QueueListener(
666+
_mockQueue.Object,
667+
null,
668+
_mockTriggerExecutor.Object,
669+
_mockExceptionDispatcher.Object,
670+
_loggerFactory,
671+
null,
672+
_queuesOptions,
673+
testQueueProcessor,
674+
new FunctionDescriptor { Id = TestFunctionId },
675+
null,
676+
drainModeManager: null);
677+
678+
SetCancellationToken(listener, systemShutdownCts, "_shutdownCancellationTokenSource");
679+
680+
// Act
681+
await listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(2), CancellationToken.None);
682+
683+
Assert.AreEqual(systemShutdownCts.Token, testQueueProcessor.CapturedDeleteToken);
684+
}
685+
686+
[Test]
687+
public async Task StopAsync_WhenDrainModeNotEnabled_ExecutionCancellationTokenIsCanceled()
688+
{
689+
var drainModeManagerMock = new Mock<IDrainModeManager>();
690+
drainModeManagerMock.Setup(d => d.IsDrainModeEnabled).Returns(false);
691+
var executionCancellationTokenSource = new CancellationTokenSource();
692+
693+
var listener = new QueueListener(
694+
_mockQueue.Object,
695+
null,
696+
_mockTriggerExecutor.Object,
697+
_mockExceptionDispatcher.Object,
698+
_loggerFactory,
699+
null,
700+
_queuesOptions,
701+
_mockQueueProcessor.Object,
702+
new FunctionDescriptor { Id = TestFunctionId },
703+
null,
704+
drainModeManager: drainModeManagerMock.Object);
705+
706+
SetCancellationToken(listener, executionCancellationTokenSource, "_executionCancellationTokenSource");
707+
708+
// Act
709+
await listener.StartAsync(CancellationToken.None);
710+
await listener.StopAsync(CancellationToken.None);
711+
712+
// Assert
713+
Assert.IsTrue(executionCancellationTokenSource.Token.IsCancellationRequested, "Execution token should be canceled when drain mode is not enabled.");
714+
}
715+
716+
[Test]
717+
public async Task StopAsync_WhenDrainModeEnabled_ExecutionCancellationTokenIsNotCanceled()
718+
{
719+
// Arrange
720+
var drainModeManagerMock = new Mock<IDrainModeManager>();
721+
drainModeManagerMock.Setup(d => d.IsDrainModeEnabled).Returns(true);
722+
var executionCancellationTokenSource = new CancellationTokenSource();
723+
724+
var listener = new QueueListener(
725+
_mockQueue.Object,
726+
null,
727+
_mockTriggerExecutor.Object,
728+
_mockExceptionDispatcher.Object,
729+
_loggerFactory,
730+
null,
731+
_queuesOptions,
732+
_mockQueueProcessor.Object,
733+
new FunctionDescriptor { Id = TestFunctionId },
734+
null,
735+
drainModeManager: drainModeManagerMock.Object);
736+
737+
SetCancellationToken(listener, executionCancellationTokenSource, "_executionCancellationTokenSource");
738+
739+
// Act
740+
await listener.StartAsync(CancellationToken.None);
741+
await listener.StopAsync(CancellationToken.None);
742+
743+
// Assert
744+
Assert.IsFalse(executionCancellationTokenSource.Token.IsCancellationRequested, "Execution token should not be canceled when drain mode is enabled.");
745+
}
746+
747+
[Test]
748+
public async Task StopAsync_ActivatesCancellation_WhenDrainModeManagerNull()
749+
{
750+
// Arrange
751+
var executionCancellationTokenSource = new CancellationTokenSource();
752+
753+
var listener = new QueueListener(
754+
_mockQueue.Object,
755+
null,
756+
_mockTriggerExecutor.Object,
757+
_mockExceptionDispatcher.Object,
758+
_loggerFactory,
759+
null,
760+
_queuesOptions,
761+
_mockQueueProcessor.Object,
762+
new FunctionDescriptor { Id = TestFunctionId },
763+
null,
764+
drainModeManager: null);
765+
766+
SetCancellationToken(listener, executionCancellationTokenSource, "_executionCancellationTokenSource");
767+
768+
// Act
769+
await listener.StartAsync(CancellationToken.None);
770+
771+
await listener.StopAsync(CancellationToken.None);
772+
773+
// Assert
774+
Assert.IsTrue(executionCancellationTokenSource.Token.IsCancellationRequested, "Execution token should be canceled when drain mode manager is null.");
775+
}
776+
777+
private static void SetCancellationToken(QueueListener listener, CancellationTokenSource cts, string fieldName)
778+
{
779+
var shutdownTokenField = typeof(QueueListener).GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
780+
shutdownTokenField.SetValue(listener, cts);
781+
}
782+
651783
[Test]
652784
public void Get_TargetScale_IsNotNull()
653785
{
@@ -676,6 +808,7 @@ public void Get_TargetScale_IsNotNull()
676808
var result = localListener.GetTargetScaler();
677809
Assert.IsNotNull(result);
678810
}
811+
679812
public class TestFixture : IDisposable
680813
{
681814
private const string TestQueuePrefix = "queuelistenertests";
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Azure.Storage.Queues.Models;
7+
using Microsoft.Azure.WebJobs.Host.Queues;
8+
9+
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests
10+
{
11+
public class TestQueueProcessor : QueueProcessor
12+
{
13+
public bool DeleteCalled { get; private set; }
14+
public CancellationToken CapturedDeleteToken { get; private set; }
15+
16+
public TestQueueProcessor(QueueProcessorOptions options)
17+
: base(options)
18+
{
19+
}
20+
21+
protected override async Task DeleteMessageAsync(QueueMessage message, CancellationToken cancellationToken)
22+
{
23+
DeleteCalled = true;
24+
CapturedDeleteToken = cancellationToken; // store the exact token used
25+
await base.DeleteMessageAsync(message, cancellationToken);
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)