Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0'">
<SuppressTfmSupportBuildWarnings>true</SuppressTfmSupportBuildWarnings>
<CheckEolTargetFramework>false</CheckEolTargetFramework>
</PropertyGroup>

<!-- Assembly name and namespace -->
<PropertyGroup Condition="!$(MSBuildProjectName.StartsWith('Microsoft.DurableTask'))">
<TopLevelNamespace Condition="'$(TopLevelNamespace)' == ''">Microsoft.DurableTask</TopLevelNamespace>
Expand Down
4 changes: 2 additions & 2 deletions samples/AzureFunctionsUnitTests/SampleUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ public TestResponse(FunctionContext functionContext) : base(functionContext)
public class TestLogger : ILogger
{
// list of all logs emitted, for validation
public IList<string> CapturedLogs {get; set;} = new List<string>();
public IList<string> CapturedLogs { get; set; } = new List<string>();

public IDisposable BeginScope<TState>(TState state) => Mock.Of<IDisposable>();
public IDisposable BeginScope<TState>(TState state) where TState : notnull => Mock.Of<IDisposable>();

public bool IsEnabled(LogLevel logLevel)
{
Expand Down
2 changes: 1 addition & 1 deletion samples/LargePayloadConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// Keep threshold small to force externalization for demo purposes
opts.ExternalizeThresholdBytes = 1024; // 1KB
opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER");
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER") ?? "payloads";
});

// 2) Configure Durable Task client
Expand Down
2 changes: 1 addition & 1 deletion src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(

DateTimeOffset? startAt = options?.StartAt;
this.logger.SchedulingOrchestration(
request.InstanceId,
request.InstanceId ?? string.Empty,
orchestratorName,
sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public LargePayloadStorageOptions(Uri accountUri, TokenCredential credential)
/// Gets or sets the threshold in bytes at which payloads are externalized. Default is 900_000 bytes.
/// Value must not exceed 1 MiB (1,048,576 bytes).
/// </summary>

public int ExternalizeThresholdBytes
{
get => this.externalizeThresholdBytes;
Expand Down
29 changes: 25 additions & 4 deletions src/Extensions/AzureBlobPayloads/PayloadStore/BlobPayloadStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public override async Task<string> UploadAsync(string payLoad, CancellationToken
// using MemoryStream payloadStream = new(payloadBuffer, writable: false);

// await payloadStream.CopyToAsync(compressedBlobStream, bufferSize: DefaultCopyBufferSize, cancellationToken);
await compressedBlobStream.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken);
await WritePayloadAsync(payloadBuffer, compressedBlobStream, cancellationToken);
await compressedBlobStream.FlushAsync(cancellationToken);
await blobStream.FlushAsync(cancellationToken);
}
Expand All @@ -99,7 +99,7 @@ public override async Task<string> UploadAsync(string payLoad, CancellationToken

// using MemoryStream payloadStream = new(payloadBuffer, writable: false);
// await payloadStream.CopyToAsync(blobStream, bufferSize: DefaultCopyBufferSize, cancellationToken);
await blobStream.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken);
await WritePayloadAsync(payloadBuffer, blobStream, cancellationToken);
await blobStream.FlushAsync(cancellationToken);
}

Expand All @@ -126,11 +126,11 @@ public override async Task<string> DownloadAsync(string token, CancellationToken
{
using GZipStream decompressed = new(contentStream, CompressionMode.Decompress);
using StreamReader reader = new(decompressed, Encoding.UTF8);
return await reader.ReadToEndAsync();
return await ReadToEndAsync(reader, cancellationToken);
}

using StreamReader uncompressedReader = new(contentStream, Encoding.UTF8);
return await uncompressedReader.ReadToEndAsync();
return await ReadToEndAsync(uncompressedReader, cancellationToken);
}

/// <inheritdoc/>
Expand All @@ -144,6 +144,27 @@ public override bool IsKnownPayloadToken(string value)
return value.StartsWith(TokenPrefix, StringComparison.Ordinal);
}

static async Task WritePayloadAsync(byte[] payloadBuffer, Stream target, CancellationToken cancellationToken)
{
#if NETSTANDARD2_0
await target.WriteAsync(payloadBuffer, 0, payloadBuffer.Length, cancellationToken).ConfigureAwait(false);
#else
await target.WriteAsync(payloadBuffer.AsMemory(0, payloadBuffer.Length), cancellationToken).ConfigureAwait(false);
#endif
}

static async Task<string> ReadToEndAsync(StreamReader reader, CancellationToken cancellationToken)
{
#if NETSTANDARD2_0
cancellationToken.ThrowIfCancellationRequested();
return await reader.ReadToEndAsync().ConfigureAwait(false);
#elif NET8_0_OR_GREATER
return await reader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
#else
return await reader.ReadToEndAsync().WaitAsync(cancellationToken).ConfigureAwait(false);
#endif
}

static string EncodeToken(string container, string name) => $"blob:v1:{container}:{name}";

static (string Container, string Name) DecodeToken(string token)
Expand Down
49 changes: 27 additions & 22 deletions src/InProcessTestHost/DurableTaskTestHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Microsoft.DurableTask.Testing;
/// </summary>
public sealed class DurableTaskTestHost : IAsyncDisposable
{
readonly IWebHost sidecarHost;
readonly IHost sidecarHost;
readonly IHost workerHost;
readonly GrpcChannel grpcChannel;

Expand All @@ -33,7 +33,7 @@ public sealed class DurableTaskTestHost : IAsyncDisposable
/// <param name="workerHost">The worker host.</param>
/// <param name="grpcChannel">The gRPC channel.</param>
/// <param name="client">The durable task client.</param>
public DurableTaskTestHost(IWebHost sidecarHost, IHost workerHost, GrpcChannel grpcChannel, DurableTaskClient client)
public DurableTaskTestHost(IHost sidecarHost, IHost workerHost, GrpcChannel grpcChannel, DurableTaskClient client)
{
this.sidecarHost = sidecarHost;
this.workerHost = workerHost;
Expand Down Expand Up @@ -68,32 +68,37 @@ public static async Task<DurableTaskTestHost> StartAsync(
? $"http://localhost:{options.Port.Value}"
: $"http://localhost:{Random.Shared.Next(30000, 40000)}";

var sidecarHost = new WebHostBuilder()
.UseKestrel(kestrelOptions =>
IHost sidecarHost = Host.CreateDefaultBuilder()
.ConfigureWebHostDefaults(webBuilder =>
{
// Configure for HTTP/2 (required for gRPC)
kestrelOptions.ConfigureEndpointDefaults(listenOptions =>
listenOptions.Protocols = HttpProtocols.Http2);
})
.UseUrls(address)
.ConfigureServices(services =>
{
services.AddGrpc();
services.AddSingleton<IOrchestrationService>(orchestrationService);
services.AddSingleton<IOrchestrationServiceClient>(orchestrationService);
services.AddSingleton<TaskHubGrpcServer>();
})
.Configure(app =>
{
app.UseRouting();
app.UseEndpoints(endpoints =>
webBuilder.UseUrls(address);
webBuilder.ConfigureKestrel(kestrelOptions =>
{
// Configure for HTTP/2 (required for gRPC)
kestrelOptions.ConfigureEndpointDefaults(listenOptions =>
listenOptions.Protocols = HttpProtocols.Http2);
});

webBuilder.ConfigureServices(services =>
{
services.AddGrpc();
services.AddSingleton<IOrchestrationService>(orchestrationService);
services.AddSingleton<IOrchestrationServiceClient>(orchestrationService);
services.AddSingleton<TaskHubGrpcServer>();
});

webBuilder.Configure(app =>
{
endpoints.MapGrpcService<TaskHubGrpcServer>();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<TaskHubGrpcServer>();
});
});
})
.Build();

sidecarHost.Start();
await sidecarHost.StartAsync(cancellationToken);
var grpcChannel = GrpcChannel.ForAddress(address);

// Create worker host with user's orchestrators and activities
Expand Down
13 changes: 10 additions & 3 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -471,20 +471,20 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state,
{
// Get the original orchestration state
IList<OrchestrationState> states = await this.client.GetOrchestrationStateAsync(request.InstanceId, false);

if (states == null || states.Count == 0)
{
throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found."));
}

OrchestrationState state = states[0];

// Check if the state is null
if (state == null)
{
throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found."));
}

string newInstanceId = request.RestartWithNewInstanceId ? Guid.NewGuid().ToString("N") : request.InstanceId;

// Create a new orchestration instance
Expand Down Expand Up @@ -646,6 +646,13 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt
}
}

/// <summary>
/// Streams the instance history for a given orchestration instance to the client in chunked form.
/// </summary>
/// <param name="request">The history request that identifies the instance.</param>
/// <param name="responseStream">The response stream used to write history chunks.</param>
/// <param name="context">The server call context for the streaming operation.</param>
/// <returns>A task that completes when streaming finishes.</returns>
public override async Task StreamInstanceHistory(P.StreamInstanceHistoryRequest request, IServerStreamWriter<P.HistoryChunk> responseStream, ServerCallContext context)
{
if (this.streamingPastEvents.TryGetValue(request.InstanceId, out List<P.HistoryEvent>? pastEvents))
Expand Down
7 changes: 5 additions & 2 deletions src/Shared/AzureManaged/DurableTaskVersionUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

using System.Diagnostics;

#pragma warning disable CS0436

namespace Microsoft.DurableTask;

/// <summary>
Expand All @@ -18,7 +20,7 @@ public static class DurableTaskUserAgentUtil
/// <summary>
/// The version of the SDK used in the user agent string.
/// </summary>
static readonly string PackageVersion = FileVersionInfo.GetVersionInfo(typeof(DurableTaskUserAgentUtil).Assembly.Location).FileVersion;
static readonly string PackageVersion = FileVersionInfo.GetVersionInfo(typeof(DurableTaskUserAgentUtil).Assembly.Location).FileVersion ?? "unknown";

/// <summary>
/// Generates the user agent string for the Durable Task SDK based on a fixed name, the package version, and the caller type.
Expand All @@ -29,4 +31,5 @@ public static string GetUserAgent(string callerType)
{
return $"{SdkName}/{PackageVersion?.ToString() ?? "unknown"} ({callerType})";
}
}
}
#pragma warning restore CS0436
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigur
[Theory]
[InlineData(null, "testhub")]
[InlineData("myaccount.westus3.durabletask.io", null)]
public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidationException(string endpoint, string taskHub)
public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidationException(string? endpoint, string? taskHub)
{
// Arrange
ServiceCollection services = new ServiceCollection();
Expand All @@ -104,7 +104,7 @@ public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidat
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act
mockBuilder.Object.UseDurableTaskScheduler(endpoint, taskHub, credential);
mockBuilder.Object.UseDurableTaskScheduler(endpoint!, taskHub!, credential);
ServiceProvider provider = services.BuildServiceProvider();

// Assert
Expand Down Expand Up @@ -157,15 +157,15 @@ public void UseDurableTaskScheduler_WithInvalidConnectionString_ShouldThrowArgum
[Theory]
[InlineData("")]
[InlineData(null)]
public void UseDurableTaskScheduler_WithNullOrEmptyConnectionString_ShouldThrowArgumentException(string connectionString)
public void UseDurableTaskScheduler_WithNullOrEmptyConnectionString_ShouldThrowArgumentException(string? connectionString)
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>();
mockBuilder.Setup(b => b.Services).Returns(services);

// Act & Assert
Action action = () => mockBuilder.Object.UseDurableTaskScheduler(connectionString);
Action action = () => mockBuilder.Object.UseDurableTaskScheduler(connectionString!);
action.Should().Throw<ArgumentNullException>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void Ctor_EntitiesConfigured_GetClientSuccess()
[Theory]
[InlineData(false)]
[InlineData(true)]
public async void GetInstanceMetadata_EmptyList_Null(bool isNull)
public async Task GetInstanceMetadata_EmptyList_Null(bool isNull)
{
// arrange
List<Core.OrchestrationState>? states = isNull ? null : new();
Expand Down
Loading
Loading