Skip to content

Commit cd062a9

Browse files
committed
Merge branch 'wangbill/autochunk' of https://github.com/microsoft/durabletask-dotnet into wangbill/autochunk
2 parents ae33892 + deffc05 commit cd062a9

File tree

23 files changed

+199
-133
lines changed

23 files changed

+199
-133
lines changed

Directory.Build.props

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
<ImplicitUsings>enable</ImplicitUsings>
88
</PropertyGroup>
99

10+
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0'">
11+
<SuppressTfmSupportBuildWarnings>true</SuppressTfmSupportBuildWarnings>
12+
<CheckEolTargetFramework>false</CheckEolTargetFramework>
13+
</PropertyGroup>
14+
1015
<!-- Assembly name and namespace -->
1116
<PropertyGroup Condition="!$(MSBuildProjectName.StartsWith('Microsoft.DurableTask'))">
1217
<TopLevelNamespace Condition="'$(TopLevelNamespace)' == ''">Microsoft.DurableTask</TopLevelNamespace>

samples/AzureFunctionsUnitTests/SampleUnitTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,9 @@ public TestResponse(FunctionContext functionContext) : base(functionContext)
200200
public class TestLogger : ILogger
201201
{
202202
// list of all logs emitted, for validation
203-
public IList<string> CapturedLogs {get; set;} = new List<string>();
203+
public IList<string> CapturedLogs { get; set; } = new List<string>();
204204

205-
public IDisposable BeginScope<TState>(TState state) => Mock.Of<IDisposable>();
205+
public IDisposable BeginScope<TState>(TState state) where TState : notnull => Mock.Of<IDisposable>();
206206

207207
public bool IsEnabled(LogLevel logLevel)
208208
{

samples/LargePayloadConsoleApp/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
// Keep threshold small to force externalization for demo purposes
2828
opts.ExternalizeThresholdBytes = 1024; // 1KB
2929
opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
30-
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER");
30+
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER") ?? "payloads";
3131
});
3232

3333
// 2) Configure Durable Task client

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
110110

111111
DateTimeOffset? startAt = options?.StartAt;
112112
this.logger.SchedulingOrchestration(
113-
request.InstanceId,
113+
request.InstanceId ?? string.Empty,
114114
orchestratorName,
115115
sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
116116
startAt.GetValueOrDefault(DateTimeOffset.UtcNow));

src/Extensions/AzureBlobPayloads/Options/LargePayloadStorageOptions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public LargePayloadStorageOptions(Uri accountUri, TokenCredential credential)
6363
/// Gets or sets the threshold in bytes at which payloads are externalized. Default is 900_000 bytes.
6464
/// Value must not exceed 1 MiB (1,048,576 bytes).
6565
/// </summary>
66-
6766
public int ExternalizeThresholdBytes
6867
{
6968
get => this.externalizeThresholdBytes;

src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public override async Task<string> UploadAsync(string payLoad, CancellationToken
8989
// using MemoryStream payloadStream = new(payloadBuffer, writable: false);
9090

9191
// await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: DefaultCopyBufferSize, cancellationToken);
92-
await compressedBlobStream.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken);
92+
await WritePayloadAsync(payloadBuffer, compressedBlobStream, cancellationToken);
9393
await compressedBlobStream.FlushAsync(cancellationToken);
9494
await blobStream.FlushAsync(cancellationToken);
9595
}
@@ -99,7 +99,7 @@ public override async Task<string> UploadAsync(string payLoad, CancellationToken
9999

100100
// using MemoryStream payloadStream = new(payloadBuffer, writable: false);
101101
// await payloadStream.CopyToAsync(blobStream, bufferSize: DefaultCopyBufferSize, cancellationToken);
102-
await blobStream.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken);
102+
await WritePayloadAsync(payloadBuffer, blobStream, cancellationToken);
103103
await blobStream.FlushAsync(cancellationToken);
104104
}
105105

@@ -126,11 +126,11 @@ public override async Task<string> DownloadAsync(string token, CancellationToken
126126
{
127127
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
128128
using StreamReader reader = new(decompressed, Encoding.UTF8);
129-
return await reader.ReadToEndAsync();
129+
return await ReadToEndAsync(reader, cancellationToken);
130130
}
131131

132132
using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
133-
return await uncompressedReader.ReadToEndAsync();
133+
return await ReadToEndAsync(uncompressedReader, cancellationToken);
134134
}
135135

136136
/// <inheritdoc/>
@@ -144,6 +144,27 @@ public override bool IsKnownPayloadToken(string value)
144144
return value.StartsWith(TokenPrefix, StringComparison.Ordinal);
145145
}
146146

147+
static async Task WritePayloadAsync(byte[] payloadBuffer, Stream target, CancellationToken cancellationToken)
148+
{
149+
#if NETSTANDARD2_0
150+
await target.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken).ConfigureAwait(false);
151+
#else
152+
await target.WriteAsync(payloadBuffer.AsMemory(0, payloadBuffer.Length), cancellationToken).ConfigureAwait(false);
153+
#endif
154+
}
155+
156+
static async Task<string> ReadToEndAsync(StreamReader reader, CancellationToken cancellationToken)
157+
{
158+
#if NETSTANDARD2_0
159+
cancellationToken.ThrowIfCancellationRequested();
160+
return await reader.ReadToEndAsync().ConfigureAwait(false);
161+
#elif NET8_0_OR_GREATER
162+
return await reader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
163+
#else
164+
return await reader.ReadToEndAsync().WaitAsync(cancellationToken).ConfigureAwait(false);
165+
#endif
166+
}
167+
147168
static string EncodeToken(string container, string name) => $"blob:v1:{container}:{name}";
148169

149170
static (string Container, string Name) DecodeToken(string token)

src/InProcessTestHost/DurableTaskTestHost.cs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ namespace Microsoft.DurableTask.Testing;
2222
/// </summary>
2323
public sealed class DurableTaskTestHost : IAsyncDisposable
2424
{
25-
readonly IWebHost sidecarHost;
25+
readonly IHost sidecarHost;
2626
readonly IHost workerHost;
2727
readonly GrpcChannel grpcChannel;
2828

@@ -33,7 +33,7 @@ public sealed class DurableTaskTestHost : IAsyncDisposable
3333
/// <param name="workerHost">The worker host.</param>
3434
/// <param name="grpcChannel">The gRPC channel.</param>
3535
/// <param name="client">The durable task client.</param>
36-
public DurableTaskTestHost(IWebHost sidecarHost, IHost workerHost, GrpcChannel grpcChannel, DurableTaskClient client)
36+
public DurableTaskTestHost(IHost sidecarHost, IHost workerHost, GrpcChannel grpcChannel, DurableTaskClient client)
3737
{
3838
this.sidecarHost = sidecarHost;
3939
this.workerHost = workerHost;
@@ -68,32 +68,37 @@ public static async Task<DurableTaskTestHost> StartAsync(
6868
? $"http://localhost:{options.Port.Value}"
6969
: $"http://localhost:{Random.Shared.Next(30000, 40000)}";
7070

71-
var sidecarHost = new WebHostBuilder()
72-
.UseKestrel(kestrelOptions =>
71+
IHost sidecarHost = Host.CreateDefaultBuilder()
72+
.ConfigureWebHostDefaults(webBuilder =>
7373
{
74-
// Configure for HTTP/2 (required for gRPC)
75-
kestrelOptions.ConfigureEndpointDefaults(listenOptions =>
76-
listenOptions.Protocols = HttpProtocols.Http2);
77-
})
78-
.UseUrls(address)
79-
.ConfigureServices(services =>
80-
{
81-
services.AddGrpc();
82-
services.AddSingleton<IOrchestrationService>(orchestrationService);
83-
services.AddSingleton<IOrchestrationServiceClient>(orchestrationService);
84-
services.AddSingleton<TaskHubGrpcServer>();
85-
})
86-
.Configure(app =>
87-
{
88-
app.UseRouting();
89-
app.UseEndpoints(endpoints =>
74+
webBuilder.UseUrls(address);
75+
webBuilder.ConfigureKestrel(kestrelOptions =>
76+
{
77+
// Configure for HTTP/2 (required for gRPC)
78+
kestrelOptions.ConfigureEndpointDefaults(listenOptions =>
79+
listenOptions.Protocols = HttpProtocols.Http2);
80+
});
81+
82+
webBuilder.ConfigureServices(services =>
83+
{
84+
services.AddGrpc();
85+
services.AddSingleton<IOrchestrationService>(orchestrationService);
86+
services.AddSingleton<IOrchestrationServiceClient>(orchestrationService);
87+
services.AddSingleton<TaskHubGrpcServer>();
88+
});
89+
90+
webBuilder.Configure(app =>
9091
{
91-
endpoints.MapGrpcService<TaskHubGrpcServer>();
92+
app.UseRouting();
93+
app.UseEndpoints(endpoints =>
94+
{
95+
endpoints.MapGrpcService<TaskHubGrpcServer>();
96+
});
9297
});
9398
})
9499
.Build();
95100

96-
sidecarHost.Start();
101+
await sidecarHost.StartAsync(cancellationToken);
97102
var grpcChannel = GrpcChannel.ForAddress(address);
98103

99104
// Create worker host with user's orchestrators and activities

src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,13 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt
715715
}
716716
}
717717

718-
/// <inheritdoc/>
718+
/// <summary>
719+
/// Streams the instance history for a given orchestration instance to the client in chunked form.
720+
/// </summary>
721+
/// <param name="request">The history request that identifies the instance.</param>
722+
/// <param name="responseStream">The response stream used to write history chunks.</param>
723+
/// <param name="context">The server call context for the streaming operation.</param>
724+
/// <returns>A task that completes when streaming finishes.</returns>
719725
public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter<P.HistoryChunk> responseStream, ServerCallContext context)
720726
{
721727
if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List<P.HistoryEvent>? pastEvents))

src/Shared/AzureManaged/DurableTaskVersionUtil.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
using System.Diagnostics;
55

6+
#pragma warning disable CS0436
7+
68
namespace Microsoft.DurableTask;
79

810
/// <summary>
@@ -18,7 +20,7 @@ public static class DurableTaskUserAgentUtil
1820
/// <summary>
1921
/// The version of the SDK used in the user agent string.
2022
/// </summary>
21-
static readonly string PackageVersion = FileVersionInfo.GetVersionInfo(typeof(DurableTaskUserAgentUtil).Assembly.Location).FileVersion;
23+
static readonly string PackageVersion = FileVersionInfo.GetVersionInfo(typeof(DurableTaskUserAgentUtil).Assembly.Location).FileVersion ?? "unknown";
2224

2325
/// <summary>
2426
/// Generates the user agent string for the Durable Task SDK based on a fixed name, the package version, and the caller type.
@@ -29,4 +31,5 @@ public static string GetUserAgent(string callerType)
2931
{
3032
return $"{SdkName}/{PackageVersion?.ToString() ?? "unknown"} ({callerType})";
3133
}
32-
}
34+
}
35+
#pragma warning restore CS0436

src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public int CompleteOrchestrationWorkItemChunkSizeInBytes
6262
if (value < MinCompleteOrchestrationWorkItemChunkSizeInBytes ||
6363
value > MaxCompleteOrchestrationWorkItemChunkSizeInBytes)
6464
{
65-
string message = $"CompleteOrchestrationWorkItemChunkSizeInBytes must be between " +
65+
string message = $"{nameof(CompleteOrchestrationWorkItemChunkSizeInBytes)} must be between " +
6666
$"{MinCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (1 MB) and " +
6767
$"{MaxCompleteOrchestrationWorkItemChunkSizeInBytes} bytes (3.9 MB), inclusive.";
6868
throw new ArgumentOutOfRangeException(

0 commit comments

Comments
 (0)