Skip to content

Commit b3d90c4

Browse files
Added support to gracefully shutdown language worker (#8385)
Added support to gracefully shutdown language worker
1 parent 65db7b4 commit b3d90c4

File tree

11 files changed

+121
-9
lines changed

11 files changed

+121
-9
lines changed

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using System.Threading.Tasks;
1515
using System.Threading.Tasks.Dataflow;
1616
using Google.Protobuf.Collections;
17+
using Google.Protobuf.WellKnownTypes;
1718
using Microsoft.Azure.WebJobs.Logging;
1819
using Microsoft.Azure.WebJobs.Script.Description;
1920
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -901,10 +902,36 @@ protected virtual void Dispose(bool disposing)
901902

902903
public void Dispose()
903904
{
905+
StopWorkerProcess();
904906
_disposing = true;
905907
Dispose(true);
906908
}
907909

910+
private void StopWorkerProcess()
911+
{
912+
bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.HandlesWorkerTerminateMessage));
913+
if (!capabilityEnabled)
914+
{
915+
return;
916+
}
917+
918+
int gracePeriod = WorkerConstants.WorkerTerminateGracePeriodInSeconds;
919+
920+
var workerTerminate = new WorkerTerminate()
921+
{
922+
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(gracePeriod))
923+
};
924+
925+
_workerChannelLogger.LogDebug($"Sending WorkerTerminate message with grace period {gracePeriod} seconds.");
926+
927+
SendStreamingMessage(new StreamingMessage
928+
{
929+
WorkerTerminate = workerTerminate
930+
});
931+
932+
WorkerProcess.WaitForProcessExitInMilliSeconds(gracePeriod * 1000);
933+
}
934+
908935
public async Task DrainInvocationsAsync()
909936
{
910937
_workerChannelLogger.LogDebug($"Count of in-buffer invocations waiting to be drained out: {_executingInvocations.Count}");

src/WebJobs.Script.WebHost/WebJobsScriptHostService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
434434
var currentHost = ActiveHost;
435435
ActiveHost = null;
436436
Task stopTask = Orphan(currentHost, cancellationToken);
437-
Task result = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(10), cancellationToken));
437+
Task result = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(10)));
438438

439439
if (result != stopTask)
440440
{

src/WebJobs.Script/Workers/ProcessManagement/EmptyProcessRegistry.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ namespace Microsoft.Azure.WebJobs.Script.Workers
77
{
88
internal class EmptyProcessRegistry : IProcessRegistry
99
{
10-
public bool Register(Process process) => true;
10+
public bool Register(WorkerProcess process) => true;
11+
12+
public void Close()
13+
{
14+
}
1115
}
1216
}

src/WebJobs.Script/Workers/ProcessManagement/IProcessRegistry.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ namespace Microsoft.Azure.WebJobs.Script.Workers
99
internal interface IProcessRegistry
1010
{
1111
// Registers processes to ensure that they are cleaned up on host exit.
12-
bool Register(Process process);
12+
bool Register(WorkerProcess process);
13+
14+
void Close();
1315
}
1416
}

src/WebJobs.Script/Workers/ProcessManagement/IWorkerProcess.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,7 @@ public interface IWorkerProcess
1111
int Id { get; }
1212

1313
Task StartProcessAsync();
14+
15+
void WaitForProcessExitInMilliSeconds(int waitTime);
1416
}
1517
}

src/WebJobs.Script/Workers/ProcessManagement/JobObjectRegistry.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ internal class JobObjectRegistry : IProcessRegistry, IDisposable
1212
{
1313
private IntPtr _handle;
1414
private bool _disposed = false;
15+
private WorkerProcess _workerProcess = null;
1516

1617
public JobObjectRegistry()
1718
{
@@ -37,9 +38,10 @@ public JobObjectRegistry()
3738
}
3839
}
3940

40-
public bool Register(Process proc)
41+
public bool Register(WorkerProcess workerProcess)
4142
{
42-
return AssignProcessToJobObject(_handle, proc.Handle);
43+
_workerProcess = workerProcess;
44+
return AssignProcessToJobObject(_handle, _workerProcess.Process.Handle);
4345
}
4446

4547
[DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
@@ -78,6 +80,8 @@ private void Dispose(bool disposing)
7880

7981
public void Close()
8082
{
83+
_workerProcess?.ProcessWaitingForTermination.Task.GetAwaiter().GetResult();
84+
8185
if (_handle != IntPtr.Zero)
8286
{
8387
CloseHandle(_handle);

src/WebJobs.Script/Workers/ProcessManagement/WorkerProcess.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Microsoft.Azure.WebJobs.Logging;
1212
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1313
using Microsoft.Azure.WebJobs.Script.Eventing;
14+
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
1415
using Microsoft.Extensions.DependencyInjection;
1516
using Microsoft.Extensions.Logging;
1617

@@ -53,8 +54,9 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces
5354

5455
internal Queue<string> ProcessStdErrDataQueue => _processStdErrDataQueue;
5556

56-
// for testing
57-
internal Process Process { get; set; }
57+
public Process Process { get; set; }
58+
59+
public TaskCompletionSource<bool> ProcessWaitingForTermination { get; set; } = new TaskCompletionSource<bool>();
5860

5961
internal abstract Process CreateWorkerProcess();
6062

@@ -78,7 +80,7 @@ public Task StartProcessAsync()
7880
Process.BeginOutputReadLine();
7981

8082
// Register process only after it starts
81-
_processRegistry?.Register(Process);
83+
_processRegistry?.Register(this);
8284

8385
RegisterWithProcessMonitor();
8486

@@ -204,10 +206,18 @@ internal void BuildAndLogConsoleLog(string msg, LogLevel level)
204206

205207
internal abstract void HandleWorkerProcessRestart();
206208

209+
public void WaitForProcessExitInMilliSeconds(int waitTime)
210+
{
211+
Process.WaitForExit(waitTime);
212+
}
213+
207214
public void Dispose()
208215
{
209216
Disposing = true;
210217
// best effort process disposal
218+
219+
ProcessWaitingForTermination.SetResult(false);
220+
211221
try
212222
{
213223
_eventSubscription?.Dispose();

src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static class RpcWorkerConstants
5151
public const string AcceptsListOfFunctionLoadRequests = "AcceptsListOfFunctionLoadRequests";
5252
public const string EnableUserCodeException = "EnableUserCodeException";
5353
public const string SupportsLoadResponseCollection = "SupportsLoadResponseCollection";
54+
public const string HandlesWorkerTerminateMessage = "HandlesWorkerTerminateMessage";
5455

5556
// Host Capabilities
5657
public const string V2Compatable = "V2Compatable";

src/WebJobs.Script/Workers/WorkerConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public static class WorkerConstants
99
public const string HttpScheme = "http";
1010

1111
public const int WorkerReadyCheckPollingIntervalMilliseconds = 25;
12+
public const int WorkerTerminateGracePeriodInSeconds = 5;
1213
public const string WorkerConfigFileName = "worker.config.json";
1314
public const string DefaultWorkersDirectoryName = "workers";
1415

test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,48 @@ await Assert.ThrowsAsync<TaskCanceledException>(async () =>
145145
});
146146
}
147147

148+
[Fact]
149+
public void WorkerChannel_Dispose_With_WorkerTerminateCapability()
150+
{
151+
var initTask = _workerChannel.StartWorkerProcessAsync(CancellationToken.None);
152+
153+
IDictionary<string, string> capabilities = new Dictionary<string, string>()
154+
{
155+
{ RpcWorkerConstants.HandlesWorkerTerminateMessage, "1" }
156+
};
157+
158+
StartStream startStream = new StartStream()
159+
{
160+
WorkerId = _workerId
161+
};
162+
163+
StreamingMessage startStreamMessage = new StreamingMessage()
164+
{
165+
StartStream = startStream
166+
};
167+
168+
// Send worker init request and enable the capabilities
169+
GrpcEvent rpcEvent = new GrpcEvent(_workerId, startStreamMessage);
170+
_workerChannel.SendWorkerInitRequest(rpcEvent);
171+
_testFunctionRpcService.PublishWorkerInitResponseEvent(capabilities);
172+
173+
_workerChannel.Dispose();
174+
var traces = _logger.GetLogMessages();
175+
var expectedLogMsg = $"Sending WorkerTerminate message with grace period {WorkerConstants.WorkerTerminateGracePeriodInSeconds} seconds.";
176+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, expectedLogMsg)));
177+
}
178+
179+
[Fact]
180+
public void WorkerChannel_Dispose_Without_WorkerTerminateCapability()
181+
{
182+
var initTask = _workerChannel.StartWorkerProcessAsync(CancellationToken.None);
183+
184+
_workerChannel.Dispose();
185+
var traces = _logger.GetLogMessages();
186+
var expectedLogMsg = $"Sending WorkerTerminate message with grace period {WorkerConstants.WorkerTerminateGracePeriodInSeconds} seconds.";
187+
Assert.False(traces.Any(m => string.Equals(m.FormattedMessage, expectedLogMsg)));
188+
}
189+
148190
[Fact]
149191
public async Task StartWorkerProcessAsync_Invoked_SetupFunctionBuffers_Verify_ReadyForInvocation()
150192
{

0 commit comments

Comments
 (0)