diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index b25177ae9..b74403df8 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -648,30 +648,46 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter responseStream, ServerCallContext context) { - if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List? pastEvents)) - { - const int MaxChunkBytes = 256 * 1024; // 256KB per chunk to simulate chunked streaming - int currentSize = 0; - P.HistoryChunk chunk = new(); + List? pastEvents = null; - foreach (P.HistoryEvent e in pastEvents) - { - int eventSize = e.CalculateSize(); - if (currentSize > 0 && currentSize + eventSize > MaxChunkBytes) - { - await responseStream.WriteAsync(chunk); - chunk = new P.HistoryChunk(); - currentSize = 0; - } + // First, try to get events from the streaming cache (for in-flight orchestrations) + if (this.streamingPastEvents.TryGetValue(request.InstanceId, out pastEvents)) + { + // Use the cached streaming events + } + else if (this.service is InMemoryOrchestrationService inMemoryService && + inMemoryService.TryGetOrchestrationHistory(request.InstanceId, out List? historyEvents)) + { + // Get history from the instance store (for completed orchestrations) + pastEvents = historyEvents.Select(ProtobufUtils.ToHistoryEventProto).ToList(); + } + else + { + // Instance not found + throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found.")); + } - chunk.Events.Add(e); - currentSize += eventSize; - } + const int MaxChunkBytes = 256 * 1024; // 256KB per chunk to simulate chunked streaming + int currentSize = 0; + P.HistoryChunk chunk = new(); - if (chunk.Events.Count > 0) + foreach (P.HistoryEvent e in pastEvents) + { + int eventSize = e.CalculateSize(); + if (currentSize > 0 && currentSize + eventSize > MaxChunkBytes) { await responseStream.WriteAsync(chunk); + chunk = new P.HistoryChunk(); + currentSize = 0; } + + chunk.Events.Add(e); + currentSize += eventSize; + } + + if (chunk.Events.Count > 0) + { + await responseStream.WriteAsync(chunk); } } diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs index eb8fd0424..1ad7a061f 100644 --- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs +++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs @@ -260,6 +260,17 @@ public Task GetOrchestrationHistoryAsync(string instanceId, string execu throw new NotImplementedException(); } + /// + /// Tries to get the orchestration history events for the specified instance. + /// + /// The instance ID. + /// The history events if found. + /// True if history events were found, false otherwise. + public bool TryGetOrchestrationHistory(string instanceId, [NotNullWhen(true)] out List? historyEvents) + { + return this.instanceStore.TryGetHistory(instanceId, out historyEvents); + } + /// /// Gets the orchestration state. /// @@ -587,6 +598,26 @@ public bool TryGetState(string instanceId, [NotNullWhen(true)] out Orchestration return statusRecord != null; } + public bool TryGetHistory(string instanceId, [NotNullWhen(true)] out List? historyEvents) + { + if (!this.store.TryGetValue(instanceId, out SerializedInstanceState? state)) + { + historyEvents = null; + return false; + } + + lock (state) + { + historyEvents = state.HistoryEventsJson + .Where(e => e is not null) + .Select(e => e!.GetValue()) + .Where(e => e is not null) + .ToList()!; + } + + return true; + } + public void SaveState( OrchestrationRuntimeState runtimeState, OrchestrationState statusRecord, diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 2d5df3363..51879cfb0 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -2,11 +2,15 @@ // Licensed under the MIT License. using System.Diagnostics.CodeAnalysis; +using DurableTask.Core; +using DurableTask.Core.History; using FluentAssertions; using FluentAssertions.Execution; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Worker; using Xunit.Abstractions; +using CoreOrchestrationStatus = DurableTask.Core.OrchestrationStatus; +using PurgeResult = Microsoft.DurableTask.Client.PurgeResult; namespace Microsoft.DurableTask.Grpc.Tests; @@ -288,6 +292,65 @@ await restartAction.Should().ThrowAsync() .WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*"); } + [Fact] + public async Task GetOrchestrationHistoryAsync_CompletedOrchestration_ReturnsHistoryEvents() + { + // Arrange + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, input: false); + + await server.Client.WaitForInstanceStartAsync(instanceId, default); + await server.Client.RaiseEventAsync(instanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(instanceId, cts.Token); + + // Verify orchestration completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Act + IList history = + await server.Client.GetOrchestrationHistoryAsync(instanceId, cts.Token); + + // Assert + history.Should().NotBeNull(); + history.Should().NotBeEmpty(); + + // Verify history contains expected event types for a completed orchestration + history.Should().Contain(e => e is ExecutionStartedEvent); + history.Should().Contain(e => e is ExecutionCompletedEvent); + history.Should().Contain(e => e is EventRaisedEvent); + + // Verify ExecutionStartedEvent has correct orchestration name + ExecutionStartedEvent? startedEvent = history.OfType().FirstOrDefault(); + startedEvent.Should().NotBeNull(); + startedEvent!.Name.Should().Be(OrchestrationName); + + // Verify ExecutionCompletedEvent indicates completion + ExecutionCompletedEvent? completedEvent = history.OfType().FirstOrDefault(); + completedEvent.Should().NotBeNull(); + completedEvent!.OrchestrationStatus.Should().Be(CoreOrchestrationStatus.Completed); + } + + [Fact] + public async Task GetOrchestrationHistoryAsync_NonExistentOrchestration_ThrowsArgumentException() + { + // Arrange + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + await using HostTestLifetime server = await this.StartAsync(); + + // Act + Func getHistoryAction = () => + server.Client.GetOrchestrationHistoryAsync("non-existent-instance-id", cts.Token); + + // Assert + await getHistoryAction.Should().ThrowAsync() + .WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*"); + } + Task StartAsync() { static async Task Orchestration(TaskOrchestrationContext context, bool shouldThrow)