Skip to content

Commit fd17324

Browse files
Add a mechanism for providers to override the scale monitor (#1692)
This allows backends to plug in their own autoscaler for use in elastic premium deployments. Adds a virtual TryGetScaleMonitor method to DurabilityProvider If the provider does not implement that method, constructs a NoOpScaleMonitor to avoid exceptions when runtime scale is disabled Allows the provider to trace scaling decisions in the same way as currently done by Azure Table Scaling monitor Co-authored-by: Varshi Bachu <[email protected]>
1 parent 52169fc commit fd17324

10 files changed

+178
-151
lines changed

src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99
using DurableTask.AzureStorage;
1010
using DurableTask.AzureStorage.Tracking;
1111
using DurableTask.Core;
12+
using Microsoft.Extensions.Logging;
1213
using Newtonsoft.Json;
1314
using Newtonsoft.Json.Converters;
1415
using Newtonsoft.Json.Linq;
1516
using Newtonsoft.Json.Serialization;
17+
#if !FUNCTIONS_V1
18+
using Microsoft.Azure.WebJobs.Host.Scale;
19+
#endif
1620
using AzureStorage = DurableTask.AzureStorage;
1721

1822
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
@@ -25,11 +29,13 @@ internal class AzureStorageDurabilityProvider : DurabilityProvider
2529
private readonly AzureStorageOrchestrationService serviceClient;
2630
private readonly string connectionName;
2731
private readonly JObject storageOptionsJson;
32+
private readonly ILogger logger;
2833

2934
public AzureStorageDurabilityProvider(
3035
AzureStorageOrchestrationService service,
3136
string connectionName,
32-
AzureStorageOptions options)
37+
AzureStorageOptions options,
38+
ILogger logger)
3339
: base("Azure Storage", service, service, connectionName)
3440
{
3541
this.serviceClient = service;
@@ -41,6 +47,7 @@ public AzureStorageDurabilityProvider(
4147
Converters = { new StringEnumConverter() },
4248
ContractResolver = new CamelCasePropertyNamesContractResolver(),
4349
});
50+
this.logger = logger;
4451
}
4552

4653
public override bool SupportsEntities => true;
@@ -180,5 +187,24 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
180187
FetchInput = condition.ShowInput,
181188
};
182189
}
190+
191+
#if !FUNCTIONS_V1
192+
/// <inheritdoc/>
193+
public override bool TryGetScaleMonitor(
194+
string functionId,
195+
string functionName,
196+
string hubName,
197+
string storageConnectionString,
198+
out IScaleMonitor scaleMonitor)
199+
{
200+
scaleMonitor = new DurableTaskScaleMonitor(
201+
functionId,
202+
functionName,
203+
hubName,
204+
storageConnectionString,
205+
this.logger);
206+
return true;
207+
}
208+
#endif
183209
}
184210
}

src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
1111
{
1212
internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactory
1313
{
14+
private const string LoggerName = "Host.Triggers.DurableTask.AzureStorage";
1415
internal const string ProviderName = "AzureStorage";
1516

1617
private readonly DurableTaskOptions options;
@@ -95,10 +96,12 @@ public virtual DurabilityProvider GetDurabilityProvider()
9596
if (this.defaultStorageProvider == null)
9697
{
9798
var defaultService = new AzureStorageOrchestrationService(this.defaultSettings);
99+
ILogger logger = this.loggerFactory.CreateLogger(LoggerName);
98100
this.defaultStorageProvider = new AzureStorageDurabilityProvider(
99101
defaultService,
100102
this.defaultConnectionName,
101-
this.azureStorageOptions);
103+
this.azureStorageOptions,
104+
logger);
102105
}
103106

104107
return this.defaultStorageProvider;
@@ -126,10 +129,12 @@ private AzureStorageDurabilityProvider GetAzureStorageStorageProvider(DurableCli
126129
}
127130
else
128131
{
132+
ILogger logger = this.loggerFactory.CreateLogger(LoggerName);
129133
innerClient = new AzureStorageDurabilityProvider(
130134
new AzureStorageOrchestrationService(settings),
131135
connectionName,
132-
this.azureStorageOptions);
136+
this.azureStorageOptions,
137+
logger);
133138
}
134139

135140
return innerClient;

src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
using System.Threading;
77
using System.Threading.Tasks;
88
using DurableTask.Core;
9+
using Microsoft.Extensions.Logging;
910
using Newtonsoft.Json;
1011
using Newtonsoft.Json.Linq;
12+
#if !FUNCTIONS_V1
13+
using Microsoft.Azure.WebJobs.Host.Scale;
14+
#endif
1115

1216
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
1317
{
@@ -438,5 +442,27 @@ internal virtual bool ConnectionNameMatches(DurabilityProvider durabilityProvide
438442
{
439443
return this.ConnectionName.Equals(durabilityProvider.ConnectionName);
440444
}
445+
446+
#if !FUNCTIONS_V1
447+
/// <summary>
448+
/// Tries to obtain a scale monitor for autoscaling.
449+
/// </summary>
450+
/// <param name="functionId">Function id.</param>
451+
/// <param name="functionName">Function name.</param>
452+
/// <param name="hubName">Task hub name.</param>
453+
/// <param name="storageConnectionString">Storage account connection string, used for Azure Storage provider.</param>
454+
/// <param name="scaleMonitor">The scale monitor.</param>
455+
/// <returns>True if autoscaling is supported, false otherwise.</returns>
456+
public virtual bool TryGetScaleMonitor(
457+
string functionId,
458+
string functionName,
459+
string hubName,
460+
string storageConnectionString,
461+
out IScaleMonitor scaleMonitor)
462+
{
463+
scaleMonitor = null;
464+
return false;
465+
}
466+
#endif
441467
}
442468
}

src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using Microsoft.Azure.WebJobs.Description;
2121
#if !FUNCTIONS_V1
2222
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
23+
using Microsoft.Azure.WebJobs.Host.Scale;
2324
#endif
2425
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener;
2526
using Microsoft.Azure.WebJobs.Host;
@@ -1311,6 +1312,59 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat
13111312
activity.AddTag("DurableFunctionsRuntimeStatus", statusStr);
13121313
}
13131314
}
1315+
1316+
internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string storageConnectionString)
1317+
{
1318+
if (this.defaultDurabilityProvider.TryGetScaleMonitor(
1319+
functionId,
1320+
functionName.Name,
1321+
this.Options.HubName,
1322+
storageConnectionString,
1323+
out IScaleMonitor scaleMonitor))
1324+
{
1325+
return scaleMonitor;
1326+
}
1327+
else
1328+
{
1329+
// the durability provider does not support runtime scaling.
1330+
// Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
1331+
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower());
1332+
}
1333+
}
1334+
1335+
/// <summary>
1336+
/// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
1337+
/// This is required to allow operation of those providers even if runtime scaling is turned off
1338+
/// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
1339+
/// </summary>
1340+
private sealed class NoOpScaleMonitor : IScaleMonitor
1341+
{
1342+
/// <summary>
1343+
/// Construct a placeholder scale monitor.
1344+
/// </summary>
1345+
/// <param name="name">A descriptive name.</param>
1346+
public NoOpScaleMonitor(string name)
1347+
{
1348+
this.Descriptor = new ScaleMonitorDescriptor(name);
1349+
}
1350+
1351+
/// <summary>
1352+
/// A descriptive name.
1353+
/// </summary>
1354+
public ScaleMonitorDescriptor Descriptor { get; private set; }
1355+
1356+
/// <inheritdoc/>
1357+
Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
1358+
{
1359+
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
1360+
}
1361+
1362+
/// <inheritdoc/>
1363+
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
1364+
{
1365+
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
1366+
}
1367+
}
13141368
#endif
13151369
}
13161370
}

src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal sealed class DurableTaskListener : IListener
2828
private readonly FunctionType functionType;
2929
private readonly string storageConnectionString;
3030
#if !FUNCTIONS_V1
31-
private readonly Lazy<DurableTaskScaleMonitor> scaleMonitor;
31+
private readonly Lazy<IScaleMonitor> scaleMonitor;
3232
#endif
3333

3434
public DurableTaskListener(
@@ -52,13 +52,11 @@ public DurableTaskListener(
5252
this.functionType = functionType;
5353
this.storageConnectionString = storageConnectionString;
5454
#if !FUNCTIONS_V1
55-
this.scaleMonitor = new Lazy<DurableTaskScaleMonitor>(() =>
56-
new DurableTaskScaleMonitor(
55+
this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
56+
this.config.GetScaleMonitor(
5757
this.functionId,
5858
this.functionName,
59-
this.config.Options.HubName,
60-
this.storageConnectionString,
61-
this.config.TraceHelper));
59+
this.storageConnectionString));
6260
#endif
6361
}
6462

src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using DurableTask.AzureStorage.Monitoring;
1313
using Dynamitey.DynamicObjects;
1414
using Microsoft.Azure.WebJobs.Host.Scale;
15+
using Microsoft.Extensions.Logging;
1516
using Microsoft.WindowsAzure.Storage;
1617
using Newtonsoft.Json;
1718

@@ -20,28 +21,28 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
2021
internal sealed class DurableTaskScaleMonitor : IScaleMonitor<DurableTaskTriggerMetrics>
2122
{
2223
private readonly string functionId;
23-
private readonly FunctionName functionName;
24+
private readonly string functionName;
2425
private readonly string hubName;
2526
private readonly string storageConnectionString;
26-
private readonly EndToEndTraceHelper traceHelper;
2727
private readonly ScaleMonitorDescriptor scaleMonitorDescriptor;
28+
private readonly ILogger logger;
2829

2930
private DisconnectedPerformanceMonitor performanceMonitor;
3031

3132
public DurableTaskScaleMonitor(
3233
string functionId,
33-
FunctionName functionName,
34+
string functionName,
3435
string hubName,
3536
string storageConnectionString,
36-
EndToEndTraceHelper traceHelper,
37+
ILogger logger,
3738
DisconnectedPerformanceMonitor performanceMonitor = null)
3839
{
39-
this.functionId = functionId;
40-
this.functionName = functionName;
41-
this.hubName = hubName;
40+
this.functionId = functionId;
41+
this.functionName = functionName;
42+
this.hubName = hubName;
4243
this.storageConnectionString = storageConnectionString;
44+
this.logger = logger;
4345
this.performanceMonitor = performanceMonitor;
44-
this.traceHelper = traceHelper;
4546
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
4647
}
4748

@@ -86,7 +87,7 @@ public async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
8687
}
8788
catch (StorageException e)
8889
{
89-
this.traceHelper.ExtensionWarningEvent(this.hubName, this.functionName.Name, string.Empty, e.ToString());
90+
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
9091
}
9192

9293
if (heartbeat != null)
@@ -114,7 +115,7 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext<DurableTaskTriggerMetrics>
114115
return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
115116
}
116117

117-
private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetrics[] metrics)
118+
private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetrics[] metrics)
118119
{
119120
var scaleStatus = new ScaleStatus() { Vote = ScaleVote.None };
120121
if (metrics == null)
@@ -173,12 +174,13 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
173174
break;
174175
}
175176

176-
this.traceHelper.ExtensionInformationalEvent(
177-
this.hubName,
178-
string.Empty,
179-
this.functionName.Name,
180-
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
181-
writeToUserLogs: writeToUserLogs);
177+
if (writeToUserLogs)
178+
{
179+
this.logger.LogInformation(
180+
$"Durable Functions Trigger Scale Decision: {scaleStatus.Vote.ToString()}, Reason: {scaleRecommendation?.Reason}",
181+
this.hubName,
182+
this.functionName);
183+
}
182184

183185
return scaleStatus;
184186
}

src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask-net461.xml

Lines changed: 0 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)