Skip to content

Commit ca82252

Browse files
Implement host configuration property for handling pre-cancelled invocation requests (#9523) (#9826)
Co-authored-by: Lilian Kasem <[email protected]>
1 parent 24770f8 commit ca82252

File tree

11 files changed

+156
-29
lines changed

11 files changed

+156
-29
lines changed

release_notes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,7 @@
88
- Add function grouping information (https://github.com/Azure/azure-functions-host/pull/9735)
99
- Bump `Microsoft.IdentityModel.Tokens`, `Microsoft.IdentityModel.Protocols.OpenIdConnect`, and
1010
`System.IdentityModel.Tokens.Jwt` from 6.32.0 to 6.35.0 (#9793)
11+
- Implement host configuration property for handling pre-cancelled invocation requests (#9523)
12+
- If a worker supports CancellationTokens, cancelled invocations will now be sent to the worker by default
13+
- Customers can opt-out of this behavior by setting `SendCanceledInvocationsToWorker` to `false` in host.json
14+
- If a worker does not support CancellationTokens, cancelled invocations will not be sent to the worker

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
using Microsoft.Azure.WebJobs.Script.Workers;
3030
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
3131
using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer;
32+
using Microsoft.Extensions.Hosting;
3233
using Microsoft.Extensions.Logging;
3334
using Microsoft.Extensions.Options;
3435
using Yarp.ReverseProxy.Forwarder;
@@ -42,6 +43,7 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc
4243
internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
4344
{
4445
private readonly IScriptEventManager _eventManager;
46+
private readonly IScriptHostManager _scriptHostManager;
4547
private readonly RpcWorkerConfig _workerConfig;
4648
private readonly string _runtime;
4749
private readonly IEnvironment _environment;
@@ -81,16 +83,18 @@ internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
8183
private TaskCompletionSource<List<RawFunctionMetadata>> _functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);
8284
private TimeSpan _functionLoadTimeout = TimeSpan.FromMinutes(1);
8385
private bool _isSharedMemoryDataTransferEnabled;
84-
private bool? _cancelCapabilityEnabled;
86+
private bool _isHandlesInvocationCancelMessageCapabilityEnabled;
8587
private bool _isWorkerApplicationInsightsLoggingEnabled;
8688
private IHttpProxyService _httpProxyService;
8789
private Uri _httpProxyEndpoint;
8890
private System.Timers.Timer _timer;
8991
private bool _functionMetadataRequestSent = false;
92+
private IOptions<ScriptJobHostOptions> _scriptHostOptions;
9093

9194
internal GrpcWorkerChannel(
9295
string workerId,
9396
IScriptEventManager eventManager,
97+
IScriptHostManager hostManager,
9498
RpcWorkerConfig workerConfig,
9599
IWorkerProcess rpcWorkerProcess,
96100
ILogger logger,
@@ -105,6 +109,7 @@ internal GrpcWorkerChannel(
105109
{
106110
_workerId = workerId;
107111
_eventManager = eventManager;
112+
_scriptHostManager = hostManager;
108113
_workerConfig = workerConfig;
109114
_runtime = workerConfig.Description.Language;
110115
_rpcWorkerProcess = rpcWorkerProcess;
@@ -140,6 +145,10 @@ internal GrpcWorkerChannel(
140145

141146
// Temporary switch to allow fully testing new algorithm in production
142147
_messageDispatcherFactory = GetProcessorFactory();
148+
149+
_scriptHostManager.ActiveHostChanged += HandleActiveHostChange;
150+
151+
LoadScriptJobHostOptions(_scriptHostManager as IServiceProvider);
143152
}
144153

145154
private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null;
@@ -152,6 +161,30 @@ internal GrpcWorkerChannel(
152161

153162
public RpcWorkerConfig WorkerConfig => _workerConfig;
154163

164+
public IOptions<ScriptJobHostOptions> JobHostOptions => _scriptHostOptions;
165+
166+
private void HandleActiveHostChange(object sender, ActiveHostChangedEventArgs e)
167+
{
168+
if (e.NewHost is null)
169+
{
170+
return;
171+
}
172+
173+
LoadScriptJobHostOptions(e.NewHost.Services);
174+
}
175+
176+
private void LoadScriptJobHostOptions(IServiceProvider provider)
177+
{
178+
if (provider?.GetService(typeof(IOptions<ScriptJobHostOptions>)) is IOptions<ScriptJobHostOptions> scriptHostOptions)
179+
{
180+
_scriptHostOptions = scriptHostOptions;
181+
}
182+
else
183+
{
184+
_workerChannelLogger.LogDebug("Unable to resolve ScriptJobHostOptions");
185+
}
186+
}
187+
155188
// Temporary switch that allows us to move between the "old" ThreadPool-only processor
156189
// and a "new" Channel processor (for proper ordering of messages).
157190
private IInvocationMessageDispatcherFactory GetProcessorFactory()
@@ -511,7 +544,7 @@ internal void ApplyCapabilities(IDictionary<string, string> capabilities, GrpcCa
511544
UpdateCapabilities(capabilities, strategy);
512545

513546
_isSharedMemoryDataTransferEnabled = IsSharedMemoryDataTransferEnabled();
514-
_cancelCapabilityEnabled ??= !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
547+
_isHandlesInvocationCancelMessageCapabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesInvocationCancelMessage));
515548

516549
if (!_isSharedMemoryDataTransferEnabled)
517550
{
@@ -816,12 +849,21 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)
816849
return;
817850
}
818851

819-
// do not send an invocation request if cancellation has been requested
820852
if (context.CancellationToken.IsCancellationRequested)
821853
{
822-
_workerChannelLogger.LogWarning("Cancellation has been requested. The invocation request with id '{invocationId}' is canceled and will not be sent to the worker.", invocationId);
823-
context.ResultSource.TrySetCanceled();
824-
return;
854+
_workerChannelLogger.LogDebug("Cancellation was requested prior to the invocation request ('{invocationId}') being sent to the worker.", invocationId);
855+
856+
// If the worker does not support handling InvocationCancel grpc messages, or if cancellation is supported and the customer opts-out
857+
// of sending cancelled invocations to the worker, we will cancel the result source and not send the invocation to the worker.
858+
if (!_isHandlesInvocationCancelMessageCapabilityEnabled || !JobHostOptions.Value.SendCanceledInvocationsToWorker)
859+
{
860+
_workerChannelLogger.LogInformation("Cancelling invocation '{invocationId}' due to cancellation token being signaled. "
861+
+ "This invocation was not sent to the worker. Read more about this here: https://aka.ms/azure-functions-cancellations", invocationId);
862+
863+
// This will result in an invocation failure with a "FunctionInvocationCanceled" exception.
864+
context.ResultSource.TrySetCanceled();
865+
return;
866+
}
825867
}
826868

827869
var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager);
@@ -834,9 +876,10 @@ await SendStreamingMessageAsync(new StreamingMessage
834876
InvocationRequest = invocationRequest
835877
});
836878

837-
if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value)
879+
if (_isHandlesInvocationCancelMessageCapabilityEnabled)
838880
{
839-
context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
881+
var cancellationCtr = context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
882+
context.Properties.Add(ScriptConstants.CancellationTokenRegistration, cancellationCtr);
840883
}
841884

842885
if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction())
@@ -1038,6 +1081,13 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
10381081
if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out var invocation))
10391082
{
10401083
var context = invocation.Context;
1084+
1085+
if (context.Properties.TryGetValue(ScriptConstants.CancellationTokenRegistration, out CancellationTokenRegistration ctr))
1086+
{
1087+
await ctr.DisposeAsync();
1088+
context.Properties.Remove(ScriptConstants.CancellationTokenRegistration);
1089+
}
1090+
10411091
if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled))
10421092
{
10431093
_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id));
@@ -1355,6 +1405,7 @@ protected virtual void Dispose(bool disposing)
13551405
_startLatencyMetric?.Dispose();
13561406
_workerInitTask?.TrySetCanceled();
13571407
_timer?.Dispose();
1408+
_scriptHostManager.ActiveHostChanged -= HandleActiveHostChange;
13581409

13591410
// unlink function inputs
13601411
if (_inputLinks is not null)

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,20 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory
2121
private readonly ILoggerFactory _loggerFactory = null;
2222
private readonly IRpcWorkerProcessFactory _rpcWorkerProcessFactory = null;
2323
private readonly IScriptEventManager _eventManager = null;
24+
private readonly IScriptHostManager _hostManager = null;
2425
private readonly IEnvironment _environment = null;
2526
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions = null;
2627
private readonly ISharedMemoryManager _sharedMemoryManager = null;
2728
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
2829
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
2930
private readonly IHttpProxyService _httpProxyService;
3031

31-
public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory,
32+
public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IScriptHostManager hostManager, IEnvironment environment, ILoggerFactory loggerFactory,
3233
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager,
3334
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
3435
{
3536
_eventManager = eventManager;
37+
_hostManager = hostManager;
3638
_loggerFactory = loggerFactory;
3739
_rpcWorkerProcessFactory = rpcWorkerProcessManager;
3840
_environment = environment;
@@ -55,17 +57,18 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
5557
ILogger workerLogger = _loggerFactory.CreateLogger($"Worker.LanguageWorkerChannel.{runtime}.{workerId}");
5658
IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig);
5759

58-
return CreateInternal(workerId, _eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount,
60+
return CreateInternal(workerId, _eventManager, _hostManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount,
5961
_environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions, _httpProxyService);
6062
}
6163

62-
internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
64+
internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, IScriptHostManager hostManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
6365
ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
6466
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
6567
{
6668
return new GrpcWorkerChannel(
6769
workerId,
6870
eventManager,
71+
hostManager,
6972
languageWorkerConfig,
7073
rpcWorkerProcess,
7174
workerLogger,

src/WebJobs.Script/Config/ConfigurationSectionNames.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ public static class ConfigurationSectionNames
2323
public const string EasyAuth = "easyauth";
2424
public const string Retry = "retry";
2525
public const string SequentialJobHostRestart = JobHost + ":sequentialRestart";
26+
public const string SendCanceledInvocationsToWorker = "sendCanceledInvocationsToWorker";
2627
}
2728
}

src/WebJobs.Script/Config/HostJsonFileConfigurationSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class HostJsonFileConfigurationProvider : ConfigurationProvider
5151
{
5252
"version", "functionTimeout", "retry", "functions", "http", "watchDirectories", "watchFiles", "queues", "serviceBus",
5353
"eventHub", "singleton", "logging", "aggregator", "healthMonitor", "extensionBundle", "managedDependencies",
54-
"customHandler", "httpWorker", "extensions", "concurrency"
54+
"customHandler", "httpWorker", "extensions", "concurrency", ConfigurationSectionNames.SendCanceledInvocationsToWorker
5555
};
5656

5757
private static readonly string[] CredentialNameFragments = new[] { "password", "pwd", "key", "secret", "token", "sas" };

src/WebJobs.Script/Config/ScriptJobHostOptions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,12 @@ public string RootScriptPath
122122
/// Gets or sets a value indicating whether the ScriptHost is in standby mode.
123123
/// </summary>
124124
public bool IsStandbyConfiguration { get; set; }
125+
126+
/// <summary>
127+
/// Gets or sets a value indicating whether the host sends cancelled invocations to the worker.
128+
/// This defaults to true, meaning if cancellation is signalled we will still send the pre-cancelled
129+
/// invocation to the worker.
130+
/// </summary>
131+
public bool SendCanceledInvocationsToWorker { get; set; } = true;
125132
}
126133
}

src/WebJobs.Script/Config/ScriptJobHostOptionsSetup.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public void Configure(ScriptJobHostOptions options)
5151
{
5252
options.FileLoggingMode = fileLoggingMode.Value;
5353
}
54+
5455
Utility.ValidateRetryOptions(options.Retry);
5556
}
5657

src/WebJobs.Script/ScriptConstants.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,5 +243,7 @@ public static class ScriptConstants
243243
public static readonly string HttpProxyTask = "HttpProxyTask";
244244

245245
public static readonly string OperationNameKey = "OperationName";
246+
247+
public static readonly string CancellationTokenRegistration = "CancellationTokenRegistration";
246248
}
247249
}

test/WebJobs.Script.Tests.Integration/ApplicationInsights/ApplicationInsightsTestFixture.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,24 @@ public void Dispose()
9797

9898
private class TestGrpcWorkerChannelFactory : GrpcWorkerChannelFactory
9999
{
100-
public TestGrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
101-
: base(eventManager, environment, loggerFactory, applicationHostOptions, rpcWorkerProcessManager, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService)
100+
public TestGrpcWorkerChannelFactory(IScriptEventManager eventManager, IScriptHostManager hostManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
101+
: base(eventManager, hostManager, environment, loggerFactory, applicationHostOptions, rpcWorkerProcessManager, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService)
102102
{
103103
}
104104

105-
internal override IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
105+
internal override IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, IScriptHostManager hostManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
106106
ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
107107
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
108108
{
109-
return new TestGrpcWorkerChannel(workerId, eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger,
109+
return new TestGrpcWorkerChannel(workerId, eventManager, hostManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger,
110110
attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService);
111111
}
112112

113113
private class TestGrpcWorkerChannel : GrpcWorkerChannel
114114
{
115-
internal TestGrpcWorkerChannel(string workerId, IScriptEventManager eventManager, RpcWorkerConfig workerConfig, IWorkerProcess rpcWorkerProcess, ILogger logger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
116-
: base(workerId, eventManager, workerConfig, rpcWorkerProcess, logger, metricsLogger, attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService)
117-
{
115+
internal TestGrpcWorkerChannel(string workerId, IScriptEventManager eventManager, IScriptHostManager hostManager, RpcWorkerConfig workerConfig, IWorkerProcess rpcWorkerProcess, ILogger logger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
116+
: base(workerId, eventManager, hostManager, workerConfig, rpcWorkerProcess, logger, metricsLogger, attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService)
117+
{
118118
}
119119

120120
internal override void UpdateCapabilities(IDictionary<string, string> fields, GrpcCapabilitiesUpdateStrategy strategy)

0 commit comments

Comments
 (0)