Skip to content

Commit 8e5e630

Browse files
authored
[Storage] [WebJobs] LogScan to properly use the target BlobServiceClient when specified (#53464)
* WIP - builds - Added target blobserviceclient to IBlobListenerStrategy.RegisterAsync; Added catch for possible permissions failure to default to primary * Minor cleanup * Remove unused method * PR feedback - change back strategy to take in 1 blob client; Only log when target account does not have permissions * Remove unnecessary changes * Change Register Shared BLob Listener to use primary client for receipts and target client for the strategy * Removed SAS off Uri from being logged; Added another Authorization error code * WIP - adding unit tests * Add test to ensure the correct client is being passed to strategy and executor * cleanup of unnecessary setup * Update Changelog * PR comments * PR Comment part 2
1 parent 4a1154c commit 8e5e630

File tree

8 files changed

+311
-10
lines changed

8 files changed

+311
-10
lines changed

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Bug fix ensuring that BlobTrigger log scan targets the correct storage account in multi-account scenarios.
1011

1112
### Other Changes
1213

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,13 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
107107
new SharedBlobListenerFactory(hostId, _hostBlobServiceClient, _exceptionHandler, _blobWrittenWatcherSetter, _loggerFactory.CreateLogger<BlobListener>()));
108108

109109
// Register the blob container we wish to monitor with the shared blob listener.
110-
await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlobClient,
111-
blobTriggerQueueWriter, cancellationToken).ConfigureAwait(false);
110+
await RegisterWithSharedBlobListenerAsync(
111+
hostId,
112+
sharedBlobListener,
113+
primaryBlobClient,
114+
targetBlobClient,
115+
blobTriggerQueueWriter,
116+
cancellationToken).ConfigureAwait(false);
112117
}
113118

114119
// Create a "bridge" listener that will monitor the blob
@@ -165,14 +170,24 @@ await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlo
165170
private async Task RegisterWithSharedBlobListenerAsync(
166171
string hostId,
167172
SharedBlobListener sharedBlobListener,
168-
BlobServiceClient blobClient,
173+
BlobServiceClient primaryBlobClient,
174+
BlobServiceClient targetBlobClient,
169175
BlobTriggerQueueWriter blobTriggerQueueWriter,
170176
CancellationToken cancellationToken)
171177
{
172-
BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(blobClient),
173-
blobTriggerQueueWriter, _loggerFactory.CreateLogger<BlobListener>());
174-
175-
await sharedBlobListener.RegisterAsync(blobClient, _container, triggerExecutor, cancellationToken).ConfigureAwait(false);
178+
BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(
179+
hostId,
180+
_functionDescriptor,
181+
_input,
182+
new BlobReceiptManager(primaryBlobClient),
183+
blobTriggerQueueWriter,
184+
_loggerFactory.CreateLogger<BlobListener>());
185+
186+
await sharedBlobListener.RegisterAsync(
187+
targetBlobClient,
188+
_container,
189+
triggerExecutor,
190+
cancellationToken).ConfigureAwait(false);
176191
}
177192

178193
private void RegisterWithSharedBlobQueueListenerAsync(

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.Logger.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,15 @@ private class Logger
1616
LoggerMessage.Define<string, string, int>(LogLevel.Debug, new EventId(300, nameof(ScanBlobLogs)),
1717
"Log scan for recent blob updates in container '{containerName}' with PollId '{pollId}' found {blobCount} blobs.");
1818

19+
private static readonly Action<ILogger<BlobListener>, string, Exception> _loggingNotEnabledOnTargetAccount =
20+
LoggerMessage.Define<string>(LogLevel.Warning, new EventId(400, nameof(LoggingNotEnabledOnTargetAccount)),
21+
"Storage Analytics Logs are not enabled on the target blob storage account: '{targetAccountName}'. See aka.ms/AAywxhb");
22+
1923
public static void ScanBlobLogs(ILogger<BlobListener> logger, string containerName, string pollId, int blobCount) =>
2024
_scanBlobLogs(logger, containerName, pollId, blobCount, null);
25+
26+
public static void LoggingNotEnabledOnTargetAccount(ILogger<BlobListener> logger, string targetAccountName) =>
27+
_loggingNotEnabledOnTargetAccount(logger, targetAccountName, null);
2128
}
2229
}
2330
}

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ public PollLogsStrategy(IWebJobsExceptionHandler exceptionHandler, ILogger<BlobL
4949
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
5050
}
5151

52-
public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContainerClient container, ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
52+
public async Task RegisterAsync(
53+
BlobServiceClient blobServiceClient,
54+
BlobContainerClient container,
55+
ITriggerExecutor<BlobTriggerExecutorContext> triggerExecutor,
5356
CancellationToken cancellationToken)
5457
{
5558
ThrowIfDisposed();
@@ -78,8 +81,18 @@ public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContain
7881

7982
if (!_logListeners.ContainsKey(blobServiceClient))
8083
{
81-
BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
82-
_logListeners.Add(blobServiceClient, logListener);
84+
try
85+
{
86+
BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
87+
_logListeners.Add(blobServiceClient, logListener);
88+
}
89+
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.AuthorizationPermissionMismatch ||
90+
ex.ErrorCode == BlobErrorCode.InsufficientAccountPermissions ||
91+
ex.ErrorCode == BlobErrorCode.AuthorizationFailure)
92+
{
93+
// Log which account we couldn't enable logging on due to permissions.
94+
Logger.LoggingNotEnabledOnTargetAccount(_logger, blobServiceClient.AccountName);
95+
}
8396
}
8497
}
8598

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobListener.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ public SharedBlobListener(string hostId, BlobServiceClient blobServiceClient,
3939
_timer = new TaskSeriesTimer(_strategy, exceptionHandler, initialWait: Task.Delay(0));
4040
}
4141

42+
// This constructor is used for injecting custom strategy in unit tests.
43+
internal SharedBlobListener(IBlobListenerStrategy strategy, IWebJobsExceptionHandler exceptionHandler)
44+
{
45+
_strategy = strategy ?? throw new ArgumentNullException(nameof(strategy));
46+
47+
// Start the first iteration immediately.
48+
_timer = new TaskSeriesTimer(_strategy, exceptionHandler, initialWait: Task.Delay(0));
49+
}
50+
4251
public IBlobWrittenWatcher BlobWritterWatcher
4352
{
4453
get { return _strategy; }
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Reflection;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Azure;
9+
using Azure.Storage.Blobs;
10+
using Azure.Storage.Blobs.Models;
11+
using Azure.Storage.Queues;
12+
using Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners;
13+
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
14+
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
15+
using Microsoft.Azure.WebJobs.Host;
16+
using Microsoft.Azure.WebJobs.Host.Executors;
17+
using Microsoft.Azure.WebJobs.Host.Listeners;
18+
using Microsoft.Azure.WebJobs.Host.Protocols;
19+
using Microsoft.Azure.WebJobs.Host.Scale;
20+
using Microsoft.Azure.WebJobs.Host.Timers;
21+
using Microsoft.Extensions.Logging;
22+
using Moq;
23+
using NUnit.Framework;
24+
using NUnit.Framework.Internal;
25+
26+
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Tests.Listeners
27+
{
28+
public class BlobListenerFactoryTests
29+
{
30+
[Test]
31+
public async Task CreateAsync_RegisterWithSharedBlobListenerAsync_UsesTargetBlobClient()
32+
{
33+
// Arrange
34+
// Storage account and container names
35+
string accountName1 = "fakeaccount1";
36+
string accountName2 = "fakeaccount2";
37+
string containerName1 = "fakecontainer1";
38+
string containerName2 = "fakecontainer2";
39+
40+
// Mock BlobContainerClients
41+
var containerClient1 = new Mock<BlobContainerClient>(new Uri($"https://{accountName1}.blob.core.windows.net/{containerName1}"), null);
42+
containerClient1.Setup(x => x.Uri).Returns(new Uri($"https://{accountName1}.blob.core.windows.net/{containerName1}"));
43+
containerClient1.Setup(x => x.Name).Returns(containerName1);
44+
containerClient1.Setup(x => x.AccountName).Returns(accountName1);
45+
46+
var containerClient2 = new Mock<BlobContainerClient>(new Uri($"https://{accountName1}.blob.core.windows.net/{containerName2}"), null);
47+
containerClient2.Setup(x => x.Uri).Returns(new Uri($"https://{accountName1}.blob.core.windows.net/{containerName2}"));
48+
containerClient2.Setup(x => x.Name).Returns(containerName2);
49+
containerClient2.Setup(x => x.AccountName).Returns(accountName2);
50+
51+
var hostNamesContainerClient = new Mock<BlobContainerClient>(new Uri($"https://{accountName1}.blob.core.windows.net/{HostContainerNames.Hosts}"), null);
52+
hostNamesContainerClient.Setup(x => x.Uri).Returns(new Uri($"https://{accountName1}.blob.core.windows.net/{HostContainerNames.Hosts}"));
53+
hostNamesContainerClient.Setup(x => x.Name).Returns(HostContainerNames.Hosts);
54+
hostNamesContainerClient.Setup(x => x.AccountName).Returns(accountName1);
55+
56+
// Mock BlobServiceClients
57+
var primaryClient = new Mock<BlobServiceClient>(new Uri($"https://{accountName1}.blob.core.windows.net/"), null);
58+
primaryClient.Setup(x => x.Uri).Returns(new Uri($"https://{accountName1}.blob.core.windows.net/"));
59+
primaryClient.Setup(x => x.AccountName).Returns(accountName1);
60+
primaryClient.Setup(x => x.GetBlobContainerClient(containerName1)).Returns(containerClient1.Object);
61+
primaryClient.Setup(x => x.GetBlobContainerClient(containerName2)).Returns(containerClient2.Object);
62+
primaryClient.Setup(x => x.GetBlobContainerClient(HostContainerNames.Hosts)).Returns(hostNamesContainerClient.Object);
63+
primaryClient.Setup(x => x.GetPropertiesAsync(It.IsAny<CancellationToken>())).ReturnsAsync(Response.FromValue(new BlobServiceProperties(), null));
64+
65+
var targetClient = new Mock<BlobServiceClient>(new Uri($"https://{accountName2}.blob.core.windows.net/"), null);
66+
targetClient.Setup(x => x.Uri).Returns(new Uri($"https://{accountName2}.blob.core.windows.net/"));
67+
targetClient.Setup(x => x.AccountName).Returns(accountName2);
68+
targetClient.Setup(x => x.GetPropertiesAsync(It.IsAny<CancellationToken>())).ReturnsAsync(Response.FromValue(new BlobServiceProperties(), null));
69+
70+
// Other dependencies
71+
var loggerFactory = new LoggerFactory();
72+
var logger = loggerFactory.CreateLogger<BlobListener>();
73+
var hostIdProvider = new Mock<IHostIdProvider>();
74+
var blobsOptions = new BlobsOptions();
75+
var exceptionHandler = new Mock<IWebJobsExceptionHandler>();
76+
var blobWrittenWatcherSetter = new Mock<IContextSetter<IBlobWrittenWatcher>>();
77+
var hostQueueServiceClient = new QueueServiceClient(new Uri($"https://{accountName1}.queue.core.windows.net/"));
78+
var dataQueueServiceClient = new QueueServiceClient(new Uri($"https://{accountName2}.queue.core.windows.net/"));
79+
var queueServiceClientProvider = new FakeQueueServiceClientProvider(hostQueueServiceClient);
80+
var sharedQueueWatcher = new SharedQueueWatcher();
81+
var blobTriggerQueueWriterFactory = new BlobTriggerQueueWriterFactory(
82+
hostIdProvider.Object,
83+
queueServiceClientProvider,
84+
sharedQueueWatcher);
85+
var executor = new Mock<ITriggeredFunctionExecutor>();
86+
var input = new Mock<IBlobPathSource>();
87+
var singletonManager = new Mock<IHostSingletonManager>();
88+
var concurrencyManager = new Mock<ConcurrencyManager>();
89+
var drainModeManager = new Mock<IDrainModeManager>();
90+
var functionDescriptor = new FunctionDescriptor { Id = "id", ShortName = "shortname" };
91+
92+
// Setup SharedContextProvider and Test Strategy
93+
var sharedContextProvider = new Mock<ISharedContextProvider>();
94+
var testStrategy = new TestBlobListenerStrategy();
95+
var sharedBlobListener = new SharedBlobListener(testStrategy, exceptionHandler.Object);
96+
sharedContextProvider.Setup(x => x.GetOrCreateInstance(It.IsAny<SharedBlobListenerFactory>()))
97+
.Returns(sharedBlobListener);
98+
99+
// Setup SharedBlobQueueListener
100+
var sharedBlobQueueListener = new SharedBlobQueueListener(
101+
new Mock<IListener>().Object,
102+
new BlobQueueTriggerExecutor(BlobTriggerSource.LogsAndContainerScan,
103+
new Mock<IBlobWrittenWatcher>().Object, logger));
104+
sharedContextProvider.Setup(s => s.GetOrCreateInstance<SharedBlobQueueListener>(It.IsAny<SharedBlobQueueListenerFactory>()))
105+
.Returns(sharedBlobQueueListener);
106+
107+
// Create the factory
108+
var factory = new BlobListenerFactory(
109+
hostIdProvider.Object,
110+
blobsOptions,
111+
exceptionHandler.Object,
112+
blobWrittenWatcherSetter.Object,
113+
blobTriggerQueueWriterFactory,
114+
sharedContextProvider.Object,
115+
loggerFactory,
116+
functionDescriptor,
117+
primaryClient.Object,
118+
hostQueueServiceClient,
119+
targetClient.Object,
120+
dataQueueServiceClient,
121+
containerClient1.Object,
122+
input.Object,
123+
BlobTriggerSource.LogsAndContainerScan,
124+
executor.Object,
125+
singletonManager.Object,
126+
concurrencyManager.Object,
127+
drainModeManager.Object
128+
);
129+
130+
// Act
131+
await factory.CreateAsync(CancellationToken.None);
132+
133+
// Assert
134+
// 1. The strategy should have registered the target client and container
135+
Assert.AreEqual(targetClient.Object, testStrategy.TargetServiceClient, "TargetServiceClient should be the target client.");
136+
Assert.AreEqual(containerClient1.Object, testStrategy.ContainerClient, "ContainerClient should be the primary container.");
137+
138+
// 2. The BlobTriggerExecutor should use a BlobReceiptManager with the primary client
139+
var receiptManagerField = typeof(BlobTriggerExecutor).GetField("_receiptManager", BindingFlags.NonPublic | BindingFlags.Instance);
140+
var receiptManager = receiptManagerField.GetValue(testStrategy.Executor);
141+
var blobContainerClientField = typeof(BlobReceiptManager).GetField("_blobContainerClient", BindingFlags.NonPublic | BindingFlags.Instance);
142+
var resultPrimaryClient = blobContainerClientField.GetValue(receiptManager);
143+
144+
// The BlobReceiptManager should use the hostNamesContainerClient (from the primary client)
145+
Assert.AreEqual(hostNamesContainerClient.Object, resultPrimaryClient, "BlobReceiptManager should use the primary container client.");
146+
}
147+
}
148+
}

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/ScanBlobScanLogHybridPollingStrategyTests.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,58 @@ public async Task RegisterAsync_InitializesWithScanInfoManager()
318318
RunExecuterWithExpectedBlobs(expectedNames, product, executor);
319319
}
320320

321+
[Test]
322+
public async Task RegisterAsync_HandlesPermissionErrors()
323+
{
324+
List<BlobErrorCode> permissionErrors = new List<BlobErrorCode>
325+
{
326+
BlobErrorCode.AuthorizationPermissionMismatch,
327+
BlobErrorCode.InsufficientAccountPermissions,
328+
BlobErrorCode.AuthorizationFailure
329+
};
330+
331+
LambdaBlobTriggerExecutor executor = new LambdaBlobTriggerExecutor();
332+
string accountName = "fakeaccount";
333+
foreach (BlobErrorCode errorCode in permissionErrors)
334+
{
335+
Uri uri = new Uri($"https://{accountName}.blob.core.windows.net/");
336+
Mock<BlobServiceClient> mockServiceClient = new Mock<BlobServiceClient>(uri, null);
337+
338+
TestBlobScanInfoManager scanInfoManager = new TestBlobScanInfoManager();
339+
IBlobListenerStrategy product = new ScanBlobScanLogHybridPollingStrategy(scanInfoManager, _exceptionHandler, _logger);
340+
341+
// Setup GetPropertiesAsync to succeed
342+
mockServiceClient.Setup(x => x.Uri).Returns(uri);
343+
mockServiceClient.Setup(x => x.AccountName).Returns(accountName);
344+
mockServiceClient.Setup(x => x.GetBlobContainerClient("fakecontainer")).Returns(_blobContainerMock.Object);
345+
mockServiceClient.Setup(x => x.GetBlobContainerClient("fakecontainer2")).Returns(_secondBlobContainerMock.Object);
346+
mockServiceClient.Setup(x => x.GetBlobContainerClient("$logs")).Returns(_logsContainerMock.Object);
347+
mockServiceClient
348+
.Setup(x => x.GetPropertiesAsync(It.IsAny<CancellationToken>()))
349+
.ReturnsAsync(Response.FromValue(_serviceProperties, default));
350+
351+
// Setup SetPropertiesAsync to throw a RequestFailedException with your error code
352+
mockServiceClient
353+
.Setup(x => x.SetPropertiesAsync(
354+
It.IsAny<BlobServiceProperties>(),
355+
It.IsAny<CancellationToken>()))
356+
.ThrowsAsync(new RequestFailedException(status: 403, message: "This request is not authorized to perform this operation using this permission.", errorCode: errorCode.ToString(), default));
357+
358+
// Create a few blobs.
359+
for (int i = 0; i < 5; i++)
360+
{
361+
CreateBlobAndUploadToContainer(_blobContainerMock, _blobItems);
362+
}
363+
364+
await scanInfoManager.UpdateLatestScanAsync(AccountName, ContainerName, DateTime.UtcNow);
365+
await product.RegisterAsync(mockServiceClient.Object, _blobContainerMock.Object, executor, CancellationToken.None);
366+
367+
var logMessages = _loggerProvider.GetAllLogMessages();
368+
Assert.IsTrue(logMessages.Any(m => m.EventId.Name == "LoggingNotEnabledOnTargetAccount"
369+
|| m.FormattedMessage.Contains("LoggingNotEnabledOnTargetAccount")));
370+
}
371+
}
372+
321373
[Test]
322374
public async Task ExecuteAsync_UpdatesScanInfoManager()
323375
{

0 commit comments

Comments
 (0)