Skip to content

Commit c1a87b9

Browse files
committed
update calllers
1 parent 5ebe0b0 commit c1a87b9

File tree

9 files changed

+215
-27
lines changed

9 files changed

+215
-27
lines changed

src/Client/Core/DurableTaskClientOptions.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public class DurableTaskClientOptions
1212
{
1313
DataConverter dataConverter = JsonDataConverter.Default;
1414
bool enableEntitySupport;
15+
bool enableLargePayloadSupport;
1516

1617
/// <summary>
1718
/// Gets or sets the version of orchestrations that will be created.
@@ -69,6 +70,20 @@ public bool EnableEntitySupport
6970
}
7071
}
7172

73+
/// <summary>
74+
/// Gets or sets a value indicating whether this client should support large payloads using async serialization/deserialization.
75+
/// When enabled, the client will use async methods for serialization and deserialization to support externalized payloads.
76+
/// </summary>
77+
public bool EnableLargePayloadSupport
78+
{
79+
get => this.enableLargePayloadSupport;
80+
set
81+
{
82+
this.enableLargePayloadSupport = value;
83+
this.LargePayloadSupportExplicitlySet = true;
84+
}
85+
}
86+
7287
/// <summary>
7388
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
7489
/// </summary>
@@ -85,6 +100,11 @@ public bool EnableEntitySupport
85100
/// </summary>
86101
internal bool EntitySupportExplicitlySet { get; private set; }
87102

103+
/// <summary>
104+
/// Gets a value indicating whether <see cref="EnableLargePayloadSupport" /> was explicitly set or not.
105+
/// </summary>
106+
internal bool LargePayloadSupportExplicitlySet { get; private set; }
107+
88108
/// <summary>
89109
/// Applies these option values to another.
90110
/// </summary>
@@ -104,6 +124,11 @@ internal void ApplyTo(DurableTaskClientOptions other)
104124
other.EnableEntitySupport = this.EnableEntitySupport;
105125
}
106126

127+
if (!other.LargePayloadSupportExplicitlySet)
128+
{
129+
other.EnableLargePayloadSupport = this.EnableLargePayloadSupport;
130+
}
131+
107132
if (!string.IsNullOrWhiteSpace(this.DefaultVersion))
108133
{
109134
other.DefaultVersion = this.DefaultVersion;

src/Client/Core/OrchestrationMetadata.cs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,87 @@ public OrchestrationMetadata(string name, string instanceId)
196196
return this.DataConverter.Deserialize<T>(this.SerializedCustomStatus);
197197
}
198198

199+
/// <summary>
200+
/// Asynchronously deserializes the orchestration's input into an object of the specified type.
201+
/// </summary>
202+
/// <remarks>
203+
/// This method can only be used when inputs and outputs are explicitly requested from the
204+
/// <see cref="DurableTaskClient.GetInstancesAsync(string, CancellationToken)"/> or
205+
/// <see cref="DurableTaskClient.WaitForInstanceCompletionAsync(string, CancellationToken)"/> method that produced
206+
/// this <see cref="OrchestrationMetadata"/> object.
207+
/// </remarks>
208+
/// <typeparam name="T">The type to deserialize the orchestration input into.</typeparam>
209+
/// <param name="cancellationToken">Cancellation token.</param>
210+
/// <returns>Returns the deserialized input value.</returns>
211+
/// <exception cref="InvalidOperationException">
212+
/// Thrown if this metadata object was fetched without the option to read inputs and outputs.
213+
/// </exception>
214+
public async ValueTask<T?> ReadInputAsAsync<T>(CancellationToken cancellationToken = default)
215+
{
216+
if (!this.RequestedInputsAndOutputs)
217+
{
218+
throw new InvalidOperationException(
219+
$"The {nameof(this.ReadInputAsAsync)} method can only be used on {nameof(OrchestrationMetadata)} objects " +
220+
"that are fetched with the option to include input data.");
221+
}
222+
223+
return await this.DataConverter.DeserializeAsync<T>(this.SerializedInput, cancellationToken);
224+
}
225+
226+
/// <summary>
227+
/// Asynchronously deserializes the orchestration's output into an object of the specified type.
228+
/// </summary>
229+
/// <remarks>
230+
/// This method can only be used when inputs and outputs are explicitly requested from the
231+
/// <see cref="DurableTaskClient.GetInstancesAsync(string, CancellationToken)"/> or
232+
/// <see cref="DurableTaskClient.WaitForInstanceCompletionAsync(string, CancellationToken)"/> method that produced
233+
/// this <see cref="OrchestrationMetadata"/> object.
234+
/// </remarks>
235+
/// <typeparam name="T">The type to deserialize the orchestration output into.</typeparam>
236+
/// <param name="cancellationToken">Cancellation token.</param>
237+
/// <returns>Returns the deserialized output value.</returns>
238+
/// <exception cref="InvalidOperationException">
239+
/// Thrown if this metadata object was fetched without the option to read inputs and outputs.
240+
/// </exception>
241+
public async ValueTask<T?> ReadOutputAsAsync<T>(CancellationToken cancellationToken = default)
242+
{
243+
if (!this.RequestedInputsAndOutputs)
244+
{
245+
throw new InvalidOperationException(
246+
$"The {nameof(this.ReadOutputAsAsync)} method can only be used on {nameof(OrchestrationMetadata)} objects " +
247+
"that are fetched with the option to include output data.");
248+
}
249+
250+
return await this.DataConverter.DeserializeAsync<T>(this.SerializedOutput, cancellationToken);
251+
}
252+
253+
/// <summary>
254+
/// Asynchronously deserializes the orchestration's custom status value into an object of the specified type.
255+
/// </summary>
256+
/// <remarks>
257+
/// This method can only be used when inputs and outputs are explicitly requested from the
258+
/// <see cref="DurableTaskClient.GetInstancesAsync(string, CancellationToken)"/> or
259+
/// <see cref="DurableTaskClient.WaitForInstanceCompletionAsync(string, CancellationToken)"/> method that produced
260+
/// this <see cref="OrchestrationMetadata"/> object.
261+
/// </remarks>
262+
/// <typeparam name="T">The type to deserialize the orchestration' custom status into.</typeparam>
263+
/// <param name="cancellationToken">Cancellation token.</param>
264+
/// <returns>Returns the deserialized custom status value.</returns>
265+
/// <exception cref="InvalidOperationException">
266+
/// Thrown if this metadata object was fetched without the option to read inputs and outputs.
267+
/// </exception>
268+
public async ValueTask<T?> ReadCustomStatusAsAsync<T>(CancellationToken cancellationToken = default)
269+
{
270+
if (!this.RequestedInputsAndOutputs)
271+
{
272+
throw new InvalidOperationException(
273+
$"The {nameof(this.ReadCustomStatusAsAsync)} method can only be used on {nameof(OrchestrationMetadata)}" +
274+
" objects that are fetched with the option to include input and output data.");
275+
}
276+
277+
return await this.DataConverter.DeserializeAsync<T>(this.SerializedCustomStatus, cancellationToken);
278+
}
279+
199280
/// <summary>
200281
/// Generates a user-friendly string representation of the current metadata object.
201282
/// </summary>

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public GrpcDurableTaskClient(string name, GrpcDurableTaskClientOptions options,
6464

6565
DataConverter DataConverter => this.options.DataConverter;
6666

67+
/// <summary>
68+
/// Gets a value indicating whether the DataConverter supports async operations (LargePayload enabled).
69+
/// </summary>
70+
bool SupportsAsyncSerialization => this.options.EnableLargePayloadSupport;
71+
6772
/// <inheritdoc/>
6873
public override ValueTask DisposeAsync()
6974
{
@@ -90,12 +95,16 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
9095
version = this.options.DefaultVersion;
9196
}
9297

98+
string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N");
99+
93100
var request = new P.CreateInstanceRequest
94101
{
95102
Name = orchestratorName.Name,
96103
Version = version,
97-
InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"),
98-
Input = this.DataConverter.Serialize(input),
104+
InstanceId = instanceId,
105+
Input = this.SupportsAsyncSerialization
106+
? await this.DataConverter.SerializeAsync(input, cancellation)
107+
: this.DataConverter.Serialize(input),
99108
RequestTime = DateTimeOffset.UtcNow.ToTimestamp(),
100109
};
101110

@@ -109,11 +118,17 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
109118
}
110119

111120
DateTimeOffset? startAt = options?.StartAt;
112-
this.logger.SchedulingOrchestration(
113-
request.InstanceId,
114-
orchestratorName,
115-
sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
116-
startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
121+
string name = orchestratorName.Name ?? string.Empty;
122+
string? serializedInput = request.Input;
123+
int sizeInBytes = 0;
124+
if (!string.IsNullOrEmpty(serializedInput))
125+
{
126+
sizeInBytes = Encoding.UTF8.GetByteCount(serializedInput!);
127+
}
128+
129+
DateTimeOffset startTime = startAt.GetValueOrDefault(DateTimeOffset.UtcNow);
130+
131+
this.logger.SchedulingOrchestration(instanceId, name, sizeInBytes, startTime);
117132

118133
if (startAt.HasValue)
119134
{
@@ -141,7 +156,9 @@ public override async Task RaiseEventAsync(
141156
{
142157
InstanceId = instanceId,
143158
Name = eventName,
144-
Input = this.DataConverter.Serialize(eventPayload),
159+
Input = this.SupportsAsyncSerialization
160+
? await this.DataConverter.SerializeAsync(eventPayload, cancellation)
161+
: this.DataConverter.Serialize(eventPayload),
145162
};
146163

147164
using Activity? traceActivity = TraceHelper.StartActivityForNewEventRaisedFromClient(request, instanceId);
@@ -161,7 +178,9 @@ public override async Task TerminateInstanceAsync(
161178

162179
this.logger.TerminatingInstance(instanceId);
163180

164-
string? serializedOutput = this.DataConverter.Serialize(output);
181+
string? serializedOutput = this.SupportsAsyncSerialization
182+
? await this.DataConverter.SerializeAsync(output, cancellation)
183+
: this.DataConverter.Serialize(output);
165184
await this.sidecarClient.TerminateInstanceAsync(
166185
new P.TerminateRequest
167186
{

src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public override DurableEntityClient Entities
6767

6868
DataConverter DataConverter => this.options.DataConverter;
6969

70+
/// <summary>
71+
/// Gets a value indicating whether the DataConverter supports async operations (LargePayload enabled).
72+
/// </summary>
73+
bool SupportsAsyncSerialization => this.options.EnableLargePayloadSupport;
74+
7075
IOrchestrationServiceClient Client => this.options.Client!;
7176

7277
IOrchestrationServicePurgeClient PurgeClient => this.CastClient<IOrchestrationServicePurgeClient>();
@@ -141,14 +146,16 @@ public override async Task<PurgeResult> PurgeAllInstancesAsync(
141146
}
142147

143148
/// <inheritdoc/>
144-
public override Task RaiseEventAsync(
149+
public override async Task RaiseEventAsync(
145150
string instanceId, string eventName, object? eventPayload = null, CancellationToken cancellation = default)
146151
{
147152
Check.NotNullOrEmpty(instanceId);
148153
Check.NotNullOrEmpty(eventName);
149154

150-
string? serializedInput = this.DataConverter.Serialize(eventPayload);
151-
return this.SendInstanceMessageAsync(
155+
string? serializedInput = this.SupportsAsyncSerialization
156+
? await this.DataConverter.SerializeAsync(eventPayload, cancellation)
157+
: this.DataConverter.Serialize(eventPayload);
158+
await this.SendInstanceMessageAsync(
152159
instanceId, new EventRaisedEvent(-1, serializedInput) { Name = eventName }, cancellation);
153160
}
154161

@@ -167,7 +174,9 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
167174
ExecutionId = Guid.NewGuid().ToString("N"),
168175
};
169176

170-
string? serializedInput = this.DataConverter.Serialize(input);
177+
string? serializedInput = this.SupportsAsyncSerialization
178+
? await this.DataConverter.SerializeAsync(input, cancellation)
179+
: this.DataConverter.Serialize(input);
171180

172181
var tags = new Dictionary<string, string>();
173182
if (options?.Tags != null)
@@ -207,16 +216,18 @@ public override Task ResumeInstanceAsync(
207216
=> this.SendInstanceMessageAsync(instanceId, new ExecutionResumedEvent(-1, reason), cancellation);
208217

209218
/// <inheritdoc/>
210-
public override Task TerminateInstanceAsync(
219+
public override async Task TerminateInstanceAsync(
211220
string instanceId, TerminateInstanceOptions? options = null, CancellationToken cancellation = default)
212221
{
213222
object? output = options?.Output;
214223
Check.NotNullOrEmpty(instanceId);
215224
cancellation.ThrowIfCancellationRequested();
216-
string? reason = this.DataConverter.Serialize(output);
225+
string? reason = this.SupportsAsyncSerialization
226+
? await this.DataConverter.SerializeAsync(output, cancellation)
227+
: this.DataConverter.Serialize(output);
217228

218229
// TODO: Support recursive termination of sub-orchestrations
219-
return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
230+
await this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
220231
}
221232

222233
/// <inheritdoc/>

src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskClientBuilderExtensions.AzureBlobPayloads.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ public static class DurableTaskClientBuilderExtensionsAzureBlobPayloads
1717
/// <summary>
1818
/// Enables externalized payload storage using Azure Blob Storage for the specified client builder.
1919
/// </summary>
20+
/// <param name="builder">The client builder.</param>
21+
/// <param name="configure">The configure action.</param>
22+
/// <returns>The original builder, for call chaining.</returns>
2023
public static IDurableTaskClientBuilder UseExternalizedPayloads(
2124
this IDurableTaskClientBuilder builder,
2225
Action<LargePayloadStorageOptions> configure)
@@ -43,5 +46,3 @@ public static IDurableTaskClientBuilder UseExternalizedPayloads(
4346
return builder;
4447
}
4548
}
46-
47-

src/Worker/Core/DurableTaskWorkerOptions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ public DataConverter DataConverter
8787
/// </summary>
8888
public bool EnableEntitySupport { get; set; }
8989

90+
/// <summary>
91+
/// Gets or sets a value indicating whether this worker should support large payloads using async serialization/deserialization.
92+
/// When enabled, the worker will use async methods for serialization and deserialization to support externalized payloads.
93+
/// </summary>
94+
public bool EnableLargePayloadSupport { get; set; }
95+
9096
/// <summary>
9197
/// Gets or sets the maximum timer interval for the
9298
/// <see cref="TaskOrchestrationContext.CreateTimer(TimeSpan, CancellationToken)"/> method.
@@ -174,6 +180,7 @@ internal void ApplyTo(DurableTaskWorkerOptions other)
174180
other.DataConverter = this.DataConverter;
175181
other.MaximumTimerInterval = this.MaximumTimerInterval;
176182
other.EnableEntitySupport = this.EnableEntitySupport;
183+
other.EnableLargePayloadSupport = this.EnableLargePayloadSupport;
177184
other.Versioning = this.Versioning;
178185
other.OrchestrationFilter = this.OrchestrationFilter;
179186
}

src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ public override TaskOrchestrationEntityFeature Entities
113113
/// </summary>
114114
internal DataConverter DataConverter => this.invocationContext.Options.DataConverter;
115115

116+
/// <summary>
117+
/// Gets a value indicating whether the DataConverter supports async operations (LargePayload enabled).
118+
/// </summary>
119+
bool SupportsAsyncSerialization => this.invocationContext.Options.EnableLargePayloadSupport;
120+
116121
/// <inheritdoc/>
117122
protected override ILoggerFactory LoggerFactory => this.invocationContext.LoggerFactory;
118123

@@ -283,7 +288,9 @@ public override Task<T> WaitForExternalEvent<T>(string eventName, CancellationTo
283288
// Return immediately if this external event has already arrived.
284289
if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEventPayload))
285290
{
286-
return Task.FromResult(this.DataConverter.Deserialize<T>(bufferedEventPayload));
291+
return this.SupportsAsyncSerialization
292+
? this.DataConverter.DeserializeAsync<T>(bufferedEventPayload, cancellationToken).AsTask()
293+
: Task.FromResult(this.DataConverter.Deserialize<T>(bufferedEventPayload));
287294
}
288295

289296
// Create a task completion source that will be set when the external event arrives.
@@ -414,7 +421,7 @@ internal void ExitCriticalSectionIfNeeded()
414421
/// </summary>
415422
/// <param name="eventName">The name of the event to complete.</param>
416423
/// <param name="rawEventPayload">The serialized event payload.</param>
417-
internal void CompleteExternalEvent(string eventName, string rawEventPayload)
424+
internal async Task CompleteExternalEvent(string eventName, string rawEventPayload)
418425
{
419426
if (this.externalEventSources.TryGetValue(eventName, out Queue<IEventSource>? waiters))
420427
{
@@ -429,7 +436,9 @@ internal void CompleteExternalEvent(string eventName, string rawEventPayload)
429436
}
430437
else
431438
{
432-
value = this.DataConverter.Deserialize(rawEventPayload, waiter.EventType);
439+
value = this.SupportsAsyncSerialization
440+
? await this.DataConverter.DeserializeAsync(rawEventPayload, waiter.EventType, CancellationToken.None)
441+
: this.DataConverter.Deserialize(rawEventPayload, waiter.EventType);
433442
}
434443

435444
// Events are completed in FIFO order. Remove the key if the last event was delivered.
@@ -448,6 +457,17 @@ internal void CompleteExternalEvent(string eventName, string rawEventPayload)
448457
}
449458
}
450459

460+
/// <summary>
461+
/// Gets the serialized custom status.
462+
/// </summary>
463+
/// <returns>The custom status serialized to a string, or <c>null</c> if there is not custom status.</returns>
464+
internal async ValueTask<string?> GetSerializedCustomStatusAsync(CancellationToken cancellationToken = default)
465+
{
466+
return this.SupportsAsyncSerialization
467+
? await this.DataConverter.SerializeAsync(this.customStatus, cancellationToken)
468+
: this.DataConverter.Serialize(this.customStatus);
469+
}
470+
451471
/// <summary>
452472
/// Gets the serialized custom status.
453473
/// </summary>

0 commit comments

Comments
 (0)