1+ using Azure . Core ;
2+ using Microsoft . DurableTask . Client ;
3+ using Microsoft . DurableTask . Worker ;
4+ using System . Diagnostics ;
5+
6+ namespace DurableTask . Extensions . Azure ;
7+
8+ // NOTE: These extension methods will eventually be provided by the Durable Task SDK itself.
9+ public static class DurableTaskSchedulerExtensions
10+ {
11+ // Configure the Durable Task *Worker* to use the Durable Task Scheduler service with the specified options.
12+ public static void UseDurableTaskScheduler (
13+ this IDurableTaskWorkerBuilder builder ,
14+ string endpointAddress ,
15+ string taskHubName ,
16+ TokenCredential credential ,
17+ Action < DurableTaskSchedulerOptions > ? configure = null )
18+ {
19+ DurableTaskSchedulerOptions options = new ( endpointAddress , taskHubName , credential ) ;
20+
21+ configure ? . Invoke ( options ) ;
22+
23+ builder . UseGrpc ( GetGrpcChannelForOptions ( options ) ) ;
24+ }
25+
26+ public static void UseDurableTaskScheduler (
27+ this IDurableTaskWorkerBuilder builder ,
28+ string connectionString ,
29+ Action < DurableTaskSchedulerOptions > ? configure = null )
30+ {
31+ var options = DurableTaskSchedulerOptions . FromConnectionString ( connectionString ) ;
32+ configure ? . Invoke ( options ) ;
33+ builder . UseGrpc ( GetGrpcChannelForOptions ( options ) ) ;
34+ }
35+
36+ // Configure the Durable Task *Client* to use the Durable Task Scheduler service with the specified options.
37+ public static void UseDurableTaskScheduler (
38+ this IDurableTaskClientBuilder builder ,
39+ string endpointAddress ,
40+ string taskHubName ,
41+ TokenCredential credential ,
42+ Action < DurableTaskSchedulerOptions > ? configure = null )
43+ {
44+ DurableTaskSchedulerOptions options = new ( endpointAddress , taskHubName , credential ) ;
45+
46+ configure ? . Invoke ( options ) ;
47+
48+ builder . UseGrpc ( GetGrpcChannelForOptions ( options ) ) ;
49+ }
50+
51+ public static void UseDurableTaskScheduler (
52+ this IDurableTaskClientBuilder builder ,
53+ string connectionString ,
54+ Action < DurableTaskSchedulerOptions > ? configure = null )
55+ {
56+ var options = DurableTaskSchedulerOptions . FromConnectionString ( connectionString ) ;
57+ configure ? . Invoke ( options ) ;
58+ builder . UseGrpc ( GetGrpcChannelForOptions ( options ) ) ;
59+ }
60+
61+ static GrpcChannel GetGrpcChannelForOptions ( DurableTaskSchedulerOptions options )
62+ {
63+ if ( string . IsNullOrEmpty ( options . EndpointAddress ) )
64+ {
65+ throw RequiredOptionMissing ( nameof ( options . TaskHubName ) ) ;
66+ }
67+
68+ if ( string . IsNullOrEmpty ( options . TaskHubName ) )
69+ {
70+ throw RequiredOptionMissing ( nameof ( options . TaskHubName ) ) ;
71+ }
72+
73+ TokenCredential credential = options . Credential ?? throw RequiredOptionMissing ( nameof ( options . Credential ) ) ;
74+
75+ string taskHubName = options . TaskHubName ;
76+ string endpoint = options . EndpointAddress ;
77+
78+ if ( ! endpoint . Contains ( "://" ) )
79+ {
80+ endpoint = $ "https://{ endpoint } ";
81+ }
82+
83+ string resourceId = options . ResourceId ?? "https://durabletask.io" ;
84+ #if NET6_0
85+ int processId = Environment . ProcessId ;
86+ #else
87+ int processId = Process . GetCurrentProcess ( ) . Id ;
88+ #endif
89+ string workerId = options . WorkerId ?? $ "{ Environment . MachineName } ,{ processId } ,{ Guid . NewGuid ( ) } ";
90+
91+ TokenCache ? cache =
92+ options . Credential is not null
93+ ? new (
94+ options . Credential ,
95+ new ( new [ ] { $ "{ options . ResourceId } /.default" } ) ,
96+ TimeSpan . FromMinutes ( 5 ) )
97+ : null ;
98+
99+ CallCredentials managedBackendCreds = CallCredentials . FromInterceptor (
100+ async ( context , metadata ) =>
101+ {
102+ metadata . Add ( "taskhub" , taskHubName ) ;
103+ metadata . Add ( "workerid" , workerId ) ;
104+
105+ if ( cache is null )
106+ {
107+ return ;
108+ }
109+
110+ AccessToken token = await cache . GetTokenAsync ( context . CancellationToken ) ;
111+
112+ metadata . Add ( "Authorization" , $ "Bearer { token . Token } ") ;
113+ } ) ;
114+
115+ #if NET6_0
116+ return GrpcChannel . ForAddress (
117+ endpoint ,
118+ new GrpcChannelOptions
119+ {
120+ Credentials = ChannelCredentials . Create ( ChannelCredentials . SecureSsl , managedBackendCreds ) ,
121+ } ) ;
122+ #else
123+ return new GrpcChannel (
124+ endpoint ,
125+ ChannelCredentials . Create ( ChannelCredentials . SecureSsl , managedBackendCreds ) ) ;
126+ #endif
127+ }
128+
129+ static Exception RequiredOptionMissing ( string optionName )
130+ {
131+ return new ArgumentException ( message : $ "Required option '{ optionName } ' was not provided.") ;
132+ }
133+
134+ sealed class TokenCache ( TokenCredential credential , TokenRequestContext context , TimeSpan margin )
135+ {
136+ readonly TokenCredential credential = credential ;
137+ readonly TokenRequestContext context = context ;
138+ readonly TimeSpan margin = margin ;
139+
140+ AccessToken ? token ;
141+
142+ public async Task < AccessToken > GetTokenAsync ( CancellationToken cancellationToken )
143+ {
144+ DateTimeOffset nowWithMargin = DateTimeOffset . UtcNow . Add ( this . margin ) ;
145+
146+ if ( this . token is null
147+ || this . token . Value . RefreshOn < nowWithMargin
148+ || this . token . Value . ExpiresOn < nowWithMargin )
149+ {
150+ this . token = await this . credential . GetTokenAsync ( this . context , cancellationToken ) ;
151+ }
152+
153+ return this . token . Value ;
154+ }
155+ }
156+ }
0 commit comments