Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -648,30 +648,46 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt

public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter<P.HistoryChunk> responseStream, ServerCallContext context)
{
if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List<P.HistoryEvent>? pastEvents))
{
const int MaxChunkBytes = 256 * 1024; // 256KB per chunk to simulate chunked streaming
int currentSize = 0;
P.HistoryChunk chunk = new();
List<P.HistoryEvent>? pastEvents = null;

foreach (P.HistoryEvent e in pastEvents)
{
int eventSize = e.CalculateSize();
if (currentSize > 0 && currentSize + eventSize > MaxChunkBytes)
{
await responseStream.WriteAsync(chunk);
chunk = new P.HistoryChunk();
currentSize = 0;
}
// First, try to get events from the streaming cache (for in-flight orchestrations)
if (this.streamingPastEvents.TryGetValue(request.InstanceId, out pastEvents))
{
// Use the cached streaming events
}
else if (this.service is InMemoryOrchestrationService inMemoryService &&
inMemoryService.TryGetOrchestrationHistory(request.InstanceId, out List<HistoryEvent>? historyEvents))
{
// Get history from the instance store (for completed orchestrations)
pastEvents = historyEvents.Select(ProtobufUtils.ToHistoryEventProto).ToList();
}
else
{
// Instance not found
throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found."));
}

chunk.Events.Add(e);
currentSize += eventSize;
}
const int MaxChunkBytes = 256 * 1024; // 256KB per chunk to simulate chunked streaming
int currentSize = 0;
P.HistoryChunk chunk = new();

if (chunk.Events.Count > 0)
foreach (P.HistoryEvent e in pastEvents)
{
int eventSize = e.CalculateSize();
if (currentSize > 0 && currentSize + eventSize > MaxChunkBytes)
{
await responseStream.WriteAsync(chunk);
chunk = new P.HistoryChunk();
currentSize = 0;
}

chunk.Events.Add(e);
currentSize += eventSize;
}

if (chunk.Events.Count > 0)
{
await responseStream.WriteAsync(chunk);
}
}

Expand Down
31 changes: 31 additions & 0 deletions src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ public Task<string> GetOrchestrationHistoryAsync(string instanceId, string execu
throw new NotImplementedException();
}

/// <summary>
/// Tries to get the orchestration history events for the specified instance.
/// </summary>
/// <param name="instanceId">The instance ID.</param>
/// <param name="historyEvents">The history events if found.</param>
/// <returns>True if history events were found, false otherwise.</returns>
public bool TryGetOrchestrationHistory(string instanceId, [NotNullWhen(true)] out List<HistoryEvent>? historyEvents)
{
return this.instanceStore.TryGetHistory(instanceId, out historyEvents);
}

/// <summary>
/// Gets the orchestration state.
/// </summary>
Expand Down Expand Up @@ -587,6 +598,26 @@ public bool TryGetState(string instanceId, [NotNullWhen(true)] out Orchestration
return statusRecord != null;
}

public bool TryGetHistory(string instanceId, [NotNullWhen(true)] out List<HistoryEvent>? historyEvents)
{
if (!this.store.TryGetValue(instanceId, out SerializedInstanceState? state))
{
historyEvents = null;
return false;
}

lock (state)
{
historyEvents = state.HistoryEventsJson
.Where(e => e is not null)
.Select(e => e!.GetValue<HistoryEvent>())
.Where(e => e is not null)
.ToList()!;
}

return true;
}

public void SaveState(
OrchestrationRuntimeState runtimeState,
OrchestrationState statusRecord,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
// Licensed under the MIT License.

using System.Diagnostics.CodeAnalysis;
using DurableTask.Core;
using DurableTask.Core.History;
using FluentAssertions;
using FluentAssertions.Execution;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Worker;
using Xunit.Abstractions;
using CoreOrchestrationStatus = DurableTask.Core.OrchestrationStatus;
using PurgeResult = Microsoft.DurableTask.Client.PurgeResult;

namespace Microsoft.DurableTask.Grpc.Tests;

Expand Down Expand Up @@ -288,6 +292,65 @@ await restartAction.Should().ThrowAsync<ArgumentException>()
.WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*");
}

[Fact]
public async Task GetOrchestrationHistoryAsync_CompletedOrchestration_ReturnsHistoryEvents()
{
// Arrange
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
await using HostTestLifetime server = await this.StartAsync();

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
OrchestrationName, input: false);

await server.Client.WaitForInstanceStartAsync(instanceId, default);
await server.Client.RaiseEventAsync(instanceId, "event", default);
await server.Client.WaitForInstanceCompletionAsync(instanceId, cts.Token);

// Verify orchestration completed
OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false);
metadata.Should().NotBeNull();
metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);

// Act
IList<HistoryEvent> history =
await server.Client.GetOrchestrationHistoryAsync(instanceId, cts.Token);

// Assert
history.Should().NotBeNull();
history.Should().NotBeEmpty();

// Verify history contains expected event types for a completed orchestration
history.Should().Contain(e => e is ExecutionStartedEvent);
history.Should().Contain(e => e is ExecutionCompletedEvent);
history.Should().Contain(e => e is EventRaisedEvent);

// Verify ExecutionStartedEvent has correct orchestration name
ExecutionStartedEvent? startedEvent = history.OfType<ExecutionStartedEvent>().FirstOrDefault();
startedEvent.Should().NotBeNull();
startedEvent!.Name.Should().Be(OrchestrationName);

// Verify ExecutionCompletedEvent indicates completion
ExecutionCompletedEvent? completedEvent = history.OfType<ExecutionCompletedEvent>().FirstOrDefault();
completedEvent.Should().NotBeNull();
completedEvent!.OrchestrationStatus.Should().Be(CoreOrchestrationStatus.Completed);
}

[Fact]
public async Task GetOrchestrationHistoryAsync_NonExistentOrchestration_ThrowsArgumentException()
{
// Arrange
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
await using HostTestLifetime server = await this.StartAsync();

// Act
Func<Task> getHistoryAction = () =>
server.Client.GetOrchestrationHistoryAsync("non-existent-instance-id", cts.Token);

// Assert
await getHistoryAction.Should().ThrowAsync<ArgumentException>()
.WithMessage("*An orchestration with the instanceId non-existent-instance-id was not found*");
}

Task<HostTestLifetime> StartAsync()
{
static async Task<string> Orchestration(TaskOrchestrationContext context, bool shouldThrow)
Expand Down