Skip to content

Commit c2825fc

Browse files
pragnagopafabiocav
authored andcommitted
Block for environmentreload request to complete
1 parent 7ff14e1 commit c2825fc

File tree

8 files changed

+61
-37
lines changed

8 files changed

+61
-37
lines changed

src/WebJobs.Script/Eventing/Rpc/WorkerProcessReadyEvent.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/WebJobs.Script/Rpc/ILanguageWorkerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public interface ILanguageWorkerChannel
2121

2222
void SendFunctionLoadRequests();
2323

24-
void SendFunctionEnvironmentReloadRequest();
24+
Task SendFunctionEnvironmentReloadRequest();
2525

2626
Task StartWorkerProcessAsync();
2727
}

src/WebJobs.Script/Rpc/LanguageWorkerChannel.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ internal class LanguageWorkerChannel : ILanguageWorkerChannel, IDisposable
5252
private Capabilities _workerCapabilities;
5353
private ILogger _workerChannelLogger;
5454
private ILanguageWorkerProcess _languageWorkerProcess;
55+
private TaskCompletionSource<bool> _reloadTask = new TaskCompletionSource<bool>();
5556

5657
internal LanguageWorkerChannel(
5758
string workerId,
@@ -139,15 +140,19 @@ internal void SendWorkerInitRequest(RpcEvent startEvent)
139140
});
140141
}
141142

142-
internal void PublishWorkerProcessReadyEvent(FunctionEnvironmentReloadResponse res)
143+
internal void FunctionEnvironmentReloadResponse(FunctionEnvironmentReloadResponse res)
143144
{
144-
if (_disposing)
145+
_workerChannelLogger.LogDebug("Received FunctionEnvironmentReloadResponse");
146+
if (_reloadTask.Task.IsCompleted)
145147
{
146-
// do not publish ready events when disposing
147-
return;
148+
throw new InvalidOperationException("FunctionEnvironmentReloadResponse received more than once");
148149
}
149-
WorkerProcessReadyEvent wpEvent = new WorkerProcessReadyEvent(_workerId, _runtime);
150-
_eventManager.Publish(wpEvent);
150+
if (res.Result.IsFailure(out Exception relaodEnvironmentVariablesException))
151+
{
152+
_workerChannelLogger.LogError(relaodEnvironmentVariablesException, "Failed to reload environment variables");
153+
_reloadTask.SetResult(false);
154+
}
155+
_reloadTask.SetResult(true);
151156
}
152157

153158
internal void PublishRpcChannelReadyEvent(RpcEvent initEvent)
@@ -206,10 +211,14 @@ public void SendFunctionLoadRequests()
206211
}
207212
}
208213

209-
public void SendFunctionEnvironmentReloadRequest()
214+
public Task SendFunctionEnvironmentReloadRequest()
210215
{
211-
_eventSubscriptions.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionEnvironmentReloadResponse)
212-
.Subscribe((msg) => PublishWorkerProcessReadyEvent(msg.Message.FunctionEnvironmentReloadResponse)));
216+
_workerChannelLogger.LogDebug("Sending FunctionEnvironmentReloadRequest");
217+
_eventSubscriptions
218+
.Add(_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.FunctionEnvironmentReloadResponse)
219+
.Timeout(workerInitTimeout)
220+
.Take(1)
221+
.Subscribe((msg) => FunctionEnvironmentReloadResponse(msg.Message.FunctionEnvironmentReloadResponse)));
213222

214223
IDictionary processEnv = Environment.GetEnvironmentVariables();
215224

@@ -223,6 +232,7 @@ public void SendFunctionEnvironmentReloadRequest()
223232
{
224233
FunctionEnvironmentReloadRequest = request
225234
});
235+
return _reloadTask.Task;
226236
}
227237

228238
internal void SendFunctionLoadRequest(FunctionMetadata metadata)

src/WebJobs.Script/Rpc/WebHostLanguageWorkerChannelManager.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,7 @@ public async Task SpecializeAsync()
9494
if (_workerRuntime != null && languageWorkerChannel != null)
9595
{
9696
_logger.LogInformation("Loading environment variables for runtime: {runtime}", _workerRuntime);
97-
IObservable<WorkerProcessReadyEvent> processReadyEvents = _eventManager.OfType<WorkerProcessReadyEvent>()
98-
.Where(msg => string.Equals(msg.Language, _workerRuntime, StringComparison.OrdinalIgnoreCase))
99-
.Timeout(workerInitTimeout);
100-
languageWorkerChannel.SendFunctionEnvironmentReloadRequest();
101-
102-
// Wait for response from language worker process
103-
await processReadyEvents.FirstAsync();
97+
await languageWorkerChannel.SendFunctionEnvironmentReloadRequest();
10498
}
10599
_shutdownStandbyWorkerChannels();
106100
}

test/WebJobs.Script.Tests.Integration/Host/StandbyManager/StandbyManagerE2ETests_Windows.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ await TestHelpers.Await(() =>
8282
Assert.NotSame(GetCachedTimeZoneInfo(), _originalTimeZoneInfoCache);
8383
}
8484

85-
[Fact(Skip = "https://github.com/Azure/azure-functions-host/issues/4230")]
85+
[Fact]
8686
public async Task StandbyModeE2E_Java()
8787
{
8888
_settings.Add(EnvironmentSettingNames.AzureWebsiteInstanceId, Guid.NewGuid().ToString());

test/WebJobs.Script.Tests/Rpc/LanguageWorkerChannelTests.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,20 @@ public void FunctionLoadRequest_IsExpected()
150150
Assert.True(proxyFunctionLoadRequest.Metadata.IsProxy);
151151
}
152152

153+
[Fact]
154+
public void Multiple_FunctionEnvironmentReloadResponse_Throws()
155+
{
156+
_workerChannel.SendFunctionEnvironmentReloadRequest();
157+
_testFunctionRpcService.PublishFunctionEnvironmentReloadResponseEvent();
158+
159+
var traces = _logger.GetLogMessages();
160+
161+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Received FunctionEnvironmentReloadResponse")));
162+
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, "Sending FunctionEnvironmentReloadRequest")));
163+
var ex = Assert.Throws<InvalidOperationException>(() => _workerChannel.FunctionEnvironmentReloadResponse(TestFunctionRpcService.GetTestFunctionEnvReloadResponse()));
164+
Assert.Contains("FunctionEnvironmentReloadResponse received more than once", ex.Message);
165+
}
166+
153167
private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
154168
{
155169
return new List<FunctionMetadata>()

test/WebJobs.Script.Tests/Rpc/TestFunctionRpcService.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,29 @@ public void PublishFunctionLoadResponseEvent(string functionId)
4949
_eventManager.Publish(new InboundEvent(_workerId, responseMessage));
5050
}
5151

52+
public void PublishFunctionEnvironmentReloadResponseEvent()
53+
{
54+
FunctionEnvironmentReloadResponse relaodEnvResponse = GetTestFunctionEnvReloadResponse();
55+
StreamingMessage responseMessage = new StreamingMessage()
56+
{
57+
FunctionEnvironmentReloadResponse = relaodEnvResponse
58+
};
59+
_eventManager.Publish(new InboundEvent(_workerId, responseMessage));
60+
}
61+
62+
public static FunctionEnvironmentReloadResponse GetTestFunctionEnvReloadResponse()
63+
{
64+
StatusResult statusResult = new StatusResult()
65+
{
66+
Status = StatusResult.Types.Status.Success
67+
};
68+
FunctionEnvironmentReloadResponse relaodEnvResponse = new FunctionEnvironmentReloadResponse()
69+
{
70+
Result = statusResult
71+
};
72+
return relaodEnvResponse;
73+
}
74+
5275
public void PublishInvocationResponseEvent()
5376
{
5477
StatusResult statusResult = new StatusResult()

test/WebJobs.Script.Tests/Rpc/TestLanguageWorkerChannel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ public void SendFunctionLoadRequests()
5050
_testLogger.LogInformation("RegisterFunctions called");
5151
}
5252

53-
public void SendFunctionEnvironmentReloadRequest()
53+
public Task SendFunctionEnvironmentReloadRequest()
5454
{
5555
_testLogger.LogInformation("SendFunctionEnvironmentReloadRequest called");
56+
return Task.CompletedTask;
5657
}
5758

5859
public void SendInvocationRequest(ScriptInvocationContext context)

0 commit comments

Comments
 (0)