Skip to content

Commit 7617df9

Browse files
author
Connor McMahon
authored
Fix Local RPC race condition on app startup (#1719)
* Add test * Fix broken test * Finally have broken test * Fix build warnings * Fix implementation * Add release notes * Fix broken test * Respond to PR feedback
1 parent e17afcc commit 7617df9

File tree

5 files changed

+84
-51
lines changed

5 files changed

+84
-51
lines changed

release_notes.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@
33
- Improved concurrency defaults for the App Service Consumption plan (https://github.com/Azure/azure-functions-durable-extension/pull/1706)
44

55
## Bug Fixes:
6-
- Properly used update management API URLs after a successful slot swap (#1716)
6+
- Properly used update management API URLs after a successful slot swap (#1716)
7+
- Fixed a race condition when multiple apps start with local RPC endpoints on the same VM in parallel (#1719)
8+

src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ internal bool TryGetRpcBaseUrl(out Uri rpcBaseUrl)
11411141
return true;
11421142
}
11431143

1144-
// The app owner explicitly disabled the local RPC endpoint.
1144+
// The app owner explicitly disabled the local RPC endpoint
11451145
rpcBaseUrl = null;
11461146
return false;
11471147
}
@@ -1151,14 +1151,14 @@ internal async Task StartLocalHttpServerAsync()
11511151
{
11521152
if (!this.localHttpListener.IsListening)
11531153
{
1154+
await this.localHttpListener.StartAsync();
1155+
11541156
this.traceHelper.ExtensionInformationalEvent(
11551157
this.durableTaskOptions.HubName,
11561158
instanceId: string.Empty,
11571159
functionName: string.Empty,
1158-
message: $"Opening local RPC endpoint: {this.localHttpListener.InternalRpcUri}",
1160+
message: $"Opened local RPC endpoint: {this.localHttpListener.InternalRpcUri}",
11591161
writeToUserLogs: true);
1160-
1161-
await this.localHttpListener.StartAsync();
11621162
}
11631163
}
11641164

src/WebJobs.Extensions.DurableTask/LocalHttpListener.cs

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ internal class LocalHttpListener : IDisposable
2222
{
2323
private const int DefaultPort = 17071;
2424

25-
private readonly IWebHost localWebHost;
2625
private readonly Func<HttpRequestMessage, Task<HttpResponseMessage>> handler;
2726
private readonly EndToEndTraceHelper traceHelper;
2827
private readonly DurableTaskOptions durableTaskOptions;
28+
private IWebHost localWebHost;
2929

3030
public LocalHttpListener(
3131
EndToEndTraceHelper traceHelper,
@@ -36,23 +36,12 @@ public LocalHttpListener(
3636
this.handler = handler ?? throw new ArgumentNullException(nameof(handler));
3737
this.durableTaskOptions = durableTaskOptions ?? throw new ArgumentNullException(nameof(durableTaskOptions));
3838

39-
#if !FUNCTIONS_V1
40-
this.InternalRpcUri = new Uri($"http://127.0.0.1:{this.GetAvailablePort()}/durabletask/");
41-
var listenUri = new Uri(this.InternalRpcUri.GetLeftPart(UriPartial.Authority));
42-
this.localWebHost = new WebHostBuilder()
43-
.UseKestrel()
44-
.UseUrls(listenUri.OriginalString)
45-
.Configure(a => a.Run(this.HandleRequestAsync))
46-
.Build();
47-
#else
48-
// Just use default port for internal Uri. No need to check for port availability since
49-
// we won't be listening to this endpoint.
50-
this.InternalRpcUri = new Uri($"http://127.0.0.1:{DefaultPort}/durabletask/");
39+
// Set to a non-null placeholder value
40+
this.InternalRpcUri = new Uri($"http://uninitialized");
5141
this.localWebHost = new NoOpWebHost();
52-
#endif
5342
}
5443

55-
public Uri InternalRpcUri { get; }
44+
public Uri InternalRpcUri { get; private set; }
5645

5746
public bool IsListening { get; private set; }
5847

@@ -66,8 +55,44 @@ public async Task StartAsync()
6655
}
6756

6857
#if !FUNCTIONS_V1
69-
await this.localWebHost.StartAsync();
70-
this.IsListening = true;
58+
const int maxAttempts = 10;
59+
int numAttempts = 1;
60+
do
61+
{
62+
int availablePort = this.GetAvailablePort();
63+
try
64+
{
65+
this.InternalRpcUri = new Uri($"http://127.0.0.1:{availablePort}/durabletask/");
66+
var listenUri = new Uri(this.InternalRpcUri.GetLeftPart(UriPartial.Authority));
67+
this.localWebHost = new WebHostBuilder()
68+
.UseKestrel()
69+
.UseUrls(listenUri.OriginalString)
70+
.Configure(a => a.Run(this.HandleRequestAsync))
71+
.Build();
72+
73+
await this.localWebHost.StartAsync();
74+
this.IsListening = true;
75+
break;
76+
}
77+
catch (IOException)
78+
{
79+
this.traceHelper.ExtensionWarningEvent(
80+
this.durableTaskOptions.HubName,
81+
functionName: string.Empty,
82+
instanceId: string.Empty,
83+
message: $"Failed to open local socket {availablePort}. This was attempt #{numAttempts} to open a local port.");
84+
numAttempts++;
85+
var random = new Random();
86+
var millisecondsToWait = (int)Math.Round(random.NextDouble() * 1000);
87+
await Task.Delay(millisecondsToWait);
88+
}
89+
}
90+
while (numAttempts <= maxAttempts);
91+
92+
if (!this.IsListening)
93+
{
94+
throw new IOException($"Unable to find a port to open an RPC endpoint on after {maxAttempts} attempts");
95+
}
7196
#else
7297
// no-op: this is dummy code to make build warnings go away
7398
await Task.Yield();
@@ -158,7 +183,6 @@ private static async Task SetResponseAsync(HttpContext context, HttpResponseMess
158183
}
159184
}
160185

161-
#if FUNCTIONS_V1
162186
private class NoOpWebHost : IWebHost
163187
{
164188
public IFeatureCollection ServerFeatures => throw new NotImplementedException();
@@ -173,6 +197,5 @@ public void Start() { }
173197

174198
public Task StopAsync(CancellationToken cancellationToken = default(CancellationToken)) => Task.CompletedTask;
175199
}
176-
#endif
177200
}
178201
}

test/Common/DurableTaskEndToEndTests.cs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4869,35 +4869,43 @@ public async Task CallActivity_Like_From_Azure_Portal()
48694869
}
48704870
}
48714871

4872-
[Fact]
4872+
[Theory]
48734873
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
4874-
public async Task MultipleHostsLocalRpcSameDevice()
4874+
[InlineData(true)]
4875+
[InlineData(false)]
4876+
public async Task MultipleHostsOnSameVM(bool enableLocalRpc)
48754877
{
4876-
ITestHost host1 = TestHelpers.GetJobHost(
4877-
this.loggerProvider,
4878-
nameof(this.MultipleHostsLocalRpcSameDevice) + "1",
4879-
false,
4880-
localRpcEndpointEnabled: true);
4881-
await host1.StartAsync();
4882-
ITestHost host2 = TestHelpers.GetJobHost(
4883-
this.loggerProvider,
4884-
nameof(this.MultipleHostsLocalRpcSameDevice) + "2",
4885-
false,
4886-
localRpcEndpointEnabled: true);
4887-
try
4888-
{
4889-
await host2.StartAsync();
4890-
}
4891-
catch (Exception)
4892-
{
4893-
Assert.True(false, "Could not start up two hosts on the same device in parallel");
4894-
}
4895-
finally
4878+
// This test wants to be sure there are no race conditions while starting up multiple hosts in parallel,
4879+
// so repeat the startup several times to increase the likelihood of hitting a race condition if one exists.
4880+
int numAttempts = 5;
4881+
for (int attempt = 0; attempt < numAttempts; attempt++)
48964882
{
4897-
await host1.StopAsync();
4898-
host1.Dispose();
4899-
await host2.StopAsync();
4900-
host2.Dispose();
4883+
int numThreads = 10;
4884+
var hosts = new List<ITestHost>(numThreads);
4885+
4886+
try
4887+
{
4888+
Parallel.For(0, numThreads, new ParallelOptions() { MaxDegreeOfParallelism = numThreads }, (i) =>
4889+
hosts.Add(TestHelpers.GetJobHost(
4890+
this.loggerProvider,
4891+
nameof(this.MultipleHostsOnSameVM) + i,
4892+
false,
4893+
localRpcEndpointEnabled: enableLocalRpc)));
4894+
4895+
await Task.WhenAll(hosts.Select(host => host.StartAsync()));
4896+
}
4897+
catch (Exception)
4898+
{
4899+
Assert.True(false, "Could not start up two hosts on the same device in parallel");
4900+
}
4901+
finally
4902+
{
4903+
foreach (var host in hosts)
4904+
{
4905+
await host.StopAsync();
4906+
host.Dispose();
4907+
}
4908+
}
49014909
}
49024910
}
49034911

test/FunctionsV2/OutOfProcTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ public async Task TestLocalRcpEndpointRuntimeVersion(string runtimeVersion, bool
348348
// Validate if we opened local RPC endpoint by looking at log statements.
349349
var logger = this.loggerProvider.CreatedLoggers.Single(l => l.Category == TestHelpers.LogCategory);
350350
var logMessages = logger.LogMessages.ToList();
351-
bool enabledRpcEndpoint = logMessages.Any(msg => msg.Level == Microsoft.Extensions.Logging.LogLevel.Information && msg.FormattedMessage.StartsWith("Opening local RPC endpoint:"));
351+
bool enabledRpcEndpoint = logMessages.Any(msg => msg.Level == Microsoft.Extensions.Logging.LogLevel.Information && msg.FormattedMessage.StartsWith("Opened local RPC endpoint:"));
352352

353353
Assert.Equal(enabledExpected, enabledRpcEndpoint);
354354

0 commit comments

Comments
 (0)