Skip to content

Commit 1c407dd

Browse files
committed
resume/suspend
1 parent 989c99d commit 1c407dd

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayloadsInterceptor.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ Task ExternalizeRequestPayloadsAsync<TRequest>(TRequest request, CancellationTok
133133
return this.MaybeExternalizeAsync(v => r.Input = v, r.Input, cancellation);
134134
case P.TerminateRequest r:
135135
return this.MaybeExternalizeAsync(v => r.Output = v, r.Output, cancellation);
136+
case P.SuspendRequest r:
137+
return this.MaybeExternalizeAsync(v => r.Reason = v, r.Reason, cancellation);
138+
case P.ResumeRequest r:
139+
return this.MaybeExternalizeAsync(v => r.Reason = v, r.Reason, cancellation);
136140
case P.SignalEntityRequest r:
137141
return this.MaybeExternalizeAsync(v => r.Input = v, r.Input, cancellation);
138142
case P.ActivityResponse r:

test/Grpc.IntegrationTests/LargePayloadTests.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,81 @@ public async Task LargeOrchestrationInputAndOutputAndCustomStatus()
8888
Assert.Contains(JsonSerializer.Serialize(largeInput + largeInput), fakeStore.uploadedPayloads);
8989
}
9090

91+
// Validates client externalizes large suspend and resume reasons.
92+
[Fact]
93+
public async Task SuspendAndResume_Reason_IsExternalizedByClient()
94+
{
95+
string largeReason1 = new string('Z', 700 * 1024); // 700KB
96+
string largeReason2 = new string('Y', 650 * 1024); // 650KB
97+
TaskName orchestratorName = nameof(SuspendAndResume_Reason_IsExternalizedByClient);
98+
99+
InMemoryPayloadStore clientStore = new InMemoryPayloadStore();
100+
101+
await using HostTestLifetime server = await this.StartWorkerAsync(
102+
worker =>
103+
{
104+
// Long-running orchestrator to give time for suspend/resume
105+
worker.AddTasks(tasks => tasks.AddOrchestratorFunc<object?, string>(
106+
orchestratorName,
107+
async (ctx, _) =>
108+
{
109+
await ctx.CreateTimer(TimeSpan.FromMinutes(5), CancellationToken.None);
110+
return "done";
111+
}));
112+
},
113+
client =>
114+
{
115+
// Enable externalization on the client and use the in-memory store to track uploads
116+
client.UseExternalizedPayloads(opts =>
117+
{
118+
opts.ExternalizeThresholdBytes = 1024; // 1KB threshold to force externalization
119+
opts.ContainerName = "test";
120+
opts.ConnectionString = "UseDevelopmentStorage=true";
121+
});
122+
client.Services.AddSingleton<IPayloadStore>(clientStore);
123+
});
124+
125+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
126+
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);
127+
128+
// Suspend with large reason (should be externalized by client)
129+
await server.Client.SuspendInstanceAsync(instanceId, largeReason1, this.TimeoutToken);
130+
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);
131+
132+
// verify it is suspended
133+
OrchestrationMetadata? status = await server.Client.GetInstanceAsync(instanceId, getInputsAndOutputs: false, this.TimeoutToken);
134+
Assert.NotNull(status);
135+
Assert.Equal(OrchestrationRuntimeStatus.Suspended, status!.RuntimeStatus);
136+
137+
// Resume with large reason (should be externalized by client)
138+
await server.Client.ResumeInstanceAsync(instanceId, largeReason2, this.TimeoutToken);
139+
140+
// verify it is resumed (poll up to 5 seconds)
141+
var deadline = DateTime.UtcNow.AddSeconds(5);
142+
while (true)
143+
{
144+
status = await server.Client.GetInstanceAsync(instanceId, getInputsAndOutputs: false, this.TimeoutToken);
145+
if (status is not null && status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
146+
{
147+
break;
148+
}
149+
150+
if (DateTime.UtcNow >= deadline)
151+
{
152+
Assert.NotNull(status);
153+
Assert.Equal(OrchestrationRuntimeStatus.Running, status!.RuntimeStatus);
154+
}
155+
156+
await Task.Delay(TimeSpan.FromSeconds(1), this.TimeoutToken);
157+
}
158+
159+
160+
161+
Assert.True(clientStore.UploadCount >= 2);
162+
Assert.Contains(largeReason1, clientStore.uploadedPayloads);
163+
Assert.Contains(largeReason2, clientStore.uploadedPayloads);
164+
}
165+
91166
// Validates terminating an instance with a large output payload is externalized by the client.
92167
[Fact]
93168
public async Task LargeTerminateWithPayload()

0 commit comments

Comments
 (0)