-
Notifications
You must be signed in to change notification settings - Fork 284
Introduce a Lightweight Scaling Package for Multi Durable Backends #3240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 1 commit
dfb12ca
0b2b2c6
335fff8
2c51fb7
a869d52
9867a99
af5545f
f15aea3
25e49ce
5973886
ffd44fd
a2956b4
0e950a0
a8e9bef
9d8dd48
9e52edf
8188cbe
d9fbed3
9692154
2dd1e66
93c15b7
f94ff06
54ff4c2
709487b
9ed6630
fbb2acc
ebfefca
ed56b50
09e3ece
ec4f7a1
80d13f4
bfb4207
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| // Copyright (c) .NET Foundation. All rights reserved. | ||
| // Licensed under the MIT License. See LICENSE in the project root for license information. | ||
|
|
||
| using DurableTask.AzureStorage; | ||
| using Microsoft.Azure.WebJobs.Host.Scale; | ||
| using Microsoft.Extensions.Logging; | ||
| using Newtonsoft.Json; | ||
| using Newtonsoft.Json.Converters; | ||
| using Newtonsoft.Json.Linq; | ||
| using Newtonsoft.Json.Serialization; | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale.AzureStorage | ||
| { | ||
| /// <summary> | ||
| /// The Azure Storage implementation of additional methods not required by IOrchestrationService. | ||
| /// </summary> | ||
| public class AzureStorageDurabilityProvider : DurabilityProvider | ||
| { | ||
| private readonly AzureStorageOrchestrationService serviceClient; | ||
| private readonly IStorageServiceClientProviderFactory clientProviderFactory; | ||
| private readonly string connectionName; | ||
| private readonly JObject storageOptionsJson; | ||
| private readonly ILogger logger; | ||
|
|
||
| private readonly object initLock = new object(); | ||
|
|
||
| private DurableTaskMetricsProvider singletonDurableTaskMetricsProvider; | ||
|
|
||
| public AzureStorageDurabilityProvider( | ||
| AzureStorageOrchestrationService service, | ||
| IStorageServiceClientProviderFactory clientProviderFactory, | ||
| string connectionName, | ||
| AzureStorageOptions options, | ||
| ILogger logger) | ||
| : base("Azure Storage", service, service, connectionName) | ||
| { | ||
| this.serviceClient = service; | ||
| this.clientProviderFactory = clientProviderFactory; | ||
| this.connectionName = connectionName; | ||
| this.storageOptionsJson = JObject.FromObject( | ||
| options, | ||
| new JsonSerializer | ||
| { | ||
| Converters = { new StringEnumConverter() }, | ||
| ContractResolver = new CamelCasePropertyNamesContractResolver(), | ||
| }); | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// The app setting containing the Azure Storage connection string. | ||
| /// </summary> | ||
| public override string ConnectionName => this.connectionName; | ||
|
|
||
| public override JObject ConfigurationJson => this.storageOptionsJson; | ||
|
|
||
| public override string EventSourceName { get; set; } = "DurableTask-AzureStorage"; | ||
|
|
||
| internal DurableTaskMetricsProvider GetMetricsProvider( | ||
| string hubName, | ||
| StorageAccountClientProvider storageAccountClientProvider, | ||
| ILogger logger) | ||
| { | ||
| return new DurableTaskMetricsProvider(hubName, logger, performanceMonitor: null, storageAccountClientProvider); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override bool TryGetScaleMonitor( | ||
| string functionId, | ||
| string functionName, | ||
| string hubName, | ||
| string connectionName, | ||
| out IScaleMonitor scaleMonitor) | ||
| { | ||
| lock (this.initLock) | ||
| { | ||
| if (this.singletonDurableTaskMetricsProvider == null) | ||
| { | ||
| // This is only called by the ScaleController, it doesn't run in the Functions Host process. | ||
| this.singletonDurableTaskMetricsProvider = this.GetMetricsProvider( | ||
| hubName, | ||
| this.clientProviderFactory.GetClientProvider(connectionName), | ||
| this.logger); | ||
| } | ||
|
|
||
| scaleMonitor = new DurableTaskScaleMonitor(functionId, hubName, this.logger, this.singletonDurableTaskMetricsProvider); | ||
| return true; | ||
| } | ||
| } | ||
|
|
||
| public override bool TryGetTargetScaler( | ||
| string functionId, | ||
| string functionName, | ||
| string hubName, | ||
| string connectionName, | ||
| out ITargetScaler targetScaler) | ||
| { | ||
| lock (this.initLock) | ||
| { | ||
| if (this.singletonDurableTaskMetricsProvider == null) | ||
| { | ||
| // This is only called by the ScaleController, it doesn't run in the Functions Host process. | ||
| this.singletonDurableTaskMetricsProvider = this.GetMetricsProvider( | ||
| hubName, | ||
| this.clientProviderFactory.GetClientProvider(connectionName), | ||
| this.logger); | ||
| } | ||
|
|
||
| targetScaler = new DurableTaskTargetScaler(functionId, this.singletonDurableTaskMetricsProvider, this, this.logger); | ||
| return true; | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,270 @@ | ||||||||||||||||||||
| // Copyright (c) .NET Foundation. All rights reserved. | ||||||||||||||||||||
| // Licensed under the MIT License. See LICENSE in the project root for license information. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| using System; | ||||||||||||||||||||
| using System.Text.Json; | ||||||||||||||||||||
| using DurableTask.AzureStorage; | ||||||||||||||||||||
| using Microsoft.Extensions.Logging; | ||||||||||||||||||||
| using Microsoft.Extensions.Options; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale.AzureStorage | ||||||||||||||||||||
| { | ||||||||||||||||||||
| public class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactory | ||||||||||||||||||||
| { | ||||||||||||||||||||
| private const string LoggerName = "Host.Triggers.DurableTask.AzureStorage"; | ||||||||||||||||||||
| internal const string ProviderName = "AzureStorage"; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private readonly DurableTaskOptions options; | ||||||||||||||||||||
| private readonly IStorageServiceClientProviderFactory clientProviderFactory; | ||||||||||||||||||||
| private readonly AzureStorageOptions azureStorageOptions; | ||||||||||||||||||||
| private readonly INameResolver nameResolver; | ||||||||||||||||||||
| private readonly ILoggerFactory loggerFactory; | ||||||||||||||||||||
| private readonly bool useSeparateQueueForEntityWorkItems; | ||||||||||||||||||||
| private readonly bool inConsumption; // If true, optimize defaults for consumption | ||||||||||||||||||||
| private AzureStorageDurabilityProvider defaultStorageProvider; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Must wait to get settings until we have validated taskhub name. | ||||||||||||||||||||
| private bool hasValidatedOptions; | ||||||||||||||||||||
| private AzureStorageOrchestrationServiceSettings defaultSettings; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||
| /// | ||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||
| /// <param name="options"></param> | ||||||||||||||||||||
| /// <param name="clientProviderFactory"></param> | ||||||||||||||||||||
| /// <param name="nameResolver"></param> | ||||||||||||||||||||
| /// <param name="loggerFactory"></param> | ||||||||||||||||||||
| /// <param name="platformInfo"></param> | ||||||||||||||||||||
| /// <exception cref="ArgumentNullException"></exception> | ||||||||||||||||||||
| public AzureStorageDurabilityProviderFactory( | ||||||||||||||||||||
| IOptions<DurableTaskOptions> options, | ||||||||||||||||||||
| IStorageServiceClientProviderFactory clientProviderFactory, | ||||||||||||||||||||
| INameResolver nameResolver, | ||||||||||||||||||||
| ILoggerFactory loggerFactory, | ||||||||||||||||||||
| #pragma warning disable CS0612 // Type or member is obsolete | ||||||||||||||||||||
| IPlatformInformation platformInfo) | ||||||||||||||||||||
| #pragma warning restore CS0612 // Type or member is obsolete | ||||||||||||||||||||
| { | ||||||||||||||||||||
| // this constructor may be called by dependency injection even if the AzureStorage provider is not selected | ||||||||||||||||||||
| // in that case, return immediately, since this provider is not actually used, but can still throw validation errors | ||||||||||||||||||||
| if (options.Value.StorageProvider.TryGetValue("type", out object value) | ||||||||||||||||||||
| && value is string s | ||||||||||||||||||||
| && !string.Equals(s, this.Name, StringComparison.OrdinalIgnoreCase)) | ||||||||||||||||||||
| { | ||||||||||||||||||||
| return; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| this.options = options.Value; | ||||||||||||||||||||
| this.clientProviderFactory = clientProviderFactory ?? throw new ArgumentNullException(nameof(clientProviderFactory)); | ||||||||||||||||||||
| this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver)); | ||||||||||||||||||||
| this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| this.azureStorageOptions = new AzureStorageOptions(); | ||||||||||||||||||||
| this.inConsumption = platformInfo.IsInConsumptionPlan(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // The consumption plan has different performance characteristics so we provide | ||||||||||||||||||||
| // different defaults for key configuration values. | ||||||||||||||||||||
| int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount; | ||||||||||||||||||||
| int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount; | ||||||||||||||||||||
| int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount; | ||||||||||||||||||||
| int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if (this.inConsumption) | ||||||||||||||||||||
|
||||||||||||||||||||
| { | ||||||||||||||||||||
| WorkerRuntimeType language = platformInfo.GetWorkerRuntimeType(); | ||||||||||||||||||||
|
||||||||||||||||||||
| if (language == WorkerRuntimeType.Python) | ||||||||||||||||||||
| { | ||||||||||||||||||||
| this.azureStorageOptions.ControlQueueBufferThreshold = 32; | ||||||||||||||||||||
| } | ||||||||||||||||||||
| else | ||||||||||||||||||||
| { | ||||||||||||||||||||
| this.azureStorageOptions.ControlQueueBufferThreshold = 128; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
||||||||||||||||||||
| if (language == WorkerRuntimeType.Python) | |
| { | |
| this.azureStorageOptions.ControlQueueBufferThreshold = 32; | |
| } | |
| else | |
| { | |
| this.azureStorageOptions.ControlQueueBufferThreshold = 128; | |
| } | |
| this.azureStorageOptions.ControlQueueBufferThreshold = (language == WorkerRuntimeType.Python) ? 32 : 128; |
Outdated
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing type definition: ConnectionStringNames is referenced but not defined in the provided files. Ensure this type exists or replace with a string constant.
| this.DefaultConnectionName = this.azureStorageOptions.ConnectionName ?? ConnectionStringNames.Storage; | |
| this.DefaultConnectionName = this.azureStorageOptions.ConnectionName ?? "AzureWebJobsStorage"; |
Outdated
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect property name in error message: The error message references 'MaxConcurrentOrchestratorFunctions' but should reference 'MaxConcurrentActivityFunctions' to match the property being validated.
| MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"), | |
| MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentActivityFunctions)} needs a default value"), |
nytian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Outdated
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing type definition: EndToEndTraceHelper is referenced but not defined in the provided files. Ensure this type exists or replace with an appropriate alternative.
| AppName = EndToEndTraceHelper.LocalAppName, | |
| AppName = AppDomain.CurrentDomain.FriendlyName, |
Outdated
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected spelling of 'enure' to 'ensure' and 'unqique' to 'unique'.
| // to enure unqique worker names | |
| // to ensure unique worker names |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing type definition: IPlatformInformation is referenced but not defined in the provided files. Ensure this interface exists in the codebase.