Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/validate-build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Validate Build

on:
push:
branches: [ main ]
paths-ignore: [ '**.md' ]
pull_request:
branches: [ main ]
paths-ignore: [ '**.md' ]

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Setup .NET
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.x
- name: Enable App Service MyGet feed
run: dotnet nuget add source https://www.myget.org/F/azure-appservice/api/v3/index.json --name appservice-myget
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --no-restore -p:FileVersionRevision=$GITHUB_RUN_NUMBER
- name: Test
run: dotnet test --no-build --verbosity normal
Binary file modified CHANGELOG.md
Binary file not shown.
1 change: 1 addition & 0 deletions DurableTask.Sdk.sln
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "misc", "misc", "{6E392DF7-D
LICENSE = LICENSE
nuget.config = nuget.config
README.md = README.md
.github\workflows\validate-build.yml = .github\workflows\validate-build.yml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleApp", "samples\ConsoleApp\ConsoleApp.csproj", "{F9812EB9-A3CD-4E80-8CEA-243AE2AF925F}"
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask/DurableTask.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="Grpc.AspNetCore.Server" Version="2.38.0" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.7.0" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="0.1.0-alpha" />
<PackageReference Include="Microsoft.DurableTask.Sidecar.Protobuf" Version="0.2.0-alpha" />
</ItemGroup>

<ItemGroup>
Expand Down
41 changes: 37 additions & 4 deletions src/DurableTask/Grpc/DurableTaskGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class DurableTaskGrpcClient : DurableTaskClient
readonly IConfiguration? configuration;
readonly GrpcChannel sidecarGrpcChannel;
readonly TaskHubSidecarServiceClient sidecarClient;
readonly bool ownsChannel;

bool isDisposed;

Expand All @@ -35,8 +36,19 @@ public class DurableTaskGrpcClient : DurableTaskClient
this.logger = SdkUtils.GetLogger(builder.loggerFactory ?? this.services.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance);
this.configuration = builder.configuration ?? this.services.GetService<IConfiguration>();

string sidecarAddress = builder.address ?? SdkUtils.GetSidecarAddress(this.configuration);
this.sidecarGrpcChannel = GrpcChannel.ForAddress(sidecarAddress);
if (builder.channel != null)
{
// Use the channel from the builder, which was given to us by the app (thus we don't own it and can't dispose it)
this.sidecarGrpcChannel = builder.channel;
this.ownsChannel = false;
}
else
{
// We have to create our own channel and are responsible for disposing it
this.sidecarGrpcChannel = GrpcChannel.ForAddress(builder.address ?? SdkUtils.GetSidecarAddress(this.configuration));
this.ownsChannel = true;
}

this.sidecarClient = new TaskHubSidecarServiceClient(this.sidecarGrpcChannel);
}

Expand All @@ -48,8 +60,11 @@ public override async ValueTask DisposeAsync()
{
if (!this.isDisposed)
{
await this.sidecarGrpcChannel.ShutdownAsync();
this.sidecarGrpcChannel.Dispose();
if (this.ownsChannel)
{
await this.sidecarGrpcChannel.ShutdownAsync();
this.sidecarGrpcChannel.Dispose();
}

GC.SuppressFinalize(this);
this.isDisposed = true;
Expand Down Expand Up @@ -218,6 +233,7 @@ public sealed class Builder
internal ILoggerFactory? loggerFactory;
internal IDataConverter? dataConverter;
internal IConfiguration? configuration;
internal GrpcChannel? channel;
internal string? address;

public Builder UseLoggerFactory(ILoggerFactory loggerFactory)
Expand All @@ -238,6 +254,23 @@ public Builder UseAddress(string address)
return this;
}

/// <summary>
/// Configures a <see cref="GrpcChannel"/> to use for communicating with the sidecar process.
/// </summary>
/// <remarks>
/// This builder method allows you to provide your own gRPC channel for communicating with the Durable Task
/// sidecar service. Channels provided using this method won't be disposed when the client is disposed.
/// Rather, the caller remains responsible for shutting down the channel after disposing the client.
/// </remarks>
/// <param name="channel">The gRPC channel to use.</param>
/// <returns>Returns this <see cref="Builder"/> instance.</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="channel"/> is <c>null</c>.</exception>
public Builder UseGrpcChannel(GrpcChannel channel)
{
this.channel = channel ?? throw new ArgumentNullException(nameof(channel));
return this;
}

public Builder UseDataConverter(IDataConverter dataConverter)
{
this.dataConverter = dataConverter ?? throw new ArgumentNullException(nameof(dataConverter));
Expand Down
66 changes: 63 additions & 3 deletions src/DurableTask/Grpc/DurableTaskGrpcWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DurableTaskGrpcWorker : IHostedService, IAsyncDisposable
readonly ILogger logger;
readonly IConfiguration? configuration;
readonly GrpcChannel sidecarGrpcChannel;
readonly bool ownsChannel;
readonly TaskHubSidecarServiceClient sidecarClient;
readonly WorkerContext workerContext;

Expand All @@ -62,8 +63,19 @@ public class DurableTaskGrpcWorker : IHostedService, IAsyncDisposable
this.orchestrators = builder.taskProvider.orchestratorsBuilder.ToImmutable();
this.activities = builder.taskProvider.activitiesBuilder.ToImmutable();

string sidecarAddress = builder.address ?? SdkUtils.GetSidecarAddress(this.configuration);
this.sidecarGrpcChannel = GrpcChannel.ForAddress(sidecarAddress);
if (builder.channel != null)
{
// Use the channel from the builder, which was given to us by the app (thus we don't own it and can't dispose it)
this.sidecarGrpcChannel = builder.channel;
this.ownsChannel = false;
}
else
{
// We have to create our own channel and are responsible for disposing it
this.sidecarGrpcChannel = GrpcChannel.ForAddress(builder.address ?? SdkUtils.GetSidecarAddress(this.configuration));
this.ownsChannel = true;
}

this.sidecarClient = new TaskHubSidecarServiceClient(this.sidecarGrpcChannel);
}

Expand Down Expand Up @@ -156,6 +168,12 @@ async ValueTask IAsyncDisposable.DisposeAsync()
{
}

if (this.ownsChannel)
{
await this.sidecarGrpcChannel.ShutdownAsync();
this.sidecarGrpcChannel.Dispose();
}

GC.SuppressFinalize(this);
}

Expand Down Expand Up @@ -294,10 +312,34 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
result.CustomStatus,
result.Actions);

this.logger.SendingOrchestratorResponse(name, response.InstanceId, response.Actions.Count);
this.logger.SendingOrchestratorResponse(
name,
response.InstanceId,
response.Actions.Count,
GetActionsListForLogging(response.Actions));

await this.sidecarClient.CompleteOrchestratorTaskAsync(response);
}

static string GetActionsListForLogging(IReadOnlyList<P.OrchestratorAction> actions)
{
if (actions.Count == 0)
{
return string.Empty;
}
else if (actions.Count == 1)
{
return actions[0].OrchestratorActionTypeCase.ToString();
}
else
{
// Returns something like "ScheduleTask x5, CreateTimer x1,..."
return string.Join(", ", actions
.GroupBy(a => a.OrchestratorActionTypeCase)
.Select(group => $"{group.Key} x{group.Count()}"));
}
}

OrchestratorExecutionResult CreateOrchestrationFailedActionResult(Exception e)
{
return this.CreateOrchestrationFailedActionResult(
Expand Down Expand Up @@ -398,6 +440,7 @@ public sealed class Builder
internal IServiceProvider? services;
internal IConfiguration? configuration;
internal string? address;
internal GrpcChannel? channel;

internal Builder()
{
Expand All @@ -411,6 +454,23 @@ public Builder UseAddress(string address)
return this;
}

/// <summary>
/// Configures a <see cref="GrpcChannel"/> to use for communicating with the sidecar process.
/// </summary>
/// <remarks>
/// This builder method allows you to provide your own gRPC channel for communicating with the Durable Task
/// sidecar service. Channels provided using this method won't be disposed when the worker is disposed.
/// Rather, the caller remains responsible for shutting down the channel after disposing the worker.
/// </remarks>
/// <param name="channel">The gRPC channel to use.</param>
/// <returns>Returns this <see cref="Builder"/> instance.</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="channel"/> is <c>null</c>.</exception>
public Builder UseGrpcChannel(GrpcChannel channel)
{
this.channel = channel ?? throw new ArgumentNullException(nameof(channel));
return this;
}

public Builder UseLoggerFactory(ILoggerFactory loggerFactory)
{
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ static partial class Logs
[LoggerMessage(EventId = 10, Level = LogLevel.Debug, Message = "{instanceId}: Received request for '{name}' orchestrator.")]
public static partial void ReceivedOrchestratorRequest(this ILogger logger, string name, string instanceId);

[LoggerMessage(EventId = 11, Level = LogLevel.Debug, Message = "{instanceId}: Sending {count} actions for '{name}' orchestrator.")]
public static partial void SendingOrchestratorResponse(this ILogger logger, string name, string instanceId, int count);
[LoggerMessage(EventId = 11, Level = LogLevel.Debug, Message = "{instanceId}: Sending {count} action(s) [{actionsList}] for '{name}' orchestrator.")]
public static partial void SendingOrchestratorResponse(this ILogger logger, string name, string instanceId, int count, string actionsList);

[LoggerMessage(EventId = 12, Level = LogLevel.Warning, Message = "{instanceId}: '{name}' orchestrator failed with an unhandled exception: {details}.")]
public static partial void OrchestratorFailed(this ILogger logger, string name, string instanceId, string details);
Expand Down
6 changes: 5 additions & 1 deletion test/DurableTask.Sdk.Tests/DurableTask.Sdk.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
Expand All @@ -20,6 +20,10 @@
</PackageReference>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.DurableTask.Sidecar" Version="0.2.0-alpha" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DurableTask\DurableTask.csproj" />
</ItemGroup>
Expand Down
80 changes: 80 additions & 0 deletions test/DurableTask.Sdk.Tests/GrpcSidecarFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using DurableTask.Core;
using DurableTask.Grpc;
using DurableTask.Sidecar;
using DurableTask.Sidecar.Grpc;
using Grpc.Net.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace DurableTask.Sdk.Tests;

public class GrpcSidecarFixture : IDisposable
{
const string ListenAddress = "http://127.0.0.1:4002";

readonly IWebHost host;
readonly GrpcChannel sidecarChannel;

public GrpcSidecarFixture()
{
InMemoryOrchestrationService service = new();

this.host = new WebHostBuilder()
.UseKestrel(options =>
{
// Need to force Http2 in Kestrel in unencrypted scenarios
// https://docs.microsoft.com/en-us/aspnet/core/grpc/troubleshoot?view=aspnetcore-3.0
options.ConfigureEndpointDefaults(listenOptions => listenOptions.Protocols = HttpProtocols.Http2);
})
.UseUrls(ListenAddress)
.ConfigureServices(services =>
{
services.AddGrpc();
services.AddSingleton<IOrchestrationService>(service);
services.AddSingleton<IOrchestrationServiceClient>(service);
services.AddSingleton<TaskHubGrpcServer>();
})
.Configure(app =>
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<TaskHubGrpcServer>();
});
})
.Build();

this.host.Start();

this.sidecarChannel = GrpcChannel.ForAddress(ListenAddress);
}

public DurableTaskGrpcWorker.Builder GetWorkerBuilder()
{
// The gRPC channel is reused across tests to avoid the overhead of creating new connections (which is very slow)
return DurableTaskGrpcWorker.CreateBuilder().UseGrpcChannel(this.sidecarChannel);

}

public DurableTaskGrpcClient.Builder GetClientBuilder()
{
// The gRPC channel is reused across tests to avoid the overhead of creating new connections (which is very slow)
return DurableTaskGrpcClient.CreateBuilder().UseGrpcChannel(this.sidecarChannel);
}

public void Dispose()
{
this.sidecarChannel.ShutdownAsync().GetAwaiter().GetResult();
this.sidecarChannel.Dispose();

this.host.StopAsync().GetAwaiter().GetResult();
this.host.Dispose();
}
}
13 changes: 9 additions & 4 deletions test/DurableTask.Sdk.Tests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,31 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Grpc;
using DurableTask.Sdk.Tests;
using DurableTask.Sdk.Tests.Logging;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace DurableTask.Tests;

public class OrchestrationPatterns : IDisposable
public class OrchestrationPatterns : IClassFixture<GrpcSidecarFixture>, IDisposable
{
readonly CancellationTokenSource testTimeoutSource = new(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(10));
readonly ILoggerFactory loggerFactory;

public OrchestrationPatterns(ITestOutputHelper output)
// Documentation on xunit test fixtures: https://xunit.net/docs/shared-context
readonly GrpcSidecarFixture sidecarFixture;

public OrchestrationPatterns(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture)
{
TestLogProvider logProvider = new(output);
this.loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddProvider(logProvider);
builder.SetMinimumLevel(LogLevel.Debug);
});
this.sidecarFixture = sidecarFixture;
}

/// <summary>
Expand All @@ -48,15 +53,15 @@ void IDisposable.Dispose()
/// </summary>
DurableTaskGrpcWorker.Builder CreateWorkerBuilder()
{
return DurableTaskGrpcWorker.CreateBuilder().UseLoggerFactory(this.loggerFactory);
return this.sidecarFixture.GetWorkerBuilder().UseLoggerFactory(this.loggerFactory);
}

/// <summary>
/// Creates a <see cref="DurableTaskGrpcClient"/> configured to output logs to xunit logging infrastructure.
/// </summary>
DurableTaskClient CreateDurableTaskClient()
{
return DurableTaskGrpcClient.CreateBuilder().UseLoggerFactory(this.loggerFactory).Build();
return this.sidecarFixture.GetClientBuilder().UseLoggerFactory(this.loggerFactory).Build();
}

[Fact]
Expand Down