Skip to content

Commit f2a4141

Browse files
authored
bugfix: ScheduledStartTime + start orch. actions (#3217)
* bugfix: ScheduledStartTime + start orch. actions
1 parent c056c6d commit f2a4141

File tree

6 files changed

+233
-57
lines changed

6 files changed

+233
-57
lines changed

src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ internal static P.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeResul
573573
InstanceId = operationAction.StartNewOrchestration.InstanceId,
574574
Version = operationAction.StartNewOrchestration.Version,
575575
RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(),
576+
ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
576577
ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ?
577578
new DistributedTraceContext(
578579
operationAction.StartNewOrchestration.ParentTraceContext.TraceParent,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.Functions.Worker;
5+
using Microsoft.DurableTask;
6+
using Microsoft.DurableTask.Entities;
7+
8+
using Microsoft.Azure.Functions.Worker.Http;
9+
using Microsoft.DurableTask.Client;
10+
using Microsoft.Extensions.Logging;
11+
12+
namespace Microsoft.Azure.Durable.Tests.E2E;
13+
14+
public class EntityCreatesScheduledOrchestration
15+
{
16+
[Function("EntityCreatesScheduledOrchestrationOrchestrator_HttpStart")]
17+
public static async Task<HttpResponseData> HttpStartScheduled(
18+
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
19+
[DurableClient] DurableTaskClient client,
20+
FunctionContext executionContext,
21+
int scheduledStartDelaySeconds)
22+
{
23+
ILogger logger = executionContext.GetLogger("EntityCreatesScheduledOrchestrationOrchestrator_HttpStart");
24+
25+
// Function input comes from the request content.
26+
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
27+
nameof(EntityCreatesScheduledOrchestrationOrchestrator), input: scheduledStartDelaySeconds);
28+
29+
logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
30+
31+
// Returns an HTTP 202 response with an instance management payload.
32+
// See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
33+
return await client.CreateCheckStatusResponseAsync(req, instanceId);
34+
}
35+
36+
[Function(nameof(EntityCreatesScheduledOrchestrationOrchestrator))]
37+
public static async Task<string> EntityCreatesScheduledOrchestrationOrchestrator(
38+
[OrchestrationTrigger] TaskOrchestrationContext context)
39+
{
40+
var entityId = new EntityInstanceId(nameof(SubOrchestratorTriggerEntity), "singleton");
41+
var scheduledOrchestrationInstanceId = await context.Entities.CallEntityAsync<string>(entityId, nameof(SubOrchestratorTriggerEntity.Call), context.GetInput<int>());
42+
return scheduledOrchestrationInstanceId;
43+
}
44+
45+
[Function(nameof(ScheduledOrchestrationSubOrchestrator))]
46+
public static string ScheduledOrchestrationSubOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context, string? _)
47+
{
48+
return "Success";
49+
}
50+
}
51+
52+
53+
public class SubOrchestratorTriggerEntity: TaskEntity<string>
54+
{
55+
public string Call(int delaySeconds)
56+
{
57+
var options = new StartOrchestrationOptions(null, DateTime.UtcNow.AddSeconds(delaySeconds));
58+
var instanceId = this.Context.ScheduleNewOrchestration(nameof(EntityCreatesScheduledOrchestration.ScheduledOrchestrationSubOrchestrator), null, options);
59+
return instanceId;
60+
}
61+
62+
protected override string InitializeState(TaskEntityOperation entityOperation)
63+
{
64+
return string.Empty;
65+
}
66+
67+
[Function(nameof(SubOrchestratorTriggerEntity))]
68+
public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
69+
{
70+
return dispatcher.DispatchAsync(this);
71+
}
72+
}

test/e2e/Tests/Fixtures/FunctionAppFixture.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ public class FunctionAppFixture : IAsyncLifetime
1515
internal FunctionAppProcess functionAppProcess;
1616
internal ITestLanguageLocalizer functionLanguageLocalizer;
1717

18+
internal enum ConfiguredDurabilityProviderType
19+
{
20+
AzureStorage,
21+
MSSQL,
22+
AzureManaged
23+
}
24+
1825
public FunctionAppFixture(IMessageSink messageSink)
1926
{
2027
ILoggerFactory loggerFactory = new LoggerFactory();
@@ -50,6 +57,23 @@ public FunctionAppFixture(IMessageSink messageSink)
5057
this.functionAppProcess = new FunctionAppProcess(this.logger, this.TestLogs, this.functionLanguageLocalizer.GetLanguageType());
5158
}
5259

60+
internal ConfiguredDurabilityProviderType GetDurabilityProvider()
61+
{
62+
string? e2eTestDurableBackendEnvVarValue = Environment.GetEnvironmentVariable("E2E_TEST_DURABLE_BACKEND");
63+
switch (e2eTestDurableBackendEnvVarValue)
64+
{
65+
case "mssql":
66+
return ConfiguredDurabilityProviderType.MSSQL;
67+
case "azuremanaged":
68+
return ConfiguredDurabilityProviderType.AzureManaged;
69+
case "azurestorage":
70+
return ConfiguredDurabilityProviderType.AzureStorage;
71+
default:
72+
this.logger.LogWarning("Environment variable E2E_TEST_DURABLE_BACKEND not set, test code will assume Azure Storage backend");
73+
return ConfiguredDurabilityProviderType.AzureStorage;
74+
}
75+
}
76+
5377
public Task InitializeAsync()
5478
{
5579
return this.functionAppProcess.InitializeAsync();

test/e2e/Tests/Helpers/DurableHelpers.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,24 @@ internal class DurableHelpers
1818

1919
internal class OrchestrationStatusDetails
2020
{
21+
public string InstanceId { get; set; } = string.Empty;
2122
public string RuntimeStatus { get; set; } = string.Empty;
2223
public string Input { get; set; } = string.Empty;
2324
public string Output { get; set; } = string.Empty;
2425
public DateTime CreatedTime { get; set; }
2526
public DateTime LastUpdatedTime { get; set; }
2627
public OrchestrationStatusDetails(string statusQueryResponse)
2728
{
29+
if (string.IsNullOrEmpty(statusQueryResponse))
30+
{
31+
return;
32+
}
2833
JsonNode? statusQueryJsonNode = JsonNode.Parse(statusQueryResponse);
2934
if (statusQueryJsonNode == null)
3035
{
3136
return;
3237
}
38+
this.InstanceId = statusQueryJsonNode["instanceId"]?.GetValue<string>() ?? string.Empty;
3339
this.RuntimeStatus = statusQueryJsonNode["runtimeStatus"]?.GetValue<string>() ?? string.Empty;
3440
this.Input = statusQueryJsonNode["input"]?.ToString() ?? string.Empty;
3541
this.Output = statusQueryJsonNode["output"]?.ToString() ?? string.Empty;

test/e2e/Tests/Tests/HelloCitiesTest.cs

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,6 @@ public HttpEndToEndTests(FunctionAppFixture fixture, ITestOutputHelper testOutpu
2020
this.output = testOutputHelper;
2121
}
2222

23-
// Due to some kind of asynchronous race condition in XUnit, when running these tests in pipelines,
24-
// the output may be disposed before the message is written. Just ignore these types of errors for now.
25-
private void WriteOutput(string message)
26-
{
27-
try
28-
{
29-
this.output.WriteLine(message);
30-
}
31-
catch
32-
{
33-
// Ignore
34-
}
35-
}
36-
3723
[Theory]
3824
[InlineData("HelloCities", HttpStatusCode.Accepted, "Hello Tokyo!")]
3925
public async Task HttpTriggerTests(string orchestrationName, HttpStatusCode expectedStatusCode, string partialExpectedOutput)
@@ -48,47 +34,4 @@ public async Task HttpTriggerTests(string orchestrationName, HttpStatusCode expe
4834
var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
4935
Assert.Contains(partialExpectedOutput, orchestrationDetails.Output);
5036
}
51-
52-
[Theory]
53-
[InlineData("HelloCities_HttpStart_Scheduled", 5, HttpStatusCode.Accepted)]
54-
[InlineData("HelloCities_HttpStart_Scheduled", -5, HttpStatusCode.Accepted)]
55-
[Trait("PowerShell", "Skip")] // Scheduled orchestrations not implemented in PowerShell
56-
public async Task ScheduledStartTests(string functionName, int startDelaySeconds, HttpStatusCode expectedStatusCode)
57-
{
58-
var testStartTime = DateTime.UtcNow;
59-
var scheduledStartTime = testStartTime + TimeSpan.FromSeconds(startDelaySeconds);
60-
string urlQueryString = $"?ScheduledStartTime={scheduledStartTime.ToString("o")}";
61-
62-
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, urlQueryString);
63-
64-
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);
65-
66-
Assert.Equal(expectedStatusCode, response.StatusCode);
67-
68-
if (scheduledStartTime > DateTime.UtcNow + TimeSpan.FromSeconds(1))
69-
{
70-
if (this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.DotnetIsolated ||
71-
this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.Java)
72-
{
73-
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Pending", 30);
74-
}
75-
else
76-
{
77-
// Scheduled orchestrations are not properly implemented in the other languages - however,
78-
// this test has been implemented using timers in the orchestration instead.
79-
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Running", 30);
80-
}
81-
}
82-
83-
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", Math.Max(startDelaySeconds, 0) + 30);
84-
85-
// This +2s should not be necessary - however, experimentally the orchestration may run up to ~1 second before the scheduled time.
86-
// It is unclear currently whether this is a bug where orchestrations run early, or a clock difference/error,
87-
// but leaving this logic in for now until further investigation.
88-
Assert.True(DateTime.UtcNow + TimeSpan.FromSeconds(2) >= scheduledStartTime);
89-
90-
var finalOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
91-
WriteOutput($"Last updated at {finalOrchestrationDetails.LastUpdatedTime}, scheduled to complete at {scheduledStartTime}");
92-
Assert.True(finalOrchestrationDetails.LastUpdatedTime + TimeSpan.FromSeconds(2) >= scheduledStartTime);
93-
}
9437
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System.Net;
5+
using Xunit;
6+
using Xunit.Abstractions;
7+
8+
namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E;
9+
10+
[Collection(Constants.FunctionAppCollectionName)]
11+
public class ScheduledOrchestrationTests
12+
{
13+
private readonly FunctionAppFixture fixture;
14+
private readonly ITestOutputHelper output;
15+
16+
public ScheduledOrchestrationTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper)
17+
{
18+
this.fixture = fixture;
19+
this.fixture.TestLogs.UseTestLogger(testOutputHelper);
20+
this.output = testOutputHelper;
21+
}
22+
23+
// Due to some kind of asynchronous race condition in XUnit, when running these tests in pipelines,
24+
// the output may be disposed before the message is written. Just ignore these types of errors for now.
25+
private void WriteOutput(string message)
26+
{
27+
try
28+
{
29+
this.output.WriteLine(message);
30+
}
31+
catch
32+
{
33+
// Ignore
34+
}
35+
}
36+
37+
[Theory]
38+
[InlineData("HelloCities_HttpStart_Scheduled", 10, HttpStatusCode.Accepted)]
39+
[InlineData("HelloCities_HttpStart_Scheduled", -5, HttpStatusCode.Accepted)]
40+
[Trait("PowerShell", "Skip")] // Scheduled orchestrations not implemented in PowerShell
41+
public async Task ScheduledStartTests(string functionName, int startDelaySeconds, HttpStatusCode expectedStatusCode)
42+
{
43+
var testStartTime = DateTime.UtcNow;
44+
var scheduledStartTime = testStartTime + TimeSpan.FromSeconds(startDelaySeconds);
45+
string urlQueryString = $"?ScheduledStartTime={scheduledStartTime.ToString("o")}";
46+
47+
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, urlQueryString);
48+
49+
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);
50+
51+
Assert.Equal(expectedStatusCode, response.StatusCode);
52+
53+
if (scheduledStartTime > DateTime.UtcNow + TimeSpan.FromSeconds(1))
54+
{
55+
if (this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.DotnetIsolated ||
56+
this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.Java)
57+
{
58+
// This line will throw if the orchestration goes to a terminal state before reaching "Pending",
59+
// ensuring that any scheduled orchestrations that run immediately fail the test.
60+
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Pending", 30);
61+
}
62+
else
63+
{
64+
// Scheduled orchestrations are not properly implemented in the other languages - however,
65+
// this test has been implemented using timers in the orchestration instead.
66+
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Running", 30);
67+
}
68+
}
69+
70+
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", Math.Max(startDelaySeconds, 0) + 30);
71+
72+
// This +2s should not be necessary - however, experimentally the orchestration may run up to ~1 second before the scheduled time.
73+
// It is unclear currently whether this is a bug where orchestrations run early, or a clock difference/error,
74+
// but leaving this logic in for now until further investigation.
75+
Assert.True(DateTime.UtcNow + TimeSpan.FromSeconds(2) >= scheduledStartTime);
76+
77+
var finalOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
78+
WriteOutput($"Last updated at {finalOrchestrationDetails.LastUpdatedTime}, scheduled to complete at {scheduledStartTime}");
79+
Assert.True(finalOrchestrationDetails.LastUpdatedTime + TimeSpan.FromSeconds(2) >= scheduledStartTime);
80+
}
81+
82+
[Theory]
83+
[InlineData("EntityCreatesScheduledOrchestrationOrchestrator_HttpStart", 10, HttpStatusCode.Accepted)]
84+
[InlineData("EntityCreatesScheduledOrchestrationOrchestrator_HttpStart", -5, HttpStatusCode.Accepted)]
85+
[Trait("PowerShell", "Skip")] // Durable Entities not yet implemented in PowerShell
86+
[Trait("Java", "Skip")] // Durable Entities not yet implemented in Java
87+
[Trait("Python", "Skip")] // Durable Entities do not support the "schedule new orchestration" action in Python
88+
[Trait("Node", "Skip")] // Durable Entities do not support the "schedule new orchestration" action in Node
89+
[Trait("MSSQL", "Skip")] // Durable Entities are not supported in MSSQL for out-of-proc (see https://github.com/microsoft/durabletask-mssql/issues/205)
90+
public async Task ScheduledStartFromEntitiesTests(string functionName, int startDelaySeconds, HttpStatusCode expectedStatusCode)
91+
{
92+
var testStartTime = DateTime.UtcNow;
93+
var scheduledStartTime = testStartTime + TimeSpan.FromSeconds(startDelaySeconds);
94+
string urlQueryString = $"?scheduledStartDelaySeconds={startDelaySeconds}";
95+
96+
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, urlQueryString);
97+
98+
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);
99+
100+
Assert.Equal(expectedStatusCode, response.StatusCode);
101+
await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", Math.Max(startDelaySeconds, 0) + 30);
102+
var schedulerOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
103+
string subOrchestratorInstanceId = schedulerOrchestrationDetails.Output;
104+
105+
string subOrchestratorStatusQueryGetUri = statusQueryGetUri.ToLower().Replace(schedulerOrchestrationDetails.InstanceId.ToLower(), subOrchestratorInstanceId);
106+
107+
// Azure Storage backend has a quirk where creating an orchestration from an entity creates the OrchestrationStarted event in the History table
108+
// but doesn't initialize the orchestration state in the Instances table until the orchestration starts running. Since the implementation for
109+
// GetOrchestrationStateAsync in AzureStorage only queries the Instances table, this will 404 until the orchestration state becomes "running",
110+
// so we can't check for "Pending".
111+
// The checks below should still suffice to prove that the orchestration did not run until the scheduled time.
112+
if (scheduledStartTime > DateTime.UtcNow + TimeSpan.FromSeconds(1) && this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.AzureStorage)
113+
{
114+
// This line will throw if the orchestration goes to a terminal state before reaching "Pending",
115+
// ensuring that any scheduled orchestrations that run immediately fail the test.
116+
await DurableHelpers.WaitForOrchestrationStateAsync(subOrchestratorStatusQueryGetUri, "Pending", 30);
117+
}
118+
119+
await DurableHelpers.WaitForOrchestrationStateAsync(subOrchestratorStatusQueryGetUri, "Completed", Math.Max(startDelaySeconds, 0) + 30);
120+
121+
// This +2s should not be necessary - however, experimentally the orchestration may run up to ~1 second before the scheduled time.
122+
// It is unclear currently whether this is a bug where orchestrations run early, or a clock difference/error,
123+
// but leaving this logic in for now until further investigation.
124+
Assert.True(DateTime.UtcNow + TimeSpan.FromSeconds(2) >= scheduledStartTime);
125+
126+
var subOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(subOrchestratorStatusQueryGetUri);
127+
Assert.True(subOrchestrationDetails.LastUpdatedTime + TimeSpan.FromSeconds(2) >= scheduledStartTime);
128+
Assert.Equal("Success", subOrchestrationDetails.Output);
129+
}
130+
}

0 commit comments

Comments
 (0)