-
Notifications
You must be signed in to change notification settings - Fork 287
Lightweight scale controller package for DTS backend #3387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 7 commits
9d16acf
fbfde5c
69b26a5
54b49cf
2109e65
9cab285
178f6a2
4740743
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| // Copyright (c) .NET Foundation. All rights reserved. | ||
| // Licensed under the MIT License. See LICENSE in the project root for license information. | ||
|
|
||
| using System; | ||
| using Microsoft.Azure.WebJobs.Host.Scale; | ||
| using Microsoft.DurableTask.AzureManagedBackend; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.FunctionsScale.AzureManaged | ||
| { | ||
| /// <summary> | ||
| /// The AzureManaged backend implementation of the scalability provider for Durable Functions. | ||
| /// </summary> | ||
| public class AzureManagedScalabilityProvider : ScalabilityProvider | ||
| { | ||
| private readonly AzureManagedOrchestrationService orchestrationService; | ||
| private readonly string connectionName; | ||
| private readonly ILogger logger; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="AzureManagedScalabilityProvider"/> class. | ||
| /// </summary> | ||
| /// <param name="orchestrationService"> | ||
| /// The <see cref="AzureManagedOrchestrationService"/> instance that provides access to backend service for scaling operations. | ||
| /// </param> | ||
| /// <param name="connectionName"> | ||
| /// The logical name of the storage or service connection associated with this provider. | ||
| /// </param> | ||
| /// <param name="logger"> | ||
| /// The <see cref="ILogger"/> instance used for logging provider activities and diagnostics. | ||
| /// </param> | ||
| /// <exception cref="ArgumentNullException"> | ||
| /// Thrown if <paramref name="orchestrationService"/> is <see langword="null"/>. | ||
| /// </exception> | ||
| public AzureManagedScalabilityProvider( | ||
| AzureManagedOrchestrationService orchestrationService, | ||
| string connectionName, | ||
| ILogger logger) | ||
| : base("AzureManaged", connectionName) | ||
| { | ||
| this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService)); | ||
| this.connectionName = connectionName; | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the app setting containing the Azure Managed connection string. | ||
| /// </summary> | ||
| public override string ConnectionName => this.connectionName; | ||
|
|
||
| /// <inheritdoc/> | ||
| /// This is not used. | ||
| public override bool TryGetScaleMonitor( | ||
| string functionId, | ||
| string functionName, | ||
| string hubName, | ||
| string targetConnectionName, | ||
| out IScaleMonitor scaleMonitor) | ||
| { | ||
| // Azure Managed backend does not support the legacy scale monitor infrastructure. | ||
| // Return a dummy scale monitor to avoid exceptions. | ||
| scaleMonitor = new DummyScaleMonitor(functionId, hubName); | ||
| return true; | ||
| } | ||
nytian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /// <inheritdoc/> | ||
| public override bool TryGetTargetScaler( | ||
| string functionId, | ||
| string functionName, | ||
| string hubName, | ||
| string targetConnectionName, | ||
| out ITargetScaler targetScaler) | ||
| { | ||
| // Create a target scaler that uses the orchestration service's metrics endpoint. | ||
| // All target scalers share the same AzureManagedOrchestrationService in the same task hub. | ||
| targetScaler = new AzureManagedTargetScaler(this.orchestrationService, functionId, this.logger); | ||
| return true; | ||
| } | ||
|
|
||
| private class DummyScaleMonitor : IScaleMonitor | ||
| { | ||
| private static readonly ScaleMetrics DummyScaleMetrics = new ScaleMetrics(); | ||
| private static readonly ScaleStatus DummyScaleStatus = new ScaleStatus(); | ||
|
|
||
| public DummyScaleMonitor(string functionId, string taskHub) | ||
| { | ||
| this.Descriptor = new ScaleMonitorDescriptor( | ||
| id: $"DurableTask.AzureManaged:{taskHub ?? "default"}", | ||
| functionId); | ||
| } | ||
|
|
||
| public ScaleMonitorDescriptor Descriptor { get; } | ||
|
|
||
| public System.Threading.Tasks.Task<ScaleMetrics> GetMetricsAsync() => System.Threading.Tasks.Task.FromResult(DummyScaleMetrics); | ||
|
|
||
| public ScaleStatus GetScaleStatus(ScaleStatusContext context) => DummyScaleStatus; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,223 @@ | ||
| // Copyright (c) .NET Foundation. All rights reserved. | ||
| // Licensed under the MIT License. See LICENSE in the project root for license information. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using Azure.Core; | ||
| using Azure.Identity; | ||
| using Microsoft.AspNetCore.Identity; | ||
nytian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| using Microsoft.Azure.WebJobs.Host.Scale; | ||
| using Microsoft.DurableTask.AzureManagedBackend; | ||
| using Microsoft.Extensions.Configuration; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| #nullable enable | ||
| namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.FunctionsScale.AzureManaged | ||
| { | ||
| /// <summary> | ||
| /// Factory class responsible for creating and managing instances of <see cref="AzureManagedScalabilityProvider"/>. | ||
| /// </summary> | ||
| public class AzureManagedScalabilityProviderFactory : IScalabilityProviderFactory | ||
| { | ||
| private const string LoggerName = "Triggers.DurableTask.AzureManaged"; | ||
| private const string ProviderName = "AzureManaged"; | ||
|
|
||
| private readonly Dictionary<(string, string?, string?), AzureManagedScalabilityProvider> cachedProviders = new Dictionary<(string, string?, string?), AzureManagedScalabilityProvider>(); | ||
| private readonly IConfiguration configuration; | ||
| private readonly ILoggerFactory loggerFactory; | ||
| private readonly ILogger logger; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="AzureManagedScalabilityProviderFactory"/> class. | ||
| /// </summary> | ||
| /// <param name="configuration"> | ||
| /// The <see cref="IConfiguration"/> interface used to resolve connection strings and application settings. | ||
| /// </param> | ||
| /// <param name="loggerFactory"> | ||
| /// The <see cref="ILoggerFactory"/> used to create loggers for diagnostics. | ||
| /// </param> | ||
| /// <exception cref="ArgumentNullException"> | ||
| /// Thrown if any required argument is <see langword="null"/>. | ||
| /// </exception> | ||
| public AzureManagedScalabilityProviderFactory( | ||
| IConfiguration configuration, | ||
| ILoggerFactory loggerFactory) | ||
| { | ||
| this.configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); | ||
| this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); | ||
| this.logger = this.loggerFactory.CreateLogger(LoggerName); | ||
|
|
||
| this.DefaultConnectionName = "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the logical name of this scalability provider type. | ||
| /// </summary> | ||
| public virtual string Name => ProviderName; | ||
|
|
||
| /// <summary> | ||
| /// Gets the default connection name configured for this factory. | ||
| /// </summary> | ||
| public string DefaultConnectionName { get; } | ||
|
|
||
| /// <summary> | ||
| /// Creates or retrieves an <see cref="AzureManagedScalabilityProvider"/> instance based on the provided pre-deserialized metadata. | ||
| /// </summary> | ||
| /// <param name="metadata">The pre-deserialized Durable Task metadata.</param> | ||
| /// <param name="triggerMetadata">Trigger metadata used to access Properties like token credentials.</param> | ||
| /// <returns> | ||
| /// An <see cref="AzureManagedScalabilityProvider"/> instance configured using | ||
| /// the specified metadata and resolved connection information. | ||
| /// </returns> | ||
| /// <exception cref="InvalidOperationException"> | ||
| /// Thrown if no valid connection string could be resolved for the given connection name. | ||
| /// </exception> | ||
| public ScalabilityProvider GetScalabilityProvider(DurableTaskMetadata metadata, TriggerMetadata? triggerMetadata) | ||
| { | ||
| // Get connection name from metadata, fallback to default | ||
| string? rawConnectionName = TriggerMetadataExtensions.ResolveConnectionName(metadata?.StorageProvider); | ||
| string connectionName = rawConnectionName ?? this.DefaultConnectionName; | ||
| this.logger.LogInformation("Using connection name '{ConnectionName}'", connectionName); | ||
|
|
||
| // Look up connection string from configuration | ||
| string? connectionString = | ||
| this.configuration.GetConnectionString(connectionName) ?? | ||
| this.configuration[connectionName] ?? | ||
| Environment.GetEnvironmentVariable(connectionName); | ||
|
|
||
| if (string.IsNullOrEmpty(connectionString)) | ||
| { | ||
| throw new InvalidOperationException( | ||
| $"No valid connection string found for '{connectionName}'. " + | ||
| $"Please ensure it is defined in app settings, connection strings, or environment variables."); | ||
| } | ||
|
|
||
| AzureManagedConnectionString azureManagedConnectionString = new AzureManagedConnectionString(connectionString); | ||
|
|
||
| // Extract task hub name from metadata | ||
| string? taskHubName = metadata?.TaskHubName ?? azureManagedConnectionString.TaskHubName; | ||
|
|
||
| // Include client ID in cache key to handle managed identity changes | ||
| // Use the original connection name (rawConnectionName or default) for the cache key, not the connection string value | ||
| (string, string?, string?) cacheKey = (connectionName, taskHubName, azureManagedConnectionString.ClientId); | ||
|
|
||
| this.logger.LogDebug( | ||
| "Getting durability provider for connection '{Connection}', task hub '{TaskHub}', and client ID '{ClientId}'...", | ||
| cacheKey.Item1, | ||
| cacheKey.Item2 ?? "null", | ||
| cacheKey.Item3 ?? "null"); | ||
|
|
||
| lock (this.cachedProviders) | ||
| { | ||
| // If a provider has already been created for this connection name, task hub, and client ID, return it. | ||
| if (this.cachedProviders.TryGetValue(cacheKey, out AzureManagedScalabilityProvider? cachedProvider)) | ||
| { | ||
| this.logger.LogDebug( | ||
| "Returning cached durability provider for connection '{Connection}', task hub '{TaskHub}', and client ID '{ClientId}'", | ||
| cacheKey.Item1, | ||
| cacheKey.Item2, | ||
| cacheKey.Item3 ?? "null"); | ||
| return cachedProvider; | ||
| } | ||
|
Comment on lines
+116
to
+125
|
||
|
|
||
| // Create options from the connection string. | ||
| // For runtime-driven scaling, token credentials are loaded directly from the host. | ||
| AzureManagedOrchestrationServiceOptions options = | ||
| AzureManagedOrchestrationServiceOptions.FromConnectionString(connectionString); | ||
|
|
||
| // If triggerMetadata is provided (from functions Scale Controller), try to get token credential from it. | ||
| if (triggerMetadata != null && triggerMetadata.Properties != null && | ||
| triggerMetadata.Properties.TryGetValue("GetAzureManagedTokenCredential", out object? tokenCredentialFunc)) | ||
| { | ||
| if (tokenCredentialFunc is Func<string, TokenCredential> getTokenCredential) | ||
| { | ||
| try | ||
| { | ||
| TokenCredential tokenCredential = getTokenCredential(connectionName); | ||
|
|
||
| if (tokenCredential == null) | ||
| { | ||
| this.logger.LogWarning( | ||
| "Token credential retrieved from trigger metadata is null for connection '{Connection}'.", | ||
| connectionName); | ||
| } | ||
| else | ||
| { | ||
| // Override the credential from connection string | ||
| options.TokenCredential = tokenCredential; | ||
| this.logger.LogInformation("Retrieved token credential from trigger metadata for connection '{Connection}'", connectionName); | ||
| } | ||
| } | ||
| catch (OperationCanceledException ex) | ||
| { | ||
| // Expected scenario when the operation is cancelled; | ||
| // log and fall back to the connection string credential. | ||
| this.logger.LogWarning( | ||
| ex, | ||
| "Getting token credential from trigger metadata was canceled for connection '{Connection}'", | ||
| connectionName); | ||
| } | ||
| catch (AuthenticationFailedException ex) | ||
| { | ||
| // Authentication failures are expected in some environments; | ||
| // log and fall back to the connection string credential. | ||
| this.logger.LogWarning( | ||
| ex, | ||
| "Authentication failed while getting token credential from trigger metadata for connection '{Connection}'", | ||
| connectionName); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| // Unexpected exception types. Fall back to use connection string. | ||
| this.logger.LogWarning( | ||
| ex, | ||
| "Unexpected error while getting token credential from trigger metadata for connection '{Connection}'", | ||
| connectionName); | ||
| } | ||
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
Comment on lines
+173
to
+180
|
||
| } | ||
| else | ||
| { | ||
| this.logger.LogWarning( | ||
| "Token credential function pointer in trigger metadata is not of expected type for connection '{Connection}'", | ||
| connectionName); | ||
| } | ||
| } | ||
| else | ||
| { | ||
| this.logger.LogInformation( | ||
| "No trigger metadata provided or trigger metadata does not contain 'GetAzureManagedTokenCredential', " + | ||
| "using the token credential built from connection string for connection '{Connection}'.", connectionName); | ||
| } | ||
|
|
||
| // Set task hub name if configured | ||
| if (!string.IsNullOrEmpty(taskHubName)) | ||
| { | ||
| options.TaskHubName = taskHubName; | ||
| } | ||
|
|
||
| // Set concurrency limits from metadata | ||
| int defaultConcurrency = Environment.ProcessorCount * 10; | ||
|
|
||
| options.MaxConcurrentOrchestrationWorkItems = metadata?.MaxConcurrentOrchestratorFunctions ?? defaultConcurrency; | ||
| options.MaxConcurrentActivityWorkItems = metadata?.MaxConcurrentActivityFunctions ?? defaultConcurrency; | ||
|
|
||
nytian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| this.logger.LogInformation( | ||
| "Creating durability provider for connection '{Connection}', task hub '{TaskHub}', and client ID '{ClientId}'...", | ||
| cacheKey.Item1, | ||
| cacheKey.Item2, | ||
| cacheKey.Item3 ?? "null"); | ||
|
|
||
| AzureManagedOrchestrationService service = new AzureManagedOrchestrationService(options, this.loggerFactory); | ||
| AzureManagedScalabilityProvider provider = new AzureManagedScalabilityProvider(service, connectionName, this.logger); | ||
|
|
||
| // Extract max concurrent values from trigger metadata (from Scale Controller payload) | ||
| // Default: 10 times the number of processors on the current machine | ||
nytian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| provider.MaxConcurrentTaskOrchestrationWorkItems = metadata?.MaxConcurrentOrchestratorFunctions ?? defaultConcurrency; | ||
| provider.MaxConcurrentTaskActivityWorkItems = metadata?.MaxConcurrentActivityFunctions ?? defaultConcurrency; | ||
|
|
||
| this.cachedProviders.Add(cacheKey, provider); | ||
| return provider; | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
dtsjob starts Azurite but doesn’t wait for it to accept connections before running tests. This can make the DTS scale tests flaky on slower runners. Add the same readiness loop used in theazure-storagejob (e.g.,nc -zon port 10000 with retries) before invokingdotnet test.