Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2"/>
<PackageVersion Include="Xunit.Combinatorial" Version="1.6.24" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.0" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.InMemory" Version="9.0.0" />
</ItemGroup>

<!-- Base-class library dependencies -->
Expand Down
193 changes: 193 additions & 0 deletions src/InProcessTestHost/DurableTaskTestExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using DurableTask.Core;
using Grpc.Net.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.DurableTask.Testing.Sidecar;
using Microsoft.DurableTask.Testing.Sidecar.Grpc;
using Microsoft.DurableTask.Worker;
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Microsoft.DurableTask.Testing;

/// <summary>
/// Extension methods for integrating in-memory durable task testing with your existing DI container,
/// such as WebApplicationFactory.
/// </summary>
/// These extensions allow you to inject the <see cref="InMemoryOrchestrationService"/> into your
/// existing test host so that your orchestrations and activities can resolve services from DI container.
public static class DurableTaskTestExtensions
{
/// <summary>
/// These extensions allow you to inject the <see cref="InMemoryOrchestrationService"/> into your
/// existing test host so that your orchestrations and activities can resolve services from DI container.
/// </summary>
/// <param name="services">The service collection (from your WebApplicationFactory or host).</param>
/// <param name="configureTasks">Action to register orchestrators and activities.</param>
/// <param name="options">Optional configuration options.</param>
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddInMemoryDurableTask(
this IServiceCollection services,
Action<DurableTaskRegistry> configureTasks,
InMemoryDurableTaskOptions? options = null)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configureTasks);

options ??= new InMemoryDurableTaskOptions();

// Determine port for the internal gRPC server
int port = options.Port ?? Random.Shared.Next(30000, 40000);
string address = $"http://localhost:{port}";

// Register the in-memory orchestration service as a singleton
services.AddSingleton<InMemoryOrchestrationService>(sp =>
{
var loggerFactory = sp.GetService<ILoggerFactory>();
return new InMemoryOrchestrationService(loggerFactory);
});
services.AddSingleton<IOrchestrationService>(sp => sp.GetRequiredService<InMemoryOrchestrationService>());
services.AddSingleton<IOrchestrationServiceClient>(sp => sp.GetRequiredService<InMemoryOrchestrationService>());

// Register the gRPC sidecar server as a hosted service
services.AddSingleton<TaskHubGrpcServer>();
services.AddHostedService<InMemoryGrpcSidecarHost>(sp =>
{
return new InMemoryGrpcSidecarHost(
address,
sp.GetRequiredService<InMemoryOrchestrationService>(),
sp.GetService<ILoggerFactory>());
});

// Create a gRPC channel that will connect to our internal sidecar
services.AddSingleton<GrpcChannel>(sp => GrpcChannel.ForAddress(address));

// Register the durable task worker (connects to our internal sidecar)
services.AddDurableTaskWorker(builder =>
{
builder.UseGrpc(address);
builder.AddTasks(configureTasks);
});

// Register the durable task client (connects to our internal sidecar)
services.AddDurableTaskClient(builder =>
{
builder.UseGrpc(address);
builder.RegisterDirectly();
});

return services;
}

/// <summary>
/// Gets the <see cref="InMemoryOrchestrationService"/> from the service provider.
/// Useful for advanced scenarios like inspecting orchestration state.
/// </summary>
/// <param name="services">The service provider.</param>
/// <returns>The in-memory orchestration service instance.</returns>
public static InMemoryOrchestrationService GetInMemoryOrchestrationService(this IServiceProvider services)
{
return services.GetRequiredService<InMemoryOrchestrationService>();
}
}

/// <summary>
/// Options for configuring in-memory durable task support.
/// </summary>
public class InMemoryDurableTaskOptions
{
/// <summary>
/// Gets or sets the port for the internal gRPC server.
/// If not set, a random port between 30000-40000 will be used.
/// </summary>
public int? Port { get; set; }
}

/// <summary>
/// Internal hosted service that runs the gRPC sidecar within the user's host.
/// </summary>
internal class InMemoryGrpcSidecarHost : IHostedService, IAsyncDisposable
{
private readonly string address;
private readonly InMemoryOrchestrationService orchestrationService;
private readonly ILoggerFactory? loggerFactory;
private IHost? sidecarHost;

public InMemoryGrpcSidecarHost(
string address,
InMemoryOrchestrationService orchestrationService,
ILoggerFactory? loggerFactory)
{
this.address = address;
this.orchestrationService = orchestrationService;
this.loggerFactory = loggerFactory;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
// Build and start the gRPC sidecar
this.sidecarHost = Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
{
logging.ClearProviders();
if (this.loggerFactory != null)
{
logging.Services.AddSingleton(this.loggerFactory);
}
})
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseUrls(this.address);
webBuilder.ConfigureKestrel(kestrelOptions =>
{
kestrelOptions.ConfigureEndpointDefaults(listenOptions =>
listenOptions.Protocols = HttpProtocols.Http2);
});

webBuilder.ConfigureServices(services =>
{
services.AddGrpc();
// Use the SAME orchestration service instance
services.AddSingleton<IOrchestrationService>(this.orchestrationService);
services.AddSingleton<IOrchestrationServiceClient>(this.orchestrationService);
services.AddSingleton<TaskHubGrpcServer>();
});

webBuilder.Configure(app =>
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<TaskHubGrpcServer>();
});
});
})
.Build();

await this.sidecarHost.StartAsync(cancellationToken);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if (this.sidecarHost != null)
{
await this.sidecarHost.StopAsync(cancellationToken);
}
}

public async ValueTask DisposeAsync()
{
if (this.sidecarHost != null)
{
this.sidecarHost.Dispose();
}
}
}
44 changes: 44 additions & 0 deletions src/InProcessTestHost/DurableTaskTestHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public DurableTaskTestHost(IHost sidecarHost, IHost workerHost, GrpcChannel grpc
/// </summary>
public DurableTaskClient Client { get; }

/// <summary>
/// Gets the service provider from the worker host.
/// Use this to resolve services registered via <see cref="DurableTaskTestHostOptions.ConfigureServices"/>.
/// </summary>
public IServiceProvider Services => this.workerHost.Services;

/// <summary>
/// Starts a new in-process test host with the specified orchestrators and activities.
/// </summary>
Expand Down Expand Up @@ -113,6 +119,10 @@ public static async Task<DurableTaskTestHost> StartAsync(
})
.ConfigureServices(services =>
{
// Allow user to register their own services FIRST
// This ensures their services are available when activities are resolved
options.ConfigureServices?.Invoke(services);

// Register worker that connects to our in-process sidecar
services.AddDurableTaskWorker(builder =>
{
Expand Down Expand Up @@ -170,4 +180,38 @@ public class DurableTaskTestHostOptions
/// Null by default.
/// </summary>
public ILoggerFactory? LoggerFactory { get; set; }

/// <summary>
/// Gets or sets an optional callback to configure additional services in the worker host's DI container.
/// Use this to register services that your activities and orchestrators depend on.
/// </summary>
/// <remarks>
/// <para>
/// This callback is invoked during worker host construction, allowing you to register
/// any services your orchestrations and activities need. Activities can then receive
/// these services via constructor injection.
/// </para>
/// <example>
/// <code>
/// var options = new DurableTaskTestHostOptions
/// {
/// ConfigureServices = services =>
/// {
/// services.AddSingleton&lt;IMyService, MyService&gt;();
/// services.AddScoped&lt;IRepository, Repository&gt;();
/// services.AddHttpClient();
/// }
/// };
///
/// await using var host = await DurableTaskTestHost.StartAsync(
/// registry =>
/// {
/// registry.AddOrchestrator&lt;MyOrchestrator&gt;();
/// registry.AddActivity&lt;MyActivity&gt;(); // Can now inject IMyService, IRepository, etc.
/// },
/// options);
/// </code>
/// </example>
/// </remarks>
public Action<IServiceCollection>? ConfigureServices { get; set; }
}
Loading
Loading