Skip to content

Commit 359eb50

Browse files
v-imohammadmathewcbrettsam
authored
Hotfix4.28.3 - remove condition exception/fixing race condition (#9742)
* Removing conditional exception handling from FileLogger (#9739) * fixing race condition in worker shutdown (#9738) * updating common prop --------- Co-authored-by: Mathew Charles <[email protected]> Co-authored-by: Brett Samblanet <[email protected]>
1 parent 8a2a1f4 commit 359eb50

File tree

7 files changed

+53
-22
lines changed

7 files changed

+53
-22
lines changed

build/common.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<LangVersion>latest</LangVersion>
66
<MajorVersion>4</MajorVersion>
77
<MinorVersion>28</MinorVersion>
8-
<PatchVersion>2</PatchVersion>
8+
<PatchVersion>3</PatchVersion>
99
<BuildNumber Condition="'$(BuildNumber)' == '' ">0</BuildNumber>
1010
<PreviewVersion></PreviewVersion>
1111

src/WebJobs.Script/Diagnostics/FileLogger.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
9393
{
9494
_fileWriter.AppendLine(formattedMessage);
9595
}
96-
catch (Exception ex) when (!ex.IsFatal())
96+
catch (Exception)
9797
{
9898
// Make sure the Logger doesn't throw if there are Exceptions (disk full, etc).
9999
}

src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ internal async void ShutdownWebhostLanguageWorkerChannels()
187187
await _webHostLanguageWorkerChannelManager?.ShutdownChannelsAsync();
188188
}
189189

190-
private void SetDispatcherStateToInitialized(Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannel = null)
190+
private void SetDispatcherStateToInitialized(IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannel = null)
191191
{
192192
// RanToCompletion indicates successful process startup
193193
if (State != FunctionInvocationDispatcherState.Initialized
@@ -198,7 +198,7 @@ private void SetDispatcherStateToInitialized(Dictionary<string, TaskCompletionSo
198198
}
199199
}
200200

201-
private void StartWorkerProcesses(int startIndex, Func<IEnumerable<string>, Task> startAction, bool initializeDispatcher = false, Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannel = null, IEnumerable<string> functionLanguages = null)
201+
private void StartWorkerProcesses(int startIndex, Func<IEnumerable<string>, Task> startAction, bool initializeDispatcher = false, IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannel = null, IEnumerable<string> functionLanguages = null)
202202
{
203203
Task.Run(async () =>
204204
{
@@ -309,7 +309,7 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
309309
if (Utility.IsSupportedRuntime(_workerRuntime, _workerConfigs) || _environment.IsMultiLanguageRuntimeEnvironment())
310310
{
311311
State = FunctionInvocationDispatcherState.Initializing;
312-
Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
312+
IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
313313
if (webhostLanguageWorkerChannels != null)
314314
{
315315
int workerProcessCount = 0;

src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public interface IWebHostRpcWorkerChannelManager
1111
{
1212
Task<IRpcWorkerChannel> InitializeChannelAsync(IEnumerable<RpcWorkerConfig> workerConfigs, string language);
1313

14-
Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language);
14+
IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language);
1515

1616
Task SpecializeAsync();
1717

src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
using System.Linq;
88
using System.Reactive.Linq;
99
using System.Threading.Tasks;
10-
using Microsoft.Azure.AppService.Proxy.Common.Extensions;
1110
using Microsoft.Azure.AppService.Proxy.Common.Infra;
12-
using Microsoft.Azure.AppService.Proxy.Runtime;
1311
using Microsoft.Azure.WebJobs.Script.Config;
1412
using Microsoft.Azure.WebJobs.Script.Diagnostics;
1513
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -36,7 +34,7 @@ public class WebHostRpcWorkerChannelManager : IWebHostRpcWorkerChannelManager
3634
private Action _shutdownStandbyWorkerChannels;
3735
private IConfiguration _config;
3836

39-
private ConcurrentDictionary<string, Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>>> _workerChannels = new ConcurrentDictionary<string, Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>>>(StringComparer.OrdinalIgnoreCase);
37+
private ConcurrentDictionary<string, ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>>> _workerChannels = new(StringComparer.OrdinalIgnoreCase);
4038

4139
public WebHostRpcWorkerChannelManager(IScriptEventManager eventManager,
4240
IEnvironment environment,
@@ -101,7 +99,7 @@ await rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(processStartTask =
10199

102100
internal Task<IRpcWorkerChannel> GetChannelAsync(string language)
103101
{
104-
if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
102+
if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
105103
{
106104
if (workerChannels.Count > 0 && workerChannels.TryGetValue(workerChannels.Keys.First(), out TaskCompletionSource<IRpcWorkerChannel> valueTask))
107105
{
@@ -111,9 +109,9 @@ internal Task<IRpcWorkerChannel> GetChannelAsync(string language)
111109
return Task.FromResult<IRpcWorkerChannel>(null);
112110
}
113111

114-
public Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language)
112+
public IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language)
115113
{
116-
if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
114+
if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
117115
{
118116
return workerChannels;
119117
}
@@ -237,7 +235,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
237235

238236
if (_hostingConfigOptions.Value.RevertWorkerShutdownBehaviour)
239237
{
240-
if (_workerChannels.TryRemove(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels))
238+
if (_workerChannels.TryRemove(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels))
241239
{
242240
if (rpcWorkerChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
243241
{
@@ -264,7 +262,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
264262
}
265263
else
266264
{
267-
if (_workerChannels.TryGetValue(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels)
265+
if (_workerChannels.TryGetValue(language, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels)
268266
&& rpcWorkerChannels.TryRemove(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
269267
{
270268
value?.Task.ContinueWith(channelTask =>
@@ -304,7 +302,7 @@ internal void ScheduleShutdownStandbyChannels()
304302
using (_metricsLogger.LatencyEvent(string.Format(MetricEventNames.SpecializationShutdownStandbyChannels, runtime.Key)))
305303
{
306304
_logger.LogInformation("Disposing standby channel for runtime:{language}", runtime.Key);
307-
if (_workerChannels.TryRemove(runtime.Key, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> standbyChannels))
305+
if (_workerChannels.TryRemove(runtime.Key, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> standbyChannels))
308306
{
309307
foreach (string workerId in standbyChannels.Keys)
310308
{
@@ -338,7 +336,7 @@ public async Task ShutdownChannelsAsync()
338336
foreach (string runtime in _workerChannels.Keys)
339337
{
340338
_logger.LogInformation("Shutting down language worker channels for runtime:{runtime}", runtime);
341-
if (_workerChannels.TryRemove(runtime, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> standbyChannels))
339+
if (_workerChannels.TryRemove(runtime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> standbyChannels))
342340
{
343341
foreach (string workerId in standbyChannels.Keys)
344342
{
@@ -378,21 +376,21 @@ internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerCha
378376
_workerChannels.AddOrUpdate(initializedRuntime,
379377
(runtime) =>
380378
{
381-
Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> newLanguageWorkerChannels = new Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>>();
382-
newLanguageWorkerChannels.Add(initializedLanguageWorkerChannel.Id, new TaskCompletionSource<IRpcWorkerChannel>());
379+
ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> newLanguageWorkerChannels = new(StringComparer.OrdinalIgnoreCase);
380+
newLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource<IRpcWorkerChannel>());
383381
return newLanguageWorkerChannels;
384382
},
385383
(runtime, existingLanguageWorkerChannels) =>
386384
{
387-
existingLanguageWorkerChannels.Add(initializedLanguageWorkerChannel.Id, new TaskCompletionSource<IRpcWorkerChannel>());
385+
existingLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource<IRpcWorkerChannel>());
388386
return existingLanguageWorkerChannels;
389387
});
390388
}
391389

392390
internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel)
393391
{
394392
_logger.LogDebug("Adding webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
395-
if (_workerChannels.TryGetValue(initializedRuntime, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
393+
if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
396394
{
397395
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource<IRpcWorkerChannel> value))
398396
{
@@ -404,7 +402,7 @@ internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerC
404402
internal void SetExceptionOnInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel, Exception exception)
405403
{
406404
_logger.LogDebug("Failed to initialize webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
407-
if (_workerChannels.TryGetValue(initializedRuntime, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
405+
if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> channel))
408406
{
409407
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource<IRpcWorkerChannel> value))
410408
{

test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public IRpcWorkerChannel GetChannel(string language)
3535
throw new System.NotImplementedException();
3636
}
3737

38-
public Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language)
38+
public IDictionary<string, TaskCompletionSource<IRpcWorkerChannel>> GetChannels(string language)
3939
{
4040
if (_workerChannels.TryGetValue(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> workerChannels))
4141
{

test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Linq;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using Microsoft.Azure.WebJobs.Script.Config;
910
using Microsoft.Azure.WebJobs.Script.Description;
@@ -407,6 +408,38 @@ public async Task ShutdownChannelsIfExist_Succeeds()
407408
Assert.Null(initializedChannel);
408409
}
409410

411+
[Fact]
412+
public void ShutdownChannelsIfExist_Race_Succeeds()
413+
{
414+
var channel = CreateTestChannel(RpcWorkerConstants.JavaLanguageWorkerName);
415+
string id = channel.Id;
416+
417+
List<Task<bool>> tasks = new();
418+
List<Thread> threads = new();
419+
for (int i = 0; i < 2; i++)
420+
{
421+
Thread t = new(static (state) =>
422+
{
423+
var (channelManager, tasks, id) = ((WebHostRpcWorkerChannelManager, List<Task<bool>>, string))state;
424+
tasks.Add(channelManager.ShutdownChannelIfExistsAsync(RpcWorkerConstants.JavaLanguageWorkerName, id));
425+
});
426+
threads.Add(t);
427+
}
428+
429+
foreach (Thread t in threads)
430+
{
431+
t.Start((_rpcWorkerChannelManager, tasks, id));
432+
}
433+
434+
foreach (Thread t in threads)
435+
{
436+
t.Join();
437+
}
438+
439+
// only one should successfully shut down
440+
Assert.Single(tasks, t => t.Result == true);
441+
}
442+
410443
[Fact]
411444
public async Task ShutdownChannelsIfExistsAsync_StopsWorkerInvocations()
412445
{

0 commit comments

Comments
 (0)