Skip to content

Commit a5b5bd1

Browse files
committed
history event streaming test added
1 parent 9d40f6e commit a5b5bd1

File tree

2 files changed

+121
-8
lines changed

2 files changed

+121
-8
lines changed

test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa
3131
readonly TaskHubDispatcherHost dispatcherHost;
3232
readonly IsConnectedSignal isConnectedSignal = new();
3333
readonly SemaphoreSlim sendWorkItemLock = new(initialCount: 1);
34+
readonly ConcurrentDictionary<string, List<P.HistoryEvent>> streamingPastEvents = new(StringComparer.OrdinalIgnoreCase);
35+
36+
volatile bool supportsHistoryStreaming;
3437

3538
// Initialized when a client connects to this service to receive work-item commands.
3639
IServerStreamWriter<P.WorkItem>? workerToClientStream;
@@ -479,6 +482,8 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state,
479482

480483
public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerStreamWriter<P.WorkItem> responseStream, ServerCallContext context)
481484
{
485+
// Record whether the client supports history streaming
486+
this.supportsHistoryStreaming = request.Capabilities.Contains(P.WorkerCapability.HistoryStreaming);
482487
// Use a lock to mitigate the race condition where we signal the dispatch host to start but haven't
483488
// yet saved a reference to the client response stream.
484489
lock (this.isConnectedSignal)
@@ -521,6 +526,35 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt
521526
}
522527
}
523528

529+
public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter<P.HistoryChunk> responseStream, ServerCallContext context)
530+
{
531+
if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List<P.HistoryEvent>? pastEvents))
532+
{
533+
const int MaxChunkBytes = 256 * 1024; // 256KB per chunk to simulate chunked streaming
534+
int currentSize = 0;
535+
P.HistoryChunk chunk = new();
536+
537+
foreach (P.HistoryEvent e in pastEvents)
538+
{
539+
int eventSize = e.CalculateSize();
540+
if (currentSize > 0 && currentSize + eventSize > MaxChunkBytes)
541+
{
542+
await responseStream.WriteAsync(chunk);
543+
chunk = new P.HistoryChunk();
544+
currentSize = 0;
545+
}
546+
547+
chunk.Events.Add(e);
548+
currentSize += eventSize;
549+
}
550+
551+
if (chunk.Events.Count > 0)
552+
{
553+
await responseStream.WriteAsync(chunk);
554+
}
555+
}
556+
}
557+
524558
/// <summary>
525559
/// Invoked by the <see cref="TaskHubDispatcherHost"/> when a work item is available, proxies the call to execute an orchestrator over a gRPC channel.
526560
/// </summary>
@@ -547,16 +581,37 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator(
547581

548582
try
549583
{
584+
var orkRequest = new P.OrchestratorRequest
585+
{
586+
InstanceId = instance.InstanceId,
587+
ExecutionId = instance.ExecutionId,
588+
NewEvents = { newEvents.Select(ProtobufUtils.ToHistoryEventProto) },
589+
OrchestrationTraceContext = orchestrationTraceContext,
590+
};
591+
592+
// Decide whether to stream based on total size of past events (> 1MiB)
593+
List<P.HistoryEvent> protoPastEvents = pastEvents.Select(ProtobufUtils.ToHistoryEventProto).ToList();
594+
int totalBytes = 0;
595+
foreach (P.HistoryEvent ev in protoPastEvents)
596+
{
597+
totalBytes += ev.CalculateSize();
598+
}
599+
600+
if (this.supportsHistoryStreaming && totalBytes > (1024))
601+
{
602+
orkRequest.RequiresHistoryStreaming = true;
603+
// Store past events to serve via StreamInstanceHistory
604+
this.streamingPastEvents[instance.InstanceId] = protoPastEvents;
605+
}
606+
else
607+
{
608+
// Embed full history in the work item
609+
orkRequest.PastEvents.AddRange(protoPastEvents);
610+
}
611+
550612
await this.SendWorkItemToClientAsync(new P.WorkItem
551613
{
552-
OrchestratorRequest = new P.OrchestratorRequest
553-
{
554-
InstanceId = instance.InstanceId,
555-
ExecutionId = instance.ExecutionId,
556-
NewEvents = { newEvents.Select(ProtobufUtils.ToHistoryEventProto) },
557-
OrchestrationTraceContext = orchestrationTraceContext,
558-
PastEvents = { pastEvents.Select(ProtobufUtils.ToHistoryEventProto) },
559-
}
614+
OrchestratorRequest = orkRequest,
560615
});
561616
}
562617
catch

test/Grpc.IntegrationTests/LargePayloadTests.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,64 @@ public async Task LargeOrchestrationInputAndOutputAndCustomStatus()
8888
Assert.Contains(JsonSerializer.Serialize(largeInput + largeInput), fakeStore.uploadedPayloads);
8989
}
9090

91+
// Validates history streaming path resolves externalized inputs/outputs in HistoryChunk.
92+
[Fact]
93+
public async Task HistoryStreaming_ResolvesPayloads()
94+
{
95+
// Make payloads large enough so that past events history exceeds 1 MiB to trigger streaming
96+
string largeInput = new string('H', 2 * 1024 * 1024); // 2 MiB
97+
string largeOutput = new string('O', 2 * 1024 * 1024); // 2 MiB
98+
TaskName orch = nameof(HistoryStreaming_ResolvesPayloads);
99+
100+
InMemoryPayloadStore store = new InMemoryPayloadStore();
101+
102+
await using HostTestLifetime server = await this.StartWorkerAsync(
103+
worker =>
104+
{
105+
worker.AddTasks(tasks => tasks.AddOrchestratorFunc<string, string>(
106+
orch,
107+
async (ctx, input) =>
108+
{
109+
// Emit several events so that the serialized history size grows
110+
for (int i = 0; i < 50; i++)
111+
{
112+
await ctx.CreateTimer(TimeSpan.FromMilliseconds(10), CancellationToken.None);
113+
}
114+
return largeOutput;
115+
}));
116+
117+
worker.UseExternalizedPayloads(opts =>
118+
{
119+
opts.ExternalizeThresholdBytes = 1024;
120+
opts.ContainerName = "test";
121+
opts.ConnectionString = "UseDevelopmentStorage=true";
122+
});
123+
worker.Services.AddSingleton<IPayloadStore>(store);
124+
},
125+
client =>
126+
{
127+
// Enable client to resolve outputs on query
128+
client.UseExternalizedPayloads(opts =>
129+
{
130+
opts.ExternalizeThresholdBytes = 1024;
131+
opts.ContainerName = "test";
132+
opts.ConnectionString = "UseDevelopmentStorage=true";
133+
});
134+
client.Services.AddSingleton<IPayloadStore>(store);
135+
});
136+
137+
// Start orchestration with large input to exercise history input resolution
138+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orch, largeInput);
139+
OrchestrationMetadata completed = await server.Client.WaitForInstanceCompletionAsync(
140+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
141+
142+
Assert.Equal(OrchestrationRuntimeStatus.Completed, completed.RuntimeStatus);
143+
Assert.Equal(largeInput, completed.ReadInputAs<string>());
144+
Assert.Equal(largeOutput, completed.ReadOutputAs<string>());
145+
Assert.True(store.UploadCount >= 2);
146+
Assert.True(store.DownloadCount >= 2);
147+
}
148+
91149
// Validates client externalizes large suspend and resume reasons.
92150
[Fact]
93151
public async Task SuspendAndResume_Reason_IsExternalizedByClient()

0 commit comments

Comments
 (0)