diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index c6ca54d4c..22e0e80da 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -797,7 +797,6 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, - session.TrackingStoreContext, cancellationToken); if (!string.IsNullOrEmpty(warningMessage)) { @@ -1059,7 +1058,6 @@ async Task IsExecutableInstanceAsync( OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances, - object trackingStoreContext, CancellationToken cancellationToken) { if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent)) @@ -1078,8 +1076,7 @@ async Task IsExecutableInstanceAsync( var executionTerminatedEvent = (ExecutionTerminatedEvent)executionTerminatedEventMessage.Event; await this.trackingStore.UpdateStatusForTerminationAsync( instanceId, - executionTerminatedEvent.Input, - executionTerminatedEvent.Timestamp); + executionTerminatedEvent); return $"Instance is {OrchestrationStatus.Terminated}"; } @@ -1103,11 +1100,11 @@ await this.trackingStore.UpdateStatusForTerminationAsync( if (instanceStatus == null || (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId && instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus)) { - await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + await this.trackingStore.UpdateInstanceStatusForCompletedOrchestrationAsync( runtimeState.OrchestrationInstance.InstanceId, runtimeState.OrchestrationInstance.ExecutionId, runtimeState, - trackingStoreContext, + instanceStatus is not null, cancellationToken); } if (!allowReplayingTerminalInstances) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 2f2dca83b..c6f63838a 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -798,21 +798,25 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell /// public override async Task UpdateStatusForTerminationAsync( string instanceId, - string output, - DateTime lastUpdatedTime, + ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default) { string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); - TableEntity entity = new TableEntity(sanitizedInstanceId, "") + TableEntity instanceEntity = new TableEntity(sanitizedInstanceId, "") { ["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"), - ["LastUpdatedTime"] = lastUpdatedTime, + ["LastUpdatedTime"] = executionTerminatedEvent.Timestamp, ["CompletedTime"] = DateTime.UtcNow, - [OutputProperty] = output + // In the case of terminating an orchestration, the termination reason becomes the orchestration's output. + [OutputProperty] = executionTerminatedEvent.Input, }; + // Setting addBlobPropertyName to false ensures that the blob URL is saved as the "Output" of the instance entity, which is the expected behavior + // for large orchestration outputs. + await this.CompressLargeMessageAsync(instanceEntity, listOfBlobs: null, cancellationToken: cancellationToken, addBlobPropertyName: false); + Stopwatch stopwatch = Stopwatch.StartNew(); - await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken); + await this.InstancesTable.MergeEntityAsync(instanceEntity, ETag.All, cancellationToken); this.settings.Logger.InstanceStatusUpdate( this.storageAccountName, @@ -864,6 +868,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default) ["CustomStatus"] = newRuntimeState.Status ?? "null", ["ExecutionId"] = executionId, ["LastUpdatedTime"] = newEvents.Last().Timestamp, + ["TaskHubName"] = this.settings.TaskHubName, }; // check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario @@ -910,6 +915,8 @@ public override Task StartAsync(CancellationToken cancellationToken = default) instanceEntity["Version"] = executionStartedEvent.Version; instanceEntity["CreatedTime"] = executionStartedEvent.Timestamp; instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); + instanceEntity["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags); + instanceEntity["Generation"] = executionStartedEvent.Generation; if (executionStartedEvent.ScheduledStartTime.HasValue) { instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; @@ -1048,11 +1055,11 @@ public override Task StartAsync(CancellationToken cancellationToken = default) return eTagValue; } - public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, - object trackingStoreContext, + bool instanceEntityExists, CancellationToken cancellationToken = default) { if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && @@ -1063,28 +1070,90 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete return; } - TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext; - if (context.Blobs.Count > 0) - { - var tasks = new List(context.Blobs.Count); - foreach (string blobName in context.Blobs) - { - tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); - } - await Task.WhenAll(tasks); - } - string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); + ExecutionStartedEvent executionStartedEvent = runtimeState.ExecutionStartedEvent; + + // We need to set all of the fields of the instance entity in the case that it was never created for the orchestration. + // This can be the case for a suborchestration that completed in one execution, for example. var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty) { + ["Name"] = runtimeState.Name, + ["Version"] = runtimeState.Version, + ["CreatedTime"] = executionStartedEvent.Timestamp, // TODO: Translating null to "null" is a temporary workaround. We should prioritize // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary. ["CustomStatus"] = runtimeState.Status ?? "null", ["ExecutionId"] = executionId, ["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp, ["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(), - ["CompletedTime"] = runtimeState.CompletedTime + ["CompletedTime"] = runtimeState.CompletedTime, + ["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags), + ["TaskHubName"] = this.settings.TaskHubName, }; + if (runtimeState.ExecutionStartedEvent.ScheduledStartTime.HasValue) + { + instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; + } + + static TableEntity GetSingleEntityFromHistoryTableResults(IReadOnlyList entities, string dataType) + { + try + { + TableEntity singleEntity = entities.SingleOrDefault(); + + return singleEntity ?? throw new DurableTaskStorageException($"The history table query to determine the blob storage URL " + + $"for the large orchestration {dataType} returned no rows. Unable to extract the URL from these results."); + } + catch (InvalidOperationException) + { + throw new DurableTaskStorageException($"The history table query to determine the blob storage URL for the large orchestration " + + $"{dataType} returned more than one row, when exactly one row is expected. " + + $"Unable to extract the URL from these results."); + } + } + + // Set the output. + // In the case that the output is too large and is stored in blob storage, extract the blob name from the ExecutionCompleted history entity. + if (this.ExceedsMaxTablePropertySize(runtimeState.Output)) + { + string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" + + $" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" + + $" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionCompleted)}'"; + TableEntity executionCompletedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "output"); + this.SetInstancesTablePropertyFromHistoryProperty( + executionCompletedEntity, + instanceEntity, + historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result), + instancePropertyName: OutputProperty, + data: runtimeState.Output); + } + else + { + instanceEntity[OutputProperty] = runtimeState.Output; + } + + // If the input has not been set by a previous execution, set the input. + if (!instanceEntityExists) + { + // In the case that the input is too large and is stored in blob storage, extract the blob name from the ExecutionStarted history entity. + if (this.ExceedsMaxTablePropertySize(runtimeState.Input)) + { + string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" + + $" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" + + $" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionStarted)}'"; + TableEntity executionStartedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "input"); + this.SetInstancesTablePropertyFromHistoryProperty( + executionStartedEntity, + instanceEntity, + historyPropertyName: nameof(executionStartedEvent.Input), + instancePropertyName: InputProperty, + data: executionStartedEvent.Input); + } + else + { + instanceEntity[InputProperty] = runtimeState.Input; + } + } Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); @@ -1161,7 +1230,7 @@ void SetInstancesTablePropertyFromHistoryProperty( } } - async Task CompressLargeMessageAsync(TableEntity entity, List listOfBlobs, CancellationToken cancellationToken) + async Task CompressLargeMessageAsync(TableEntity entity, List listOfBlobs, CancellationToken cancellationToken, bool addBlobPropertyName = true) { foreach (string propertyName in VariableSizeEntityProperties) { @@ -1176,9 +1245,16 @@ property is string stringProperty && // Clear out the original property value and create a new "*BlobName"-suffixed property. // The runtime will look for the new "*BlobName"-suffixed column to know if a property is stored in a blob. - string blobPropertyName = GetBlobPropertyName(propertyName); - entity.Add(blobPropertyName, blobName); - entity[propertyName] = string.Empty; + if (addBlobPropertyName) + { + string blobPropertyName = GetBlobPropertyName(propertyName); + entity.Add(blobPropertyName, blobName); + entity[propertyName] = string.Empty; + } + else + { + entity[propertyName] = this.messageManager.GetBlobUrl(blobName); + } // if necessary, keep track of all the blobs associated with this execution listOfBlobs?.Add(blobName); @@ -1226,6 +1302,12 @@ static string GetBlobName(TableEntity entity, string property) // EventType. Use a hardcoded value to record the orchestration input. eventType = "Input"; } + else if (property == "Output") + { + // This message is used to terminate an orchestration with no history, so it does not have a + // corresponding EventType. Use a hardcoded value to record the orchestration output. + eventType = "Output"; + } else if (property == "Tags") { eventType = "Tags"; diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index c0ce27831..4a1b91fae 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -105,16 +105,15 @@ interface ITrackingStore /// /// Updates the instance status of the specified orchestration instance to match that of for a completed orchestration. - /// Also deletes any orphaned blobs of . /// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to /// for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing. /// /// The ID of the orchestration. /// The execution ID of the orchestration. /// The runtime state of the orchestration. - /// Additional context for the execution that is maintained by the tracking store. + /// Whether the instance entity already exists in the instance store. /// The token to monitor for cancellation requests. The default value is . - Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default); /// /// Get The Orchestration State for querying all orchestration instances @@ -166,13 +165,12 @@ interface ITrackingStore Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default); /// - /// Used to update the instance status to "Terminated" whend a pending orchestration is terminated. + /// Used to update the instance status to "Terminated" when a pending orchestration is terminated. /// /// The instance being terminated - /// The output of the orchestration - /// The last updated time of the orchestration (the time the termination request was created) + /// The termination history event. /// The token to monitor for cancellation requests. The default value is . - Task UpdateStatusForTerminationAsync(string instanceId, string output, DateTime lastUpdatedTime, CancellationToken cancellationToken = default); + Task UpdateStatusForTerminationAsync(string instanceId, ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default); /// /// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 1e70a98ee..ca3da07bd 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -179,24 +179,23 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] public override async Task UpdateStatusForTerminationAsync( string instanceId, - string output, - DateTime lastUpdatedTime, + ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default) { // Get the most recent execution and update its status to terminated IEnumerable instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false); instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated; - instanceEntity.Single().State.LastUpdatedTime = lastUpdatedTime; + instanceEntity.Single().State.LastUpdatedTime = executionTerminatedEvent.Timestamp; instanceEntity.Single().State.CompletedTime = DateTime.UtcNow; - instanceEntity.Single().State.Output = output; + instanceEntity.Single().State.Output = executionTerminatedEvent.Input; await this.instanceStore.WriteEntitiesAsync(instanceEntity); } - public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, - object trackingStoreContext, + bool instanceEntityExists, CancellationToken cancellationToken = default) { if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && @@ -207,7 +206,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete return; } - // No blobs to delete for this tracking store implementation await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] { new OrchestrationStateInstanceEntity() diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index e6f55fc59..7879537e4 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -101,7 +101,7 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo } /// - public abstract Task UpdateStatusForTerminationAsync(string instanceId, string output, DateTime lastUpdatedTime, CancellationToken cancellationToken = default); + public abstract Task UpdateStatusForTerminationAsync(string instanceId, ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default); /// public abstract Task StartAsync(CancellationToken cancellationToken = default); @@ -110,6 +110,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default); /// - public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + public abstract Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default); } } diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index c7fc9944b..502e5fab6 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -884,7 +884,7 @@ public async Task TerminateOrchestration(bool enableExtendedSessions) var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0); // Need to wait for the instance to start before we can terminate it. - // TODO: This requirement may not be ideal and should be revisited. + // TerminatePendingOrchestration tests terminating a pending orchestration. await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); await client.TerminateAsync("sayōnara"); @@ -966,12 +966,15 @@ public async Task TerminateSuspendedOrchestration(bool enableExtendedSessions) } /// - /// Test that a pending orchestration can be terminated. + /// Test that a pending orchestration can be terminated (including tests with a large termination reason that will need to be + /// stored in blob storage). /// [DataTestMethod] - [DataRow(true)] - [DataRow(false)] - public async Task TerminatePendingOrchestration(bool enableExtendedSessions) + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TerminatePendingOrchestration(bool enableExtendedSessions, bool largeTerminationReason) { using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) { @@ -980,20 +983,27 @@ public async Task TerminatePendingOrchestration(bool enableExtendedSessions) var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0, startAt: DateTime.UtcNow.AddMinutes(1)); await client.WaitForStatusChange(TimeSpan.FromSeconds(5), OrchestrationStatus.Pending); - await client.TerminateAsync("terminate"); + string message = largeTerminationReason ? this.GenerateMediumRandomStringPayload().ToString() : "terminate"; + await client.TerminateAsync(message); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + if (largeTerminationReason) + { + int blobCount = await this.GetBlobCount("test-largemessages", client.InstanceId); + Assert.IsTrue(blobCount > 0); + } + // Confirm the pending orchestration was terminated. Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); - Assert.AreEqual("terminate", status?.Output); + Assert.AreEqual(message, status?.Output); // Now sleep for a minute and confirm that the orchestration does not start after its scheduled time. Thread.Sleep(TimeSpan.FromMinutes(1)); status = await client.GetStatusAsync(); Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); - Assert.AreEqual("terminate", status?.Output); + Assert.AreEqual(message, status?.Output); await host.StopAsync(); } @@ -2495,16 +2505,18 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession [DataRow(false, true)] [DataRow(true, false)] [DataRow(false, false)] - public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtendedSessions, bool terminate) + public async Task TestWorkerFailingDuringCompleteWorkItemCallCompletedOrchestration(bool enableExtendedSessions, bool terminate) { using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) { await host.StartAsync(); // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator - var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!"); + string input = "hello!"; + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), input, tags: new Dictionary { { "key", "value" } }); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". // This simulates the scenario where the History table was updated, but not the Instance table. @@ -2516,7 +2528,195 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtende Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); TableEntity entity = new TableEntity(instanceId, "") { - ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G") + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(input, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(input, JToken.Parse(status.Input).ToString()); + + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(input, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(input, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Echo))); + Assert.IsTrue(status.Tags.Contains(new KeyValuePair("key", "value"))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a failed orchestration. + /// + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallFailedOrchestration(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string failureReason = "Failure!"; + var client = await host.StartOrchestrationAsync( + typeof(Orchestrations.ThrowException), + input: failureReason); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(failureReason, status.Output); + Assert.AreEqual(failureReason, JToken.Parse(status.Input).ToString()); + + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(failureReason, status.Output); + Assert.AreEqual(failureReason, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.ThrowException))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a terminated orchestration. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallTerminatedOrchestration(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + // Using the counter orchestration because it will wait indefinitely for input. + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0); + await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); + // Terminate the orchestration + string reason = "terminate"; + await client.TerminateAsync(reason); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2542,8 +2742,318 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtende Assert.AreEqual(1, state.Count); status = state.First(); - OrchestrationStatus expectedStatus = OrchestrationStatus.Completed; - Assert.AreEqual(expectedStatus, status.OrchestrationStatus); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(reason, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(reason, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Counter))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + + await host.StopAsync(); + } + } + + /// + /// Same as but for an orchestration with large input + /// and output, which will need to be stored in blob storage. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeInputOutput(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string message = this.GenerateMediumRandomStringPayload().ToString(); + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), message); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; + + var instanceId = client.InstanceId; + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(message, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(message, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Echo))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a large termination reason that + /// will need to be stored in blob storage. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeTerminationReason(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string message = this.GenerateMediumRandomStringPayload().ToString(); + // Using the counter orchestration because it will wait indefinitely for input. + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0); + await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); + // Terminate the orchestration + await client.TerminateAsync(message); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; + + var instanceId = client.InstanceId; + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Counter))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a large exception message that will need + /// to be stored in blob storage. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeFailureReason(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string message = this.GenerateMediumRandomStringPayload().ToString(); + var client = await host.StartOrchestrationAsync( + typeof(Orchestrations.ThrowException), + input: message); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; + + var instanceId = client.InstanceId; + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.ThrowException))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); await host.StopAsync(); }