Skip to content

Commit bfe352b

Browse files
authored
Remove DurableClientContext, allow for direct binding of DurableTaskClient (#2372)
1 parent d6b8c76 commit bfe352b

File tree

11 files changed

+306
-184
lines changed

11 files changed

+306
-184
lines changed

src/Worker.Extensions.DurableTask/DefaultDurableClientContext.cs

Lines changed: 0 additions & 124 deletions
This file was deleted.

src/Worker.Extensions.DurableTask/DurableClientContext.cs

Lines changed: 0 additions & 44 deletions
This file was deleted.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Text.Json;
6+
using System.Threading.Tasks;
7+
using Microsoft.Azure.Functions.Worker.Converters;
8+
using Microsoft.DurableTask.Client;
9+
10+
namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask;
11+
12+
internal sealed partial class DurableTaskClientConverter : IInputConverter
13+
{
14+
private readonly FunctionsDurableClientProvider clientProvider;
15+
16+
// Constructor parameters are optional DI-injected services.
17+
public DurableTaskClientConverter(FunctionsDurableClientProvider clientProvider)
18+
{
19+
this.clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider));
20+
}
21+
22+
public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
23+
{
24+
if (context is null)
25+
{
26+
throw new ArgumentNullException(nameof(context));
27+
}
28+
29+
if (context.TargetType != typeof(DurableTaskClient))
30+
{
31+
return new ValueTask<ConversionResult>(ConversionResult.Unhandled());
32+
}
33+
34+
// The exact format of the expected JSON string data is controlled by the Durable Task WebJobs client binding logic.
35+
// It's never expected to be wrong, but we code defensively just in case.
36+
if (context.Source is not string clientConfigText)
37+
{
38+
return new ValueTask<ConversionResult>(ConversionResult.Failed(new InvalidOperationException(
39+
$"Expected the Durable Task WebJobs SDK extension to send a string payload for {nameof(DurableClientAttribute)}.")));
40+
}
41+
42+
try
43+
{
44+
DurableClientInputData? inputData = JsonSerializer.Deserialize<DurableClientInputData>(clientConfigText);
45+
if (!Uri.TryCreate(inputData?.rpcBaseUrl, UriKind.Absolute, out Uri? endpoint))
46+
{
47+
return new ValueTask<ConversionResult>(ConversionResult.Failed(
48+
new InvalidOperationException("Failed to parse the input binding payload data")));
49+
}
50+
51+
DurableTaskClient client = this.clientProvider.GetClient(endpoint, inputData?.taskHubName, inputData?.connectionName);
52+
client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters);
53+
return new ValueTask<ConversionResult>(ConversionResult.Success(client));
54+
}
55+
catch (Exception innerException)
56+
{
57+
InvalidOperationException exception = new(
58+
$"Failed to convert the input binding context data into a {nameof(DurableTaskClient)} object. The data may have been delivered in an invalid format.",
59+
innerException);
60+
return new ValueTask<ConversionResult>(ConversionResult.Failed(exception));
61+
}
62+
}
63+
64+
// Serializer is case-sensitive and incoming JSON properties are camel-cased.
65+
private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters);
66+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Net;
6+
using System.Threading;
7+
using Azure.Core.Serialization;
8+
using Microsoft.Azure.Functions.Worker.Http;
9+
using Microsoft.DurableTask.Client;
10+
using Microsoft.Extensions.DependencyInjection;
11+
using Microsoft.Extensions.Options;
12+
13+
namespace Microsoft.Azure.Functions.Worker;
14+
15+
/// <summary>
16+
/// Extensions for <see cref="DurableTaskClient"/>
17+
/// </summary>
18+
public static class DurableTaskClientExtensions
19+
{
20+
/// <summary>
21+
/// Creates an HTTP response that is useful for checking the status of the specified instance.
22+
/// </summary>
23+
/// <param name="client">The <see cref="DurableTaskClient"/>.</param>
24+
/// <param name="request">The HTTP request that this response is for.</param>
25+
/// <param name="instanceId">The ID of the orchestration instance to check.</param>
26+
/// <param name="cancellation">The cancellation token.</param>
27+
/// <returns>An HTTP 202 response with a Location header and a payload containing instance control URLs.</returns>
28+
public static HttpResponseData CreateCheckStatusResponse(
29+
this DurableTaskClient client,
30+
HttpRequestData request,
31+
string instanceId,
32+
CancellationToken cancellation = default)
33+
{
34+
return client.CreateCheckStatusResponse(request, instanceId, HttpStatusCode.Accepted, cancellation);
35+
}
36+
37+
/// <summary>
38+
/// Creates an HTTP response that is useful for checking the status of the specified instance.
39+
/// </summary>
40+
/// <param name="client">The <see cref="DurableTaskClient"/>.</param>
41+
/// <param name="request">The HTTP request that this response is for.</param>
42+
/// <param name="instanceId">The ID of the orchestration instance to check.</param>
43+
/// <param name="statusCode">The status code.</param>
44+
/// <param name="cancellation">The cancellation token.</param>
45+
/// <returns>An HTTP response with a Location header and a payload containing instance control URLs.</returns>
46+
public static HttpResponseData CreateCheckStatusResponse(
47+
this DurableTaskClient client,
48+
HttpRequestData request,
49+
string instanceId,
50+
HttpStatusCode statusCode,
51+
CancellationToken cancellation = default)
52+
{
53+
if (client is null)
54+
{
55+
throw new ArgumentNullException(nameof(client));
56+
}
57+
58+
static string BuildUrl(string url, params string?[] queryValues)
59+
{
60+
bool appended = false;
61+
foreach (string? query in queryValues)
62+
{
63+
if (!string.IsNullOrEmpty(query))
64+
{
65+
url = url + (appended ? "&" : "?") + query;
66+
appended = true;
67+
}
68+
}
69+
70+
return url;
71+
}
72+
73+
// TODO: To better support scenarios involving proxies or application gateways, this
74+
// code should take the X-Forwarded-Host, X-Forwarded-Proto, and Forwarded HTTP
75+
// request headers into consideration and generate the base URL accordingly.
76+
// More info: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded.
77+
// One potential workaround is to set ASPNETCORE_FORWARDEDHEADERS_ENABLED to true.
78+
string baseUrl = request.Url.GetLeftPart(UriPartial.Authority);
79+
string formattedInstanceId = Uri.EscapeDataString(instanceId);
80+
string instanceUrl = $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}";
81+
string? commonQueryParameters = GetQueryParams(client);
82+
83+
HttpResponseData response = request.CreateResponse(statusCode);
84+
response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters));
85+
response.Headers.Add("Content-Type", "application/json");
86+
87+
ObjectSerializer serializer = GetObjectSerializer(response);
88+
var payload = new
89+
{
90+
id = instanceId,
91+
purgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters),
92+
sendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters),
93+
statusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters),
94+
terminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}}", commonQueryParameters),
95+
};
96+
97+
serializer.Serialize(response.Body, payload, payload.GetType(), cancellation);
98+
return response;
99+
}
100+
101+
private static ObjectSerializer GetObjectSerializer(HttpResponseData response)
102+
{
103+
return response.FunctionContext.InstanceServices.GetService<IOptions<WorkerOptions>>()?.Value?.Serializer
104+
?? throw new InvalidOperationException("A serializer is not configured for the worker.");
105+
}
106+
107+
private static string? GetQueryParams(DurableTaskClient client)
108+
{
109+
return client is FunctionsDurableTaskClient functions ? functions.QueryString : null;
110+
}
111+
}

src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBui
5454
return new DurableTaskShimFactory(options, factory); // For GrpcOrchestrationRunner
5555
});
5656

57-
applicationBuilder.Services.Configure<WorkerOptions>(o => o.InputConverters.Register<OrchestrationInputConverter>());
57+
applicationBuilder.Services.Configure<WorkerOptions>(o =>
58+
{
59+
o.InputConverters.RegisterAt<DurableTaskClientConverter>(0);
60+
o.InputConverters.Register<OrchestrationInputConverter>();
61+
});
62+
5863
applicationBuilder.UseMiddleware<DurableTaskFunctionsMiddleware>();
5964
}
6065

src/Worker.Extensions.DurableTask/FunctionsDurableClientProvider.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ namespace Microsoft.Azure.Functions.Worker.Extensions.DurableTask;
2323
/// <summary>
2424
/// The functions implementation of the durable task client provider.
2525
/// </summary>
26+
/// <remarks>
27+
/// This class does NOT provide <see cref="FunctionsDurableTaskClient" /> is meant as a per-binding wrapper.
28+
/// </remarks>
2629
internal partial class FunctionsDurableClientProvider : IAsyncDisposable
2730
{
2831
private readonly ReaderWriterLockSlim sync = new();
@@ -127,7 +130,7 @@ public DurableTaskClient GetClient(Uri endpoint, string? taskHub, string? connec
127130
};
128131

129132
ILogger logger = this.loggerFactory.CreateLogger<GrpcDurableTaskClient>();
130-
GrpcDurableTaskClient client = new(string.Empty, options, logger);
133+
GrpcDurableTaskClient client = new(taskHub, options, logger);
131134
holder = new(client, channel);
132135
this.clients[key] = holder;
133136
return client;

0 commit comments

Comments
 (0)